1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | /* |
40 | |
41 | Managing the tree shape: How insertion, deletion, and querying work |
42 | |
43 | When we insert a message into the FT_HANDLE, here's what happens. |
44 | |
45 | to insert a message at the root |
46 | |
47 | - find the root node |
48 | - capture the next msn of the root node and assign it to the message |
49 | - split the root if it needs to be split |
50 | - insert the message into the root buffer |
51 | - if the root is too full, then toku_ft_flush_some_child() of the root on a flusher thread |
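  a rough sketch of that flow, with illustrative helper names (of these,
  only inject_message_in_locked_node() is a real function in this file):

      insert_msg_at_root(ft, msg) {
          root = get_and_pin_root(ft);
          msg.msn = next_msn(ft);                // capture the next msn
          if (root_needs_split(root))
              root = split_root(ft, root);
          inject_message_in_locked_node(ft, root, -1, msg, ...);
          if (node_is_gorged(root))
              enqueue_flush_work_item(ft, root); // toku_ft_flush_some_child() on a flusher thread
      }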
52 | |
flusher functions use an advice struct which provides some functions to
call that tell it what to do based on the context of the flush. see ft-flusher.h
55 | |
56 | to flush some child, given a parent and some advice |
57 | - pick the child using advice->pick_child() |
- remove that child's buffer from the parent
59 | - flush the buffer to the child |
60 | - if the child has stable reactivity and |
61 | advice->should_recursively_flush() is true, then |
62 | toku_ft_flush_some_child() of the child |
63 | - otherwise split the child if it needs to be split |
64 | - otherwise maybe merge the child if it needs to be merged |
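  in pseudocode (the advice callbacks are real, from ft-flusher.h; the
  other helper names are illustrative):

      flush_some_child(ft, parent, advice) {
          childnum = advice->pick_child(parent);
          buffer = detach_child_buffer(parent, childnum);
          child = pin_child(ft, parent, childnum);
          apply_buffer_to_child(child, buffer);
          if (child_is_stable(child) && advice->should_recursively_flush(child))
              flush_some_child(ft, child, advice);
          else if (child_needs_split(child))
              split_child(ft, parent, childnum);
          else if (child_needs_merge(child))
              maybe_merge_child(ft, parent, childnum);
      }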
65 | |
66 | flusher threads: |
67 | |
68 | flusher threads are created on demand as the result of internal nodes |
69 | becoming gorged by insertions. this allows flushing to be done somewhere |
70 | other than the client thread. these work items are enqueued onto |
71 | the cachetable kibbutz and are done in a first in first out order. |
72 | |
73 | cleaner threads: |
74 | |
75 | the cleaner thread wakes up every so often (say, 1 second) and chooses |
76 | a small number (say, 5) of nodes as candidates for a flush. the one |
77 | with the largest cache pressure is chosen to be flushed. cache pressure |
78 | is a function of the size of the node in the cachetable plus the work done. |
79 | the cleaner thread need not actually do a flush when awoken, so only |
80 | nodes that have sufficient cache pressure are flushed. |
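  roughly, the cache pressure computed for a candidate node is (see
  ftnode_cachepressure_size() below):

      pressure(node) = 0 if node is a leaf or all of its buffers are empty,
                       otherwise the sum, over in-memory partitions i, of
                       size(partition i) + workdone(partition i)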
81 | |
82 | checkpointing: |
83 | |
84 | the checkpoint thread wakes up every minute to checkpoint dirty nodes |
85 | to disk. at the time of this writing, nodes during checkpoint are |
86 | locked and cannot be queried or flushed to. a design in which nodes |
87 | are copied before checkpoint is being considered as a way to reduce |
88 | the performance variability caused by a checkpoint locking too |
89 | many nodes and preventing other threads from traversing down the tree, |
90 | for a query or otherwise. |
91 | |
92 | To shrink a file: Let X be the size of the reachable data. |
93 | We define an acceptable bloat constant of C. For example we set C=2 if we are willing to allow the file to be as much as 2X in size. |
94 | The goal is to find the smallest amount of stuff we can move to get the file down to size CX. |
95 | That seems like a difficult problem, so we use the following heuristics: |
If we can relocate the last block to a lower location, then do so immediately. (The file gets smaller right away, so even though the new location
may not even be in the first CX bytes, we are making the file smaller.)
98 | Otherwise all of the earlier blocks are smaller than the last block (of size L). So find the smallest region that has L free bytes in it. |
99 | (This can be computed in one pass) |
100 | Move the first allocated block in that region to some location not in the interior of the region. |
101 | (Outside of the region is OK, and reallocating the block at the edge of the region is OK). |
102 | This has the effect of creating a smaller region with at least L free bytes in it. |
103 | Go back to the top (because by now some other block may have been allocated or freed). |
104 | Claim: if there are no other allocations going on concurrently, then this algorithm will shrink the file reasonably efficiently. By this I mean that |
105 | each block of shrinkage does the smallest amount of work possible. That doesn't mean that the work overall is minimized. |
106 | Note: If there are other allocations and deallocations going on concurrently, we might never get enough space to move the last block. But it takes a lot |
107 | of allocations and deallocations to make that happen, and it's probably reasonable for the file not to shrink in this case. |
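  the heuristic, written as a loop (names illustrative):

      shrink_file(blocktable, C) {
          while (file_size > C * reachable_size) {
              L = last_block(blocktable);
              if (a free slot below L can hold L)
                  relocate(L, that slot);   // the file shrinks right away
              else {
                  region = smallest region with size(L) free bytes;   // one pass
                  b = first allocated block in region;
                  relocate(b, any location not interior to region);
              }
              // go back to the top: concurrent allocations or frees
              // may have changed the picture
          }
      }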
108 | |
109 | To split or merge a child of a node: |
110 | Split_or_merge (node, childnum) { |
111 | If the child needs to be split (it's a leaf with too much stuff or a nonleaf with too much fanout) |
112 | fetch the node and the child into main memory. |
113 | split the child, producing two nodes A and B, and also a pivot. Don't worry if the resulting child is still too big or too small. Fix it on the next pass. |
114 | fixup node to point at the two new children. Don't worry about the node getting too much fanout. |
115 | return; |
If the child needs to be merged (it's a leaf with too little stuff (less than 1/4 full) or a nonleaf with too little fanout (less than 1/4))
117 | fetch node, the child and a sibling of the child into main memory. |
118 | move all messages from the node to the two children (so that the message buffers are empty) |
119 | If the two siblings together fit into one node then |
120 | merge the two siblings. |
121 | fixup the node to point at one child |
122 | Otherwise |
123 | load balance the content of the two nodes |
124 | Don't worry about the resulting children having too many messages or otherwise being too big or too small. Fix it on the next pass. |
125 | } |
126 | } |
127 | |
128 | Here's how querying works: |
129 | |
130 | lookups: |
131 | - As of Dr. No, we don't do any tree shaping on lookup. |
132 | - We don't promote eagerly or use aggressive promotion or passive-aggressive |
133 | promotion. We just push messages down according to the traditional FT_HANDLE |
134 | algorithm on insertions. |
135 | - when a node is brought into memory, we apply ancestor messages above it. |
136 | |
137 | basement nodes, bulk fetch, and partial fetch: |
138 | - leaf nodes are comprised of N basement nodes, each of nominal size. when |
a query hits a leaf node, it may require one or more basement nodes to be in memory.
140 | - for point queries, we do not read the entire node into memory. instead, |
141 | we only read in the required basement node |
142 | - for range queries, cursors may return cursor continue in their callback |
to take the shortcut path until the end of the basement node.
144 | - for range queries, cursors may prelock a range of keys (with or without a txn). |
145 | the fractal tree will prefetch nodes aggressively until the end of the range. |
146 | - without a prelocked range, range queries behave like successive point queries. |
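  a sketch of a prelocked range scan (the cursor API names here are
  approximate; see ft/cursor.h for the real interface):

      toku_ft_cursor_set_range_lock(cursor, &left_key, &right_key,
                                    false /*left is not -infty*/,
                                    false /*right is not +infty*/,
                                    0 /*out-of-range error code*/);
      // nodes covering [left_key, right_key] will now be prefetched aggressively
      r = toku_ft_cursor_set_range(cursor, &left_key, &right_key, getf, getf_v);
      while (r == 0)
          r = toku_ft_cursor_next(cursor, getf, getf_v);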
147 | |
148 | */ |
149 | |
150 | #include <my_global.h> |
151 | #include "ft/cachetable/checkpoint.h" |
152 | #include "ft/cursor.h" |
153 | #include "ft/ft-cachetable-wrappers.h" |
154 | #include "ft/ft-flusher.h" |
155 | #include "ft/ft-internal.h" |
156 | #include "ft/ft.h" |
157 | #include "ft/leafentry.h" |
158 | #include "ft/logger/log-internal.h" |
159 | #include "ft/msg.h" |
160 | #include "ft/node.h" |
161 | #include "ft/serialize/block_table.h" |
162 | #include "ft/serialize/ft-serialize.h" |
163 | #include "ft/serialize/ft_layout_version.h" |
164 | #include "ft/serialize/ft_node-serialize.h" |
165 | #include "ft/serialize/sub_block.h" |
166 | #include "ft/txn/txn_manager.h" |
167 | #include "ft/txn/xids.h" |
168 | #include "ft/ule.h" |
169 | #include "src/ydb-internal.h" |
170 | |
171 | #include <toku_race_tools.h> |
172 | |
173 | #include <portability/toku_atomic.h> |
174 | |
175 | #include <util/context.h> |
176 | #include <util/mempool.h> |
177 | #include <util/status.h> |
178 | #include <util/rwlock.h> |
179 | #include <util/sort.h> |
180 | #include <util/scoped_malloc.h> |
181 | |
182 | #include <stdint.h> |
183 | |
184 | #include <memory> |
185 | /* Status is intended for display to humans to help understand system behavior. |
186 | * It does not need to be perfectly thread-safe. |
187 | */ |
188 | |
189 | static toku_mutex_t ft_open_close_lock; |
190 | static toku_instr_key *ft_open_close_lock_mutex_key; |
191 | // FIXME: the instrumentation keys below are defined here even though they |
192 | // belong to other modules, because they are registered here. If desired, they |
193 | // can be moved to their proper modules and registration done there in a |
194 | // one-time init function |
195 | // locktree |
196 | toku_instr_key *treenode_mutex_key; |
197 | toku_instr_key *manager_mutex_key; |
198 | toku_instr_key *manager_escalation_mutex_key; |
199 | toku_instr_key *manager_escalator_mutex_key; |
200 | // src |
201 | toku_instr_key *db_txn_struct_i_txn_mutex_key; |
202 | toku_instr_key *indexer_i_indexer_lock_mutex_key; |
203 | toku_instr_key *indexer_i_indexer_estimate_lock_mutex_key; |
204 | toku_instr_key *result_i_open_dbs_rwlock_key; |
205 | // locktree |
206 | toku_instr_key *lock_request_m_wait_cond_key; |
207 | toku_instr_key *manager_m_escalator_done_key; |
208 | toku_instr_key *locktree_request_info_mutex_key; |
209 | toku_instr_key *locktree_request_info_retry_mutex_key; |
210 | toku_instr_key *locktree_request_info_retry_cv_key; |
211 | |
212 | // this is a sample probe for custom instrumentation |
213 | static toku_instr_key *fti_probe_1_key; |
214 | |
215 | // This is a sample probe for custom instrumentation |
216 | toku_instr_probe *toku_instr_probe_1; |
217 | |
218 | void toku_ft_get_status(FT_STATUS s) { |
219 | ft_status.init(); |
220 | *s = ft_status; |
221 | |
222 | // Calculate compression ratios for leaf and nonleaf nodes |
223 | const double compressed_leaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_BYTES) + |
224 | FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_BYTES_FOR_CHECKPOINT); |
225 | const double uncompressed_leaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES) + |
226 | FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT); |
227 | const double compressed_nonleaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_BYTES) + |
228 | FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_BYTES_FOR_CHECKPOINT); |
229 | const double uncompressed_nonleaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES) + |
230 | FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT); |
231 | |
232 | if (compressed_leaf_bytes > 0) { |
233 | s->status[FT_STATUS_S::FT_DISK_FLUSH_LEAF_COMPRESSION_RATIO].value.dnum |
234 | = uncompressed_leaf_bytes / compressed_leaf_bytes; |
235 | } |
236 | if (compressed_nonleaf_bytes > 0) { |
237 | s->status[FT_STATUS_S::FT_DISK_FLUSH_NONLEAF_COMPRESSION_RATIO].value.dnum |
238 | = uncompressed_nonleaf_bytes / compressed_nonleaf_bytes; |
239 | } |
240 | if (compressed_leaf_bytes > 0 || compressed_nonleaf_bytes > 0) { |
241 | s->status[FT_STATUS_S::FT_DISK_FLUSH_OVERALL_COMPRESSION_RATIO].value.dnum |
242 | = (uncompressed_leaf_bytes + uncompressed_nonleaf_bytes) / |
243 | (compressed_leaf_bytes + compressed_nonleaf_bytes); |
244 | } |
245 | } |
246 | |
247 | void toku_note_deserialized_basement_node(bool fixed_key_size) { |
248 | if (fixed_key_size) { |
249 | FT_STATUS_INC(FT_BASEMENT_DESERIALIZE_FIXED_KEYSIZE, 1); |
250 | } else { |
251 | FT_STATUS_INC(FT_BASEMENT_DESERIALIZE_VARIABLE_KEYSIZE, 1); |
252 | } |
253 | } |
254 | |
255 | static void ft_verify_flags(FT UU(ft), FTNODE UU(node)) { |
256 | paranoid_invariant(ft->h->flags == node->flags); |
257 | } |
258 | |
259 | int toku_ft_debug_mode = 0; |
260 | |
261 | uint32_t compute_child_fullhash (CACHEFILE cf, FTNODE node, int childnum) { |
262 | paranoid_invariant(node->height>0); |
263 | paranoid_invariant(childnum<node->n_children); |
264 | return toku_cachetable_hash(cf, BP_BLOCKNUM(node, childnum)); |
265 | } |
266 | |
267 | // |
268 | // pivot bounds |
269 | // TODO: move me to ft/node.cc? |
270 | // |
271 | |
272 | pivot_bounds::pivot_bounds(const DBT &lbe_dbt, const DBT &ubi_dbt) : |
273 | _lower_bound_exclusive(lbe_dbt), _upper_bound_inclusive(ubi_dbt) { |
274 | } |
275 | |
276 | pivot_bounds pivot_bounds::infinite_bounds() { |
277 | DBT dbt; |
278 | toku_init_dbt(&dbt); |
279 | |
280 | // infinity is represented by an empty dbt |
281 | invariant(toku_dbt_is_empty(&dbt)); |
282 | return pivot_bounds(dbt, dbt); |
283 | } |
284 | |
285 | const DBT *pivot_bounds::lbe() const { |
286 | return &_lower_bound_exclusive; |
287 | } |
288 | |
289 | const DBT *pivot_bounds::ubi() const { |
290 | return &_upper_bound_inclusive; |
291 | } |
292 | |
293 | DBT pivot_bounds::_prepivotkey(FTNODE node, int childnum, const DBT &lbe_dbt) const { |
294 | if (childnum == 0) { |
295 | return lbe_dbt; |
296 | } else { |
297 | return node->pivotkeys.get_pivot(childnum - 1); |
298 | } |
299 | } |
300 | |
301 | DBT pivot_bounds::_postpivotkey(FTNODE node, int childnum, const DBT &ubi_dbt) const { |
302 | if (childnum + 1 == node->n_children) { |
303 | return ubi_dbt; |
304 | } else { |
305 | return node->pivotkeys.get_pivot(childnum); |
306 | } |
307 | } |
308 | |
309 | pivot_bounds pivot_bounds::next_bounds(FTNODE node, int childnum) const { |
310 | return pivot_bounds(_prepivotkey(node, childnum, _lower_bound_exclusive), |
311 | _postpivotkey(node, childnum, _upper_bound_inclusive)); |
312 | } |
313 | |
314 | //////////////////////////////////////////////////////////////////////////////// |
315 | |
316 | static long get_avail_internal_node_partition_size(FTNODE node, int i) { |
317 | paranoid_invariant(node->height > 0); |
318 | return toku_bnc_memory_size(BNC(node, i)); |
319 | } |
320 | |
321 | static long ftnode_cachepressure_size(FTNODE node) { |
322 | long retval = 0; |
323 | bool totally_empty = true; |
324 | if (node->height == 0) { |
325 | goto exit; |
326 | } |
327 | else { |
328 | for (int i = 0; i < node->n_children; i++) { |
329 | if (BP_STATE(node,i) == PT_INVALID || BP_STATE(node,i) == PT_ON_DISK) { |
330 | continue; |
331 | } |
332 | else if (BP_STATE(node,i) == PT_COMPRESSED) { |
333 | SUB_BLOCK sb = BSB(node, i); |
334 | totally_empty = false; |
335 | retval += sb->compressed_size; |
336 | } |
337 | else if (BP_STATE(node,i) == PT_AVAIL) { |
338 | totally_empty = totally_empty && (toku_bnc_n_entries(BNC(node, i)) == 0); |
339 | retval += get_avail_internal_node_partition_size(node, i); |
340 | retval += BP_WORKDONE(node, i); |
341 | } |
342 | else { |
343 | abort(); |
344 | } |
345 | } |
346 | } |
347 | exit: |
348 | if (totally_empty) { |
349 | return 0; |
350 | } |
351 | return retval; |
352 | } |
353 | |
354 | static long |
355 | ftnode_memory_size (FTNODE node) |
356 | // Effect: Estimate how much main memory a node requires. |
357 | { |
358 | long retval = 0; |
359 | int n_children = node->n_children; |
360 | retval += sizeof(*node); |
361 | retval += (n_children)*(sizeof(node->bp[0])); |
362 | retval += node->pivotkeys.total_size(); |
363 | |
364 | // now calculate the sizes of the partitions |
365 | for (int i = 0; i < n_children; i++) { |
366 | if (BP_STATE(node,i) == PT_INVALID || BP_STATE(node,i) == PT_ON_DISK) { |
367 | continue; |
368 | } |
369 | else if (BP_STATE(node,i) == PT_COMPRESSED) { |
370 | SUB_BLOCK sb = BSB(node, i); |
371 | retval += sizeof(*sb); |
372 | retval += sb->compressed_size; |
373 | } |
374 | else if (BP_STATE(node,i) == PT_AVAIL) { |
375 | if (node->height > 0) { |
376 | retval += get_avail_internal_node_partition_size(node, i); |
377 | } |
378 | else { |
379 | BASEMENTNODE bn = BLB(node, i); |
380 | retval += sizeof(*bn); |
381 | retval += BLB_DATA(node, i)->get_memory_size(); |
382 | } |
383 | } |
384 | else { |
385 | abort(); |
386 | } |
387 | } |
388 | return retval; |
389 | } |
390 | |
391 | PAIR_ATTR make_ftnode_pair_attr(FTNODE node) { |
392 | long size = ftnode_memory_size(node); |
393 | long cachepressure_size = ftnode_cachepressure_size(node); |
394 | PAIR_ATTR result={ |
395 | .size = size, |
396 | .nonleaf_size = (node->height > 0) ? size : 0, |
397 | .leaf_size = (node->height > 0) ? 0 : size, |
398 | .rollback_size = 0, |
399 | .cache_pressure_size = cachepressure_size, |
400 | .is_valid = true |
401 | }; |
402 | return result; |
403 | } |
404 | |
405 | PAIR_ATTR make_invalid_pair_attr(void) { |
406 | PAIR_ATTR result={ |
407 | .size = 0, |
408 | .nonleaf_size = 0, |
409 | .leaf_size = 0, |
410 | .rollback_size = 0, |
411 | .cache_pressure_size = 0, |
412 | .is_valid = false |
413 | }; |
414 | return result; |
415 | } |
416 | |
417 | |
418 | // assign unique dictionary id |
419 | static uint64_t dict_id_serial = 1; |
420 | static DICTIONARY_ID |
421 | next_dict_id(void) { |
422 | uint64_t i = toku_sync_fetch_and_add(&dict_id_serial, 1); |
423 | assert(i); // guarantee unique dictionary id by asserting 64-bit counter never wraps |
424 | DICTIONARY_ID d = {.dictid = i}; |
425 | return d; |
426 | } |
427 | |
428 | // TODO: This isn't so pretty |
void ftnode_fetch_extra::_create_internal(FT ft_) {
430 | ft = ft_; |
431 | type = ftnode_fetch_none; |
432 | search = nullptr; |
433 | |
434 | toku_init_dbt(&range_lock_left_key); |
435 | toku_init_dbt(&range_lock_right_key); |
436 | left_is_neg_infty = false; |
437 | right_is_pos_infty = false; |
438 | |
439 | // -1 means 'unknown', which is the correct default state |
440 | child_to_read = -1; |
441 | disable_prefetching = false; |
442 | read_all_partitions = false; |
443 | |
444 | bytes_read = 0; |
445 | io_time = 0; |
446 | deserialize_time = 0; |
447 | decompress_time = 0; |
448 | } |
449 | |
void ftnode_fetch_extra::create_for_full_read(FT ft_) {
451 | _create_internal(ft_); |
452 | |
453 | type = ftnode_fetch_all; |
454 | } |
455 | |
void ftnode_fetch_extra::create_for_keymatch(FT ft_, const DBT *left, const DBT *right,
                                             bool disable_prefetching_, bool read_all_partitions_) {
458 | _create_internal(ft_); |
459 | invariant(ft->h->type == FT_CURRENT); |
460 | |
461 | type = ftnode_fetch_keymatch; |
462 | if (left != nullptr) { |
463 | toku_copyref_dbt(&range_lock_left_key, *left); |
464 | } |
465 | if (right != nullptr) { |
466 | toku_copyref_dbt(&range_lock_right_key, *right); |
467 | } |
468 | left_is_neg_infty = left == nullptr; |
469 | right_is_pos_infty = right == nullptr; |
470 | disable_prefetching = disable_prefetching_; |
471 | read_all_partitions = read_all_partitions_; |
472 | } |
473 | |
void ftnode_fetch_extra::create_for_subset_read(FT ft_, ft_search *search_,
                                                const DBT *left, const DBT *right,
                                                bool left_is_neg_infty_, bool right_is_pos_infty_,
                                                bool disable_prefetching_, bool read_all_partitions_) {
478 | _create_internal(ft_); |
479 | invariant(ft->h->type == FT_CURRENT); |
480 | |
481 | type = ftnode_fetch_subset; |
482 | search = search_; |
483 | if (left != nullptr) { |
484 | toku_copyref_dbt(&range_lock_left_key, *left); |
485 | } |
486 | if (right != nullptr) { |
487 | toku_copyref_dbt(&range_lock_right_key, *right); |
488 | } |
489 | left_is_neg_infty = left_is_neg_infty_; |
490 | right_is_pos_infty = right_is_pos_infty_; |
491 | disable_prefetching = disable_prefetching_; |
492 | read_all_partitions = read_all_partitions_; |
493 | } |
494 | |
void ftnode_fetch_extra::create_for_min_read(FT ft_) {
496 | _create_internal(ft_); |
497 | invariant(ft->h->type == FT_CURRENT); |
498 | |
499 | type = ftnode_fetch_none; |
500 | } |
501 | |
void ftnode_fetch_extra::create_for_prefetch(FT ft_, struct ft_cursor *cursor) {
503 | _create_internal(ft_); |
504 | invariant(ft->h->type == FT_CURRENT); |
505 | |
506 | type = ftnode_fetch_prefetch; |
507 | const DBT *left = &cursor->range_lock_left_key; |
508 | if (left->data) { |
509 | toku_clone_dbt(&range_lock_left_key, *left); |
510 | } |
511 | const DBT *right = &cursor->range_lock_right_key; |
512 | if (right->data) { |
513 | toku_clone_dbt(&range_lock_right_key, *right); |
514 | } |
515 | left_is_neg_infty = cursor->left_is_neg_infty; |
516 | right_is_pos_infty = cursor->right_is_pos_infty; |
517 | disable_prefetching = cursor->disable_prefetching; |
518 | } |
519 | |
void ftnode_fetch_extra::destroy(void) {
521 | toku_destroy_dbt(&range_lock_left_key); |
522 | toku_destroy_dbt(&range_lock_right_key); |
523 | } |
524 | |
525 | // Requires: child_to_read to have been set |
bool ftnode_fetch_extra::wants_child_available(int childnum) const {
527 | return type == ftnode_fetch_all || |
528 | (child_to_read == childnum && |
529 | (type == ftnode_fetch_subset || type == ftnode_fetch_keymatch)); |
530 | } |
531 | |
int ftnode_fetch_extra::leftmost_child_wanted(FTNODE node) const {
533 | paranoid_invariant(type == ftnode_fetch_subset || |
534 | type == ftnode_fetch_prefetch || |
535 | type == ftnode_fetch_keymatch); |
536 | if (left_is_neg_infty) { |
537 | return 0; |
538 | } else if (range_lock_left_key.data == nullptr) { |
539 | return -1; |
540 | } else { |
541 | return toku_ftnode_which_child(node, &range_lock_left_key, ft->cmp); |
542 | } |
543 | } |
544 | |
int ftnode_fetch_extra::rightmost_child_wanted(FTNODE node) const {
546 | paranoid_invariant(type == ftnode_fetch_subset || |
547 | type == ftnode_fetch_prefetch || |
548 | type == ftnode_fetch_keymatch); |
549 | if (right_is_pos_infty) { |
550 | return node->n_children - 1; |
551 | } else if (range_lock_right_key.data == nullptr) { |
552 | return -1; |
553 | } else { |
554 | return toku_ftnode_which_child(node, &range_lock_right_key, ft->cmp); |
555 | } |
556 | } |
557 | |
558 | static int |
559 | ft_cursor_rightmost_child_wanted(FT_CURSOR cursor, FT_HANDLE ft_handle, FTNODE node) |
560 | { |
561 | if (cursor->right_is_pos_infty) { |
562 | return node->n_children - 1; |
563 | } else if (cursor->range_lock_right_key.data == nullptr) { |
564 | return -1; |
565 | } else { |
566 | return toku_ftnode_which_child(node, &cursor->range_lock_right_key, ft_handle->ft->cmp); |
567 | } |
568 | } |
569 | |
570 | STAT64INFO_S |
571 | toku_get_and_clear_basement_stats(FTNODE leafnode) { |
572 | invariant(leafnode->height == 0); |
573 | STAT64INFO_S deltas = ZEROSTATS; |
574 | for (int i = 0; i < leafnode->n_children; i++) { |
575 | BASEMENTNODE bn = BLB(leafnode, i); |
576 | invariant(BP_STATE(leafnode,i) == PT_AVAIL); |
577 | deltas.numrows += bn->stat64_delta.numrows; |
578 | deltas.numbytes += bn->stat64_delta.numbytes; |
579 | bn->stat64_delta = ZEROSTATS; |
580 | } |
581 | return deltas; |
582 | } |
583 | |
584 | void toku_ft_status_update_flush_reason(FTNODE node, |
585 | uint64_t uncompressed_bytes_flushed, uint64_t bytes_written, |
586 | tokutime_t write_time, bool for_checkpoint) { |
587 | if (node->height == 0) { |
588 | if (for_checkpoint) { |
589 | FT_STATUS_INC(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT, 1); |
590 | FT_STATUS_INC(FT_DISK_FLUSH_LEAF_BYTES_FOR_CHECKPOINT, bytes_written); |
591 | FT_STATUS_INC(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, uncompressed_bytes_flushed); |
592 | FT_STATUS_INC(FT_DISK_FLUSH_LEAF_TOKUTIME_FOR_CHECKPOINT, write_time); |
593 | } |
594 | else { |
595 | FT_STATUS_INC(FT_DISK_FLUSH_LEAF, 1); |
596 | FT_STATUS_INC(FT_DISK_FLUSH_LEAF_BYTES, bytes_written); |
597 | FT_STATUS_INC(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES, uncompressed_bytes_flushed); |
598 | FT_STATUS_INC(FT_DISK_FLUSH_LEAF_TOKUTIME, write_time); |
599 | } |
600 | } |
601 | else { |
602 | if (for_checkpoint) { |
603 | FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT, 1); |
604 | FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_BYTES_FOR_CHECKPOINT, bytes_written); |
605 | FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, uncompressed_bytes_flushed); |
606 | FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_TOKUTIME_FOR_CHECKPOINT, write_time); |
607 | } |
608 | else { |
609 | FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF, 1); |
610 | FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_BYTES, bytes_written); |
611 | FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES, uncompressed_bytes_flushed); |
612 | FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_TOKUTIME, write_time); |
613 | } |
614 | } |
615 | } |
616 | |
617 | void toku_ftnode_checkpoint_complete_callback(void *value_data) { |
618 | FTNODE node = static_cast<FTNODE>(value_data); |
619 | if (node->height > 0) { |
620 | for (int i = 0; i < node->n_children; ++i) { |
621 | if (BP_STATE(node, i) == PT_AVAIL) { |
622 | NONLEAF_CHILDINFO bnc = BNC(node, i); |
623 | bnc->flow[1] = bnc->flow[0]; |
624 | bnc->flow[0] = 0; |
625 | } |
626 | } |
627 | } |
628 | } |
629 | |
630 | void toku_ftnode_clone_callback(void *value_data, |
631 | void **cloned_value_data, |
632 | long *clone_size, |
633 | PAIR_ATTR *new_attr, |
634 | bool for_checkpoint, |
                                void *write_extraargs) {
636 | FTNODE node = static_cast<FTNODE>(value_data); |
637 | toku_ftnode_assert_fully_in_memory(node); |
638 | FT ft = static_cast<FT>(write_extraargs); |
639 | FTNODE XCALLOC(cloned_node); |
640 | if (node->height == 0) { |
641 | // set header stats, must be done before rebalancing |
642 | toku_ftnode_update_disk_stats(node, ft, for_checkpoint); |
643 | // rebalance the leaf node |
644 | toku_ftnode_leaf_rebalance(node, ft->h->basementnodesize); |
645 | } |
646 | |
647 | cloned_node->oldest_referenced_xid_known = |
648 | node->oldest_referenced_xid_known; |
649 | cloned_node->max_msn_applied_to_node_on_disk = |
650 | node->max_msn_applied_to_node_on_disk; |
651 | cloned_node->flags = node->flags; |
652 | cloned_node->blocknum = node->blocknum; |
653 | cloned_node->layout_version = node->layout_version; |
654 | cloned_node->layout_version_original = node->layout_version_original; |
655 | cloned_node->layout_version_read_from_disk = |
656 | node->layout_version_read_from_disk; |
657 | cloned_node->build_id = node->build_id; |
658 | cloned_node->height = node->height; |
659 | cloned_node->dirty = node->dirty; |
660 | cloned_node->fullhash = node->fullhash; |
661 | cloned_node->n_children = node->n_children; |
662 | |
663 | XMALLOC_N(node->n_children, cloned_node->bp); |
664 | // clone pivots |
665 | cloned_node->pivotkeys.create_from_pivot_keys(node->pivotkeys); |
666 | if (node->height > 0) { |
667 | // need to move messages here so that we don't serialize stale |
668 | // messages to the fresh tree - ft verify code complains otherwise. |
669 | toku_move_ftnode_messages_to_stale(ft, node); |
670 | } |
671 | // clone partition |
672 | toku_ftnode_clone_partitions(node, cloned_node); |
673 | |
674 | // clear dirty bit |
675 | node->dirty = 0; |
676 | cloned_node->dirty = 0; |
677 | node->layout_version_read_from_disk = FT_LAYOUT_VERSION; |
678 | // set new pair attr if necessary |
679 | if (node->height == 0) { |
680 | *new_attr = make_ftnode_pair_attr(node); |
681 | for (int i = 0; i < node->n_children; i++) { |
682 | if (BP_STATE(node, i) == PT_AVAIL) { |
683 | BLB_LRD(node, i) = 0; |
684 | BLB_LRD(cloned_node, i) = 0; |
685 | } |
686 | } |
687 | } else { |
688 | new_attr->is_valid = false; |
689 | } |
690 | *clone_size = ftnode_memory_size(cloned_node); |
691 | *cloned_value_data = cloned_node; |
692 | } |
693 | |
694 | void toku_ftnode_flush_callback(CACHEFILE UU(cachefile), |
695 | int fd, |
696 | BLOCKNUM blocknum, |
697 | void *ftnode_v, |
698 | void **disk_data, |
                                void *extraargs,
700 | PAIR_ATTR size __attribute__((unused)), |
701 | PAIR_ATTR *new_size, |
702 | bool write_me, |
703 | bool keep_me, |
704 | bool for_checkpoint, |
705 | bool is_clone) { |
706 | FT ft = (FT)extraargs; |
707 | FTNODE ftnode = (FTNODE)ftnode_v; |
708 | FTNODE_DISK_DATA *ndd = (FTNODE_DISK_DATA *)disk_data; |
709 | assert(ftnode->blocknum.b == blocknum.b); |
710 | int height = ftnode->height; |
711 | if (write_me) { |
712 | toku_ftnode_assert_fully_in_memory(ftnode); |
713 | if (height > 0 && !is_clone) { |
714 | // cloned nodes already had their stale messages moved, see |
715 | // toku_ftnode_clone_callback() |
716 | toku_move_ftnode_messages_to_stale(ft, ftnode); |
717 | } else if (height == 0) { |
718 | toku_ftnode_leaf_run_gc(ft, ftnode); |
719 | if (!is_clone) { |
720 | toku_ftnode_update_disk_stats(ftnode, ft, for_checkpoint); |
721 | } |
722 | } |
723 | int r = toku_serialize_ftnode_to( |
724 | fd, ftnode->blocknum, ftnode, ndd, !is_clone, ft, for_checkpoint); |
725 | assert_zero(r); |
726 | ftnode->layout_version_read_from_disk = FT_LAYOUT_VERSION; |
727 | } |
728 | if (!keep_me) { |
729 | if (!is_clone) { |
730 | long node_size = ftnode_memory_size(ftnode); |
731 | if (ftnode->height == 0) { |
732 | FT_STATUS_INC(FT_FULL_EVICTIONS_LEAF, 1); |
733 | FT_STATUS_INC(FT_FULL_EVICTIONS_LEAF_BYTES, node_size); |
734 | |
                // A leaf node (height == 0) is being evicted (!keep_me) and is
                // not a checkpoint clone (!is_clone). This leaf node may have
                // had messages applied to satisfy a query, but was never
                // actually dirtied (!ftnode->dirty && !write_me). Note that
                // (write_me) would have persisted the node and cleared the
                // dirty flag. This message application may have updated the
                // tree's logical row count. Since these message applications
                // are not persisted, we need to undo the logical row count
                // adjustments as they may occur again in the future if/when
                // the node is re-read from disk for another query or change.
745 | if (!ftnode->dirty && !write_me) { |
746 | int64_t lrc_delta = 0; |
747 | for (int i = 0; i < ftnode->n_children; i++) { |
748 | if (BP_STATE(ftnode, i) == PT_AVAIL) { |
749 | lrc_delta -= BLB_LRD(ftnode, i); |
750 | BLB_LRD(ftnode, i) = 0; |
751 | } |
752 | } |
753 | toku_ft_adjust_logical_row_count(ft, lrc_delta); |
754 | } |
755 | } else { |
756 | FT_STATUS_INC(FT_FULL_EVICTIONS_NONLEAF, 1); |
757 | FT_STATUS_INC(FT_FULL_EVICTIONS_NONLEAF_BYTES, node_size); |
758 | } |
759 | toku_free(*disk_data); |
760 | } else { |
761 | if (ftnode->height == 0) { |
762 | // No need to adjust logical row counts when flushing a clone |
763 | // as they should have been zeroed out anyway when cloned. |
764 | // Clones are 'copies' of work already done so doing it again |
765 | // (adjusting row counts) would be redundant and leads to |
766 | // inaccurate counts. |
767 | for (int i = 0; i < ftnode->n_children; i++) { |
768 | if (BP_STATE(ftnode, i) == PT_AVAIL) { |
769 | BASEMENTNODE bn = BLB(ftnode, i); |
770 | toku_ft_decrease_stats(&ft->in_memory_stats, |
771 | bn->stat64_delta); |
772 | } |
773 | } |
774 | } |
775 | } |
776 | toku_ftnode_free(&ftnode); |
777 | } else { |
778 | *new_size = make_ftnode_pair_attr(ftnode); |
779 | } |
780 | } |
781 | |
782 | void |
toku_ft_status_update_pivot_fetch_reason(ftnode_fetch_extra *bfe)
784 | { |
785 | if (bfe->type == ftnode_fetch_prefetch) { |
786 | FT_STATUS_INC(FT_NUM_PIVOTS_FETCHED_PREFETCH, 1); |
787 | FT_STATUS_INC(FT_BYTES_PIVOTS_FETCHED_PREFETCH, bfe->bytes_read); |
788 | FT_STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_PREFETCH, bfe->io_time); |
789 | } else if (bfe->type == ftnode_fetch_all) { |
790 | FT_STATUS_INC(FT_NUM_PIVOTS_FETCHED_WRITE, 1); |
791 | FT_STATUS_INC(FT_BYTES_PIVOTS_FETCHED_WRITE, bfe->bytes_read); |
792 | FT_STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_WRITE, bfe->io_time); |
793 | } else if (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_keymatch) { |
794 | FT_STATUS_INC(FT_NUM_PIVOTS_FETCHED_QUERY, 1); |
795 | FT_STATUS_INC(FT_BYTES_PIVOTS_FETCHED_QUERY, bfe->bytes_read); |
796 | FT_STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_QUERY, bfe->io_time); |
797 | } |
798 | } |
799 | |
800 | int toku_ftnode_fetch_callback(CACHEFILE UU(cachefile), |
801 | PAIR p, |
802 | int fd, |
803 | BLOCKNUM blocknum, |
804 | uint32_t fullhash, |
805 | void **ftnode_pv, |
806 | void **disk_data, |
807 | PAIR_ATTR *sizep, |
808 | int *dirtyp, |
                               void *extraargs) {
810 | assert(extraargs); |
811 | assert(*ftnode_pv == nullptr); |
812 | FTNODE_DISK_DATA *ndd = (FTNODE_DISK_DATA *)disk_data; |
813 | ftnode_fetch_extra *bfe = (ftnode_fetch_extra *)extraargs; |
814 | FTNODE *node = (FTNODE *)ftnode_pv; |
815 | // deserialize the node, must pass the bfe in because we cannot |
    // evaluate what piece of the node is necessary until we get it at
817 | // least partially into memory |
818 | int r = |
819 | toku_deserialize_ftnode_from(fd, blocknum, fullhash, node, ndd, bfe); |
820 | if (r != 0) { |
821 | if (r == TOKUDB_BAD_CHECKSUM) { |
822 | fprintf( |
823 | stderr, |
824 | "%s:%d:toku_ftnode_fetch_callback - " |
825 | "file[%s], blocknum[%ld], toku_deserialize_ftnode_from " |
826 | "failed with a checksum error.\n" , |
827 | __FILE__, |
828 | __LINE__, |
829 | toku_cachefile_fname_in_env(cachefile), |
830 | blocknum.b); |
831 | } else { |
832 | fprintf( |
833 | stderr, |
834 | "%s:%d:toku_ftnode_fetch_callback - " |
835 | "file[%s], blocknum[%ld], toku_deserialize_ftnode_from " |
836 | "failed with %d.\n" , |
837 | __FILE__, |
838 | __LINE__, |
839 | toku_cachefile_fname_in_env(cachefile), |
840 | blocknum.b, |
841 | r); |
842 | } |
843 | // make absolutely sure we crash before doing anything else. |
844 | abort(); |
845 | } |
846 | |
847 | if (r == 0) { |
848 | *sizep = make_ftnode_pair_attr(*node); |
849 | (*node)->ct_pair = p; |
850 | *dirtyp = (*node)->dirty; // deserialize could mark the node as dirty |
851 | // (presumably for upgrade) |
852 | } |
853 | return r; |
854 | } |
855 | |
856 | static bool ft_compress_buffers_before_eviction = true; |
857 | |
858 | void toku_ft_set_compress_buffers_before_eviction(bool compress_buffers) { |
859 | ft_compress_buffers_before_eviction = compress_buffers; |
860 | } |
861 | |
862 | void toku_ftnode_pe_est_callback( |
863 | void* ftnode_pv, |
864 | void* disk_data, |
865 | long* bytes_freed_estimate, |
866 | enum partial_eviction_cost *cost, |
    void* UU(write_extraargs)
868 | ) |
869 | { |
870 | paranoid_invariant(ftnode_pv != NULL); |
871 | long bytes_to_free = 0; |
872 | FTNODE node = static_cast<FTNODE>(ftnode_pv); |
873 | if (node->dirty || node->height == 0 || |
874 | node->layout_version_read_from_disk < FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) { |
875 | *bytes_freed_estimate = 0; |
876 | *cost = PE_CHEAP; |
877 | goto exit; |
878 | } |
879 | |
880 | // |
881 | // we are dealing with a clean internal node |
882 | // |
883 | *cost = PE_EXPENSIVE; |
    // now let's get an estimate for how much data we can free up
885 | // we estimate the compressed size of data to be how large |
886 | // the compressed data is on disk |
887 | for (int i = 0; i < node->n_children; i++) { |
888 | if (BP_STATE(node,i) == PT_AVAIL && BP_SHOULD_EVICT(node,i)) { |
889 | // calculate how much data would be freed if |
890 | // we compress this node and add it to |
891 | // bytes_to_free |
892 | |
893 | if (ft_compress_buffers_before_eviction) { |
894 | // first get an estimate for how much space will be taken |
895 | // after compression, it is simply the size of compressed |
896 | // data on disk plus the size of the struct that holds it |
897 | FTNODE_DISK_DATA ndd = (FTNODE_DISK_DATA) disk_data; |
898 | uint32_t compressed_data_size = BP_SIZE(ndd, i); |
899 | compressed_data_size += sizeof(struct sub_block); |
900 | |
901 | // now get the space taken now |
902 | uint32_t decompressed_data_size = get_avail_internal_node_partition_size(node,i); |
903 | bytes_to_free += (decompressed_data_size - compressed_data_size); |
904 | } else { |
905 | bytes_to_free += get_avail_internal_node_partition_size(node, i); |
906 | } |
907 | } |
908 | } |
909 | |
910 | *bytes_freed_estimate = bytes_to_free; |
911 | exit: |
912 | return; |
913 | } |
914 | |
915 | // replace the child buffer with a compressed version of itself. |
916 | static void compress_internal_node_partition(FTNODE node, int i, enum toku_compression_method compression_method) { |
917 | // if we should evict, compress the |
918 | // message buffer into a sub_block |
919 | assert(BP_STATE(node, i) == PT_AVAIL); |
920 | assert(node->height > 0); |
921 | SUB_BLOCK XMALLOC(sb); |
922 | sub_block_init(sb); |
923 | toku_create_compressed_partition_from_available(node, i, compression_method, sb); |
924 | |
925 | // now set the state to compressed |
926 | set_BSB(node, i, sb); |
927 | BP_STATE(node,i) = PT_COMPRESSED; |
928 | } |
929 | |
930 | // callback for partially evicting a node |
931 | int toku_ftnode_pe_callback(void *ftnode_pv, |
932 | PAIR_ATTR old_attr, |
                            void *write_extraargs,
                            void (*finalize)(PAIR_ATTR new_attr, void *extra),
                            void *finalize_extra) {
936 | FTNODE node = (FTNODE)ftnode_pv; |
937 | FT ft = (FT)write_extraargs; |
938 | int num_partial_evictions = 0; |
939 | |
940 | // Hold things we intend to destroy here. |
941 | // They will be taken care of after finalize(). |
942 | int num_basements_to_destroy = 0; |
943 | int num_buffers_to_destroy = 0; |
944 | int num_pointers_to_free = 0; |
945 | BASEMENTNODE basements_to_destroy[node->n_children]; |
946 | NONLEAF_CHILDINFO buffers_to_destroy[node->n_children]; |
947 | void *pointers_to_free[node->n_children * 2]; |
948 | |
949 | // Don't partially evict dirty nodes |
950 | if (node->dirty) { |
951 | goto exit; |
952 | } |
953 | // Don't partially evict nodes whose partitions can't be read back |
954 | // from disk individually |
955 | if (node->layout_version_read_from_disk < |
956 | FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) { |
957 | goto exit; |
958 | } |
959 | // |
960 | // partial eviction for nonleaf nodes |
961 | // |
962 | if (node->height > 0) { |
963 | for (int i = 0; i < node->n_children; i++) { |
964 | if (BP_STATE(node, i) == PT_AVAIL) { |
965 | if (BP_SHOULD_EVICT(node, i)) { |
966 | NONLEAF_CHILDINFO bnc = BNC(node, i); |
967 | if (ft_compress_buffers_before_eviction && |
968 | // We may not serialize and compress a partition in |
969 | // memory if its in memory layout version is different |
970 | // than what's on disk (and therefore requires upgrade). |
971 | // |
972 | // Auto-upgrade code assumes that if a node's layout |
973 | // version read from disk is not current, it MUST |
974 | // require upgrade. |
975 | // Breaking this rule would cause upgrade code to |
976 | // upgrade this partition again after we serialize it as |
977 | // the current version, which is bad. |
978 | node->layout_version == |
979 | node->layout_version_read_from_disk) { |
980 | toku_ft_bnc_move_messages_to_stale(ft, bnc); |
981 | compress_internal_node_partition( |
982 | node, |
983 | i, |
984 | // Always compress with quicklz |
985 | TOKU_QUICKLZ_METHOD); |
986 | } else { |
987 | // We're not compressing buffers before eviction. Simply |
988 | // detach the buffer and set the child's state to |
989 | // on-disk. |
990 | set_BNULL(node, i); |
991 | BP_STATE(node, i) = PT_ON_DISK; |
992 | } |
993 | buffers_to_destroy[num_buffers_to_destroy++] = bnc; |
994 | num_partial_evictions++; |
995 | } else { |
996 | BP_SWEEP_CLOCK(node, i); |
997 | } |
998 | } else { |
999 | continue; |
1000 | } |
1001 | } |
1002 | } else { |
1003 | // |
1004 | // partial eviction strategy for basement nodes: |
1005 | // if the bn is compressed, evict it |
1006 | // else: check if it requires eviction, if it does, evict it, if not, |
1007 | // sweep the clock count |
1008 | // |
1009 | for (int i = 0; i < node->n_children; i++) { |
1010 | // Get rid of compressed stuff no matter what. |
1011 | if (BP_STATE(node, i) == PT_COMPRESSED) { |
1012 | SUB_BLOCK sb = BSB(node, i); |
1013 | pointers_to_free[num_pointers_to_free++] = sb->compressed_ptr; |
1014 | pointers_to_free[num_pointers_to_free++] = sb; |
1015 | set_BNULL(node, i); |
1016 | BP_STATE(node, i) = PT_ON_DISK; |
1017 | num_partial_evictions++; |
1018 | } else if (BP_STATE(node, i) == PT_AVAIL) { |
1019 | if (BP_SHOULD_EVICT(node, i)) { |
1020 | BASEMENTNODE bn = BLB(node, i); |
1021 | basements_to_destroy[num_basements_to_destroy++] = bn; |
1022 | toku_ft_decrease_stats(&ft->in_memory_stats, |
1023 | bn->stat64_delta); |
                    // A basement node is being partially evicted.
                    // This basement node may have had messages applied to it
                    // to satisfy a query, but was never actually dirtied.
                    // This message application may have updated the tree's
                    // logical row count. Since these message applications are
                    // not being persisted, we need to undo the logical row
                    // count adjustments as they may occur again in the future
                    // if/when the node is re-read from disk for another query
                    // or change.
1032 | toku_ft_adjust_logical_row_count(ft, |
1033 | -bn->logical_rows_delta); |
1034 | set_BNULL(node, i); |
1035 | BP_STATE(node, i) = PT_ON_DISK; |
1036 | num_partial_evictions++; |
1037 | } else { |
1038 | BP_SWEEP_CLOCK(node, i); |
1039 | } |
1040 | } else if (BP_STATE(node, i) == PT_ON_DISK) { |
1041 | continue; |
1042 | } else { |
1043 | abort(); |
1044 | } |
1045 | } |
1046 | } |
1047 | |
1048 | exit: |
1049 | // call the finalize callback with a new pair attr |
1050 | int height = node->height; |
1051 | PAIR_ATTR new_attr = make_ftnode_pair_attr(node); |
1052 | finalize(new_attr, finalize_extra); |
1053 | |
    // destroy everything now that we've called finalize(),
    // and, by contract, it's safe to do expensive work.
1056 | for (int i = 0; i < num_basements_to_destroy; i++) { |
1057 | destroy_basement_node(basements_to_destroy[i]); |
1058 | } |
1059 | for (int i = 0; i < num_buffers_to_destroy; i++) { |
1060 | destroy_nonleaf_childinfo(buffers_to_destroy[i]); |
1061 | } |
1062 | for (int i = 0; i < num_pointers_to_free; i++) { |
1063 | toku_free(pointers_to_free[i]); |
1064 | } |
1065 | // stats |
1066 | if (num_partial_evictions > 0) { |
1067 | if (height == 0) { |
1068 | long delta = old_attr.leaf_size - new_attr.leaf_size; |
1069 | FT_STATUS_INC(FT_PARTIAL_EVICTIONS_LEAF, num_partial_evictions); |
1070 | FT_STATUS_INC(FT_PARTIAL_EVICTIONS_LEAF_BYTES, delta); |
1071 | } else { |
1072 | long delta = old_attr.nonleaf_size - new_attr.nonleaf_size; |
1073 | FT_STATUS_INC(FT_PARTIAL_EVICTIONS_NONLEAF, num_partial_evictions); |
1074 | FT_STATUS_INC(FT_PARTIAL_EVICTIONS_NONLEAF_BYTES, delta); |
1075 | } |
1076 | } |
1077 | return 0; |
1078 | } |
1079 | |
1080 | // We touch the clock while holding a read lock. |
1081 | // DRD reports a race but we want to ignore it. |
1082 | // Using a valgrind suppressions file is better than the DRD_IGNORE_VAR macro because it's more targeted. |
1083 | // We need a function to have something a drd suppression can reference |
1084 | // see src/tests/drd.suppressions (unsafe_touch_clock) |
1085 | static void unsafe_touch_clock(FTNODE node, int i) { |
1086 | toku_unsafe_set(&node->bp[i].clock_count, static_cast<unsigned char>(1)); |
1087 | } |
1088 | |
1089 | // Callback that states if a partial fetch of the node is necessary |
1090 | // Currently, this function is responsible for the following things: |
1091 | // - reporting to the cachetable whether a partial fetch is required (as required by the contract of the callback) |
1092 | // - A couple of things that are NOT required by the callback, but we do for efficiency and simplicity reasons: |
1093 | // - for queries, set the value of bfe->child_to_read so that the query that called this can proceed with the query |
1094 | // as opposed to having to evaluate toku_ft_search_which_child again. This is done to make the in-memory query faster |
1095 | // - touch the necessary partition's clock. The reason we do it here is so that there is one central place it is done, and not done |
1096 | // by all the various callers |
1097 | // |
bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs) {
1099 | // placeholder for now |
1100 | bool retval = false; |
1101 | FTNODE node = (FTNODE) ftnode_pv; |
1102 | ftnode_fetch_extra *bfe = (ftnode_fetch_extra *) read_extraargs; |
1103 | // |
1104 | // The three types of fetches that the ft layer may request are: |
1105 | // - ftnode_fetch_none: no partitions are necessary (example use: stat64) |
1106 | // - ftnode_fetch_subset: some subset is necessary (example use: toku_ft_search) |
1107 | // - ftnode_fetch_all: entire node is necessary (example use: flush, split, merge) |
1108 | // The code below checks if the necessary partitions are already in memory, |
1109 | // and if they are, return false, and if not, return true |
1110 | // |
1111 | if (bfe->type == ftnode_fetch_none) { |
1112 | retval = false; |
1113 | } |
1114 | else if (bfe->type == ftnode_fetch_all) { |
1115 | retval = false; |
1116 | for (int i = 0; i < node->n_children; i++) { |
1117 | unsafe_touch_clock(node,i); |
1118 | // if we find a partition that is not available, |
1119 | // then a partial fetch is required because |
1120 | // the entire node must be made available |
1121 | if (BP_STATE(node,i) != PT_AVAIL) { |
1122 | retval = true; |
1123 | } |
1124 | } |
1125 | } |
1126 | else if (bfe->type == ftnode_fetch_subset) { |
1127 | // we do not take into account prefetching yet |
1128 | // as of now, if we need a subset, the only thing |
1129 | // we can possibly require is a single basement node |
1130 | // we find out what basement node the query cares about |
1131 | // and check if it is available |
1132 | paranoid_invariant(bfe->search); |
1133 | bfe->child_to_read = toku_ft_search_which_child( |
1134 | bfe->ft->cmp, |
1135 | node, |
1136 | bfe->search |
1137 | ); |
1138 | unsafe_touch_clock(node,bfe->child_to_read); |
1139 | // child we want to read is not available, must set retval to true |
1140 | retval = (BP_STATE(node, bfe->child_to_read) != PT_AVAIL); |
1141 | } |
1142 | else if (bfe->type == ftnode_fetch_prefetch) { |
1143 | // makes no sense to have prefetching disabled |
1144 | // and still call this function |
1145 | paranoid_invariant(!bfe->disable_prefetching); |
1146 | int lc = bfe->leftmost_child_wanted(node); |
1147 | int rc = bfe->rightmost_child_wanted(node); |
1148 | for (int i = lc; i <= rc; ++i) { |
1149 | if (BP_STATE(node, i) != PT_AVAIL) { |
1150 | retval = true; |
1151 | } |
1152 | } |
1153 | } else if (bfe->type == ftnode_fetch_keymatch) { |
1154 | // we do not take into account prefetching yet |
1155 | // as of now, if we need a subset, the only thing |
1156 | // we can possibly require is a single basement node |
1157 | // we find out what basement node the query cares about |
1158 | // and check if it is available |
1159 | if (node->height == 0) { |
1160 | int left_child = bfe->leftmost_child_wanted(node); |
1161 | int right_child = bfe->rightmost_child_wanted(node); |
1162 | if (left_child == right_child) { |
1163 | bfe->child_to_read = left_child; |
1164 | unsafe_touch_clock(node,bfe->child_to_read); |
1165 | // child we want to read is not available, must set retval to true |
1166 | retval = (BP_STATE(node, bfe->child_to_read) != PT_AVAIL); |
1167 | } |
1168 | } |
1169 | } else { |
1170 | // we have a bug. The type should be known |
1171 | abort(); |
1172 | } |
1173 | return retval; |
1174 | } |
1175 | |
1176 | static void |
1177 | ( |
1178 | ftnode_fetch_extra *bfe, |
1179 | int childnum, |
1180 | enum pt_state state, |
1181 | bool is_leaf |
1182 | ) |
1183 | { |
1184 | invariant(state == PT_COMPRESSED || state == PT_ON_DISK); |
1185 | if (is_leaf) { |
1186 | if (bfe->type == ftnode_fetch_prefetch) { |
1187 | if (state == PT_COMPRESSED) { |
1188 | FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH, 1); |
1189 | } else { |
1190 | FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_PREFETCH, 1); |
1191 | FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_PREFETCH, bfe->bytes_read); |
1192 | FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_PREFETCH, bfe->io_time); |
1193 | } |
1194 | } else if (bfe->type == ftnode_fetch_all) { |
1195 | if (state == PT_COMPRESSED) { |
1196 | FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_WRITE, 1); |
1197 | } else { |
1198 | FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_WRITE, 1); |
1199 | FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_WRITE, bfe->bytes_read); |
1200 | FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_WRITE, bfe->io_time); |
1201 | } |
1202 | } else if (childnum == bfe->child_to_read) { |
1203 | if (state == PT_COMPRESSED) { |
1204 | FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL, 1); |
1205 | } else { |
1206 | FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_NORMAL, 1); |
1207 | FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_NORMAL, bfe->bytes_read); |
1208 | FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_NORMAL, bfe->io_time); |
1209 | } |
1210 | } else { |
1211 | if (state == PT_COMPRESSED) { |
1212 | FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE, 1); |
1213 | } else { |
1214 | FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, 1); |
1215 | FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_AGGRESSIVE, bfe->bytes_read); |
1216 | FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_AGGRESSIVE, bfe->io_time); |
1217 | } |
1218 | } |
1219 | } |
1220 | else { |
1221 | if (bfe->type == ftnode_fetch_prefetch) { |
1222 | if (state == PT_COMPRESSED) { |
1223 | FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH, 1); |
1224 | } else { |
1225 | FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, 1); |
1226 | FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_PREFETCH, bfe->bytes_read); |
1227 | FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, bfe->io_time); |
1228 | } |
1229 | } else if (bfe->type == ftnode_fetch_all) { |
1230 | if (state == PT_COMPRESSED) { |
1231 | FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE, 1); |
1232 | } else { |
1233 | FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_WRITE, 1); |
1234 | FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_WRITE, bfe->bytes_read); |
1235 | FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, bfe->io_time); |
1236 | } |
1237 | } else if (childnum == bfe->child_to_read) { |
1238 | if (state == PT_COMPRESSED) { |
1239 | FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL, 1); |
1240 | } else { |
1241 | FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_NORMAL, 1); |
1242 | FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_NORMAL, bfe->bytes_read); |
1243 | FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_NORMAL, bfe->io_time); |
1244 | } |
1245 | } else { |
1246 | if (state == PT_COMPRESSED) { |
1247 | FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE, 1); |
1248 | } else { |
1249 | FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, 1); |
1250 | FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->bytes_read); |
1251 | FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->io_time); |
1252 | } |
1253 | } |
1254 | } |
1255 | } |
1256 | |
1257 | void toku_ft_status_update_serialize_times(FTNODE node, tokutime_t serialize_time, tokutime_t compress_time) { |
1258 | if (node->height == 0) { |
1259 | FT_STATUS_INC(FT_LEAF_SERIALIZE_TOKUTIME, serialize_time); |
1260 | FT_STATUS_INC(FT_LEAF_COMPRESS_TOKUTIME, compress_time); |
1261 | } else { |
1262 | FT_STATUS_INC(FT_NONLEAF_SERIALIZE_TOKUTIME, serialize_time); |
1263 | FT_STATUS_INC(FT_NONLEAF_COMPRESS_TOKUTIME, compress_time); |
1264 | } |
1265 | } |
1266 | |
1267 | void toku_ft_status_update_deserialize_times(FTNODE node, tokutime_t deserialize_time, tokutime_t decompress_time) { |
1268 | if (node->height == 0) { |
1269 | FT_STATUS_INC(FT_LEAF_DESERIALIZE_TOKUTIME, deserialize_time); |
1270 | FT_STATUS_INC(FT_LEAF_DECOMPRESS_TOKUTIME, decompress_time); |
1271 | } else { |
1272 | FT_STATUS_INC(FT_NONLEAF_DESERIALIZE_TOKUTIME, deserialize_time); |
1273 | FT_STATUS_INC(FT_NONLEAF_DECOMPRESS_TOKUTIME, decompress_time); |
1274 | } |
1275 | } |
1276 | |
1277 | void toku_ft_status_note_msn_discard(void) { |
1278 | FT_STATUS_INC(FT_MSN_DISCARDS, 1); |
1279 | } |
1280 | |
1281 | void toku_ft_status_note_update(bool broadcast) { |
1282 | if (broadcast) { |
1283 | FT_STATUS_INC(FT_UPDATES_BROADCAST, 1); |
1284 | } else { |
1285 | FT_STATUS_INC(FT_UPDATES, 1); |
1286 | } |
1287 | } |
1288 | |
1289 | void toku_ft_status_note_msg_bytes_out(size_t buffsize) { |
1290 | FT_STATUS_INC(FT_MSG_BYTES_OUT, buffsize); |
1291 | FT_STATUS_INC(FT_MSG_BYTES_CURR, -buffsize); |
1292 | } |
1293 | void toku_ft_status_note_ftnode(int height, bool created) { |
1294 | if (created) { |
1295 | if (height == 0) { |
1296 | FT_STATUS_INC(FT_CREATE_LEAF, 1); |
1297 | } else { |
1298 | FT_STATUS_INC(FT_CREATE_NONLEAF, 1); |
1299 | } |
1300 | } else { |
1301 | // created = false means destroyed |
1302 | } |
1303 | } |
1304 | |
1305 | // callback for partially reading a node |
1306 | // could have just used toku_ftnode_fetch_callback, but wanted to separate the two cases to separate functions |
int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraargs, int fd, PAIR_ATTR* sizep) {
1308 | int r = 0; |
1309 | FTNODE node = (FTNODE) ftnode_pv; |
1310 | FTNODE_DISK_DATA ndd = (FTNODE_DISK_DATA) disk_data; |
1311 | ftnode_fetch_extra *bfe = (ftnode_fetch_extra *) read_extraargs; |
1312 | // there must be a reason this is being called. If we get a garbage type or the type is ftnode_fetch_none, |
1313 | // then something went wrong |
1314 | assert((bfe->type == ftnode_fetch_subset) || (bfe->type == ftnode_fetch_all) || (bfe->type == ftnode_fetch_prefetch) || (bfe->type == ftnode_fetch_keymatch)); |
1315 | // determine the range to prefetch |
1316 | int lc, rc; |
1317 | if (!bfe->disable_prefetching && |
1318 | (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch) |
1319 | ) |
1320 | { |
1321 | lc = bfe->leftmost_child_wanted(node); |
1322 | rc = bfe->rightmost_child_wanted(node); |
1323 | } else { |
1324 | lc = -1; |
1325 | rc = -1; |
1326 | } |
1327 | for (int i = 0; i < node->n_children; i++) { |
1328 | if (BP_STATE(node,i) == PT_AVAIL) { |
1329 | continue; |
1330 | } |
1331 | if ((lc <= i && i <= rc) || bfe->wants_child_available(i)) { |
1332 | enum pt_state state = BP_STATE(node, i); |
1333 | if (state == PT_COMPRESSED) { |
1334 | r = toku_deserialize_bp_from_compressed(node, i, bfe); |
1335 | } else { |
1336 | invariant(state == PT_ON_DISK); |
1337 | r = toku_deserialize_bp_from_disk(node, ndd, i, fd, bfe); |
1338 | } |
1339 | ft_status_update_partial_fetch_reason(bfe, i, state, (node->height == 0)); |
1340 | } |
1341 | |
1342 | if (r != 0) { |
1343 | if (r == TOKUDB_BAD_CHECKSUM) { |
1344 | fprintf(stderr, |
1345 | "Checksum failure while reading node partition in file %s.\n" , |
1346 | toku_cachefile_fname_in_env(bfe->ft->cf)); |
1347 | } else { |
1348 | fprintf(stderr, |
1349 | "Error while reading node partition %d\n" , |
1350 | get_maybe_error_errno()); |
1351 | } |
1352 | abort(); |
1353 | } |
1354 | } |
1355 | |
1356 | *sizep = make_ftnode_pair_attr(node); |
1357 | |
1358 | return 0; |
1359 | } |
1360 | |
int toku_msg_leafval_heaviside(DBT const &kdbt, const struct toku_msg_leafval_heaviside_extra &be) {
1362 | return be.cmp(&kdbt, be.key); |
1363 | } |
1364 | |
1365 | static void |
1366 | ft_init_new_root(FT ft, FTNODE oldroot, FTNODE *newrootp) |
1367 | // Effect: Create a new root node whose two children are the split of oldroot. |
1368 | // oldroot is unpinned in the process. |
1369 | // Leave the new root pinned. |
1370 | { |
1371 | FTNODE newroot; |
1372 | |
1373 | BLOCKNUM old_blocknum = oldroot->blocknum; |
1374 | uint32_t old_fullhash = oldroot->fullhash; |
1375 | |
1376 | int new_height = oldroot->height+1; |
1377 | uint32_t new_fullhash; |
1378 | BLOCKNUM new_blocknum; |
1379 | |
1380 | cachetable_put_empty_node_with_dep_nodes( |
1381 | ft, |
1382 | 1, |
1383 | &oldroot, |
1384 | &new_blocknum, |
1385 | &new_fullhash, |
1386 | &newroot |
1387 | ); |
1388 | |
1389 | assert(newroot); |
1390 | assert(new_height > 0); |
1391 | toku_initialize_empty_ftnode ( |
1392 | newroot, |
1393 | new_blocknum, |
1394 | new_height, |
1395 | 1, |
1396 | ft->h->layout_version, |
1397 | ft->h->flags |
1398 | ); |
1399 | newroot->fullhash = new_fullhash; |
1400 | MSN msna = oldroot->max_msn_applied_to_node_on_disk; |
1401 | newroot->max_msn_applied_to_node_on_disk = msna; |
1402 | BP_STATE(newroot,0) = PT_AVAIL; |
1403 | newroot->dirty = 1; |
1404 | |
1405 | // Set the first child to have the new blocknum, |
1406 | // and then swap newroot with oldroot. The new root |
1407 | // will inherit the hash/blocknum/pair from oldroot, |
1408 | // keeping the root blocknum constant. |
1409 | BP_BLOCKNUM(newroot, 0) = new_blocknum; |
1410 | toku_ftnode_swap_pair_values(newroot, oldroot); |
1411 | |
1412 | toku_ft_split_child( |
1413 | ft, |
1414 | newroot, |
1415 | 0, // childnum to split |
1416 | oldroot, |
1417 | SPLIT_EVENLY |
1418 | ); |
1419 | |
1420 | // ft_split_child released locks on newroot |
1421 | // and oldroot, so now we repin and |
1422 | // return to caller |
1423 | ftnode_fetch_extra bfe; |
1424 | bfe.create_for_full_read(ft); |
1425 | toku_pin_ftnode( |
1426 | ft, |
1427 | old_blocknum, |
1428 | old_fullhash, |
1429 | &bfe, |
1430 | PL_WRITE_EXPENSIVE, // may_modify_node |
1431 | newrootp, |
1432 | true |
1433 | ); |
1434 | } |
1435 | |
1436 | static void inject_message_in_locked_node( |
1437 | FT ft, |
1438 | FTNODE node, |
1439 | int childnum, |
1440 | const ft_msg &msg, |
1441 | size_t flow_deltas[], |
1442 | txn_gc_info *gc_info |
1443 | ) |
1444 | { |
1445 | // No guarantee that we're the writer, but oh well. |
1446 | // TODO(leif): Implement "do I have the lock or is it someone else?" |
1447 | // check in frwlock. Should be possible with TOKU_PTHREAD_DEBUG, nop |
1448 | // otherwise. |
1449 | invariant(toku_ctpair_is_write_locked(node->ct_pair)); |
1450 | toku_ftnode_assert_fully_in_memory(node); |
1451 | |
1452 | // Take the newer of the two oldest referenced xid values from the node and gc_info. |
1453 | // The gc_info usually has a newer value, because we got it at the top of this call |
1454 | // stack from the txn manager. But sometimes the node has a newer value, if some |
1455 | // other thread sees a newer value and writes to this node before we got the lock. |
1456 | if (gc_info->oldest_referenced_xid_for_implicit_promotion > node->oldest_referenced_xid_known) { |
1457 | node->oldest_referenced_xid_known = gc_info->oldest_referenced_xid_for_implicit_promotion; |
1458 | } else if (gc_info->oldest_referenced_xid_for_implicit_promotion < node->oldest_referenced_xid_known) { |
1459 | gc_info->oldest_referenced_xid_for_implicit_promotion = node->oldest_referenced_xid_known; |
1460 | } |
1461 | |
1462 | // Get the MSN from the header. Now that we have a write lock on the |
1463 | // node we're injecting into, we know no other thread will get an MSN |
1464 | // after us and get that message into our subtree before us. |
1465 | MSN msg_msn = { .msn = toku_sync_add_and_fetch(&ft->h->max_msn_in_ft.msn, 1) }; |
1466 | ft_msg msg_with_msn(msg.kdbt(), msg.vdbt(), msg.type(), msg_msn, msg.xids()); |
1467 | paranoid_invariant(msg_with_msn.msn().msn > node->max_msn_applied_to_node_on_disk.msn); |
1468 | |
1469 | STAT64INFO_S stats_delta = { 0,0 }; |
1470 | int64_t logical_rows_delta = 0; |
1471 | toku_ftnode_put_msg( |
1472 | ft->cmp, |
1473 | ft->update_fun, |
1474 | node, |
1475 | childnum, |
1476 | msg_with_msn, |
1477 | true, |
1478 | gc_info, |
1479 | flow_deltas, |
1480 | &stats_delta, |
1481 | &logical_rows_delta); |
1482 | if (stats_delta.numbytes || stats_delta.numrows) { |
1483 | toku_ft_update_stats(&ft->in_memory_stats, stats_delta); |
1484 | } |
1485 | toku_ft_adjust_logical_row_count(ft, logical_rows_delta); |
1486 | // |
1487 | // assumption is that toku_ftnode_put_msg will |
1488 | // mark the node as dirty. |
1489 | // enforcing invariant here. |
1490 | // |
1491 | paranoid_invariant(node->dirty != 0); |
1492 | |
1493 | // update some status variables |
1494 | if (node->height != 0) { |
1495 | size_t msgsize = msg.total_size(); |
1496 | FT_STATUS_INC(FT_MSG_BYTES_IN, msgsize); |
1497 | FT_STATUS_INC(FT_MSG_BYTES_CURR, msgsize); |
1498 | FT_STATUS_INC(FT_MSG_NUM, 1); |
1499 | if (ft_msg_type_applies_all(msg.type())) { |
1500 | FT_STATUS_INC(FT_MSG_NUM_BROADCAST, 1); |
1501 | } |
1502 | } |
1503 | |
1504 | // verify that msn of latest message was captured in root node |
1505 | paranoid_invariant(msg_with_msn.msn().msn == node->max_msn_applied_to_node_on_disk.msn); |
1506 | |
1507 | if (node->blocknum.b == ft->rightmost_blocknum.b) { |
1508 | if (toku_unsafe_fetch(&ft->seqinsert_score) < FT_SEQINSERT_SCORE_THRESHOLD) { |
1509 | // we promoted to the rightmost leaf node and the seqinsert score has not yet saturated. |
1510 | toku_sync_fetch_and_add(&ft->seqinsert_score, 1); |
1511 | } |
1512 | } else if (toku_unsafe_fetch(&ft->seqinsert_score) != 0) { |
1513 | // we promoted to something other than the rightmost leaf node and the score should reset |
1514 | toku_unsafe_set(&ft->seqinsert_score, static_cast<uint32_t>(0)); |
1515 | } |
1516 | |
1517 | // if we call toku_ft_flush_some_child, then that function unpins the root |
1518 | // otherwise, we unpin ourselves |
1519 | if (node->height > 0 && toku_ftnode_nonleaf_is_gorged(node, ft->h->nodesize)) { |
1520 | toku_ft_flush_node_on_background_thread(ft, node); |
1521 | } |
1522 | else { |
1523 | toku_unpin_ftnode(ft, node); |
1524 | } |
1525 | } |
1526 | |
1527 | // seqinsert_loc is a bitmask. |
1528 | // The root counts as being both on the "left extreme" and on the "right extreme". |
1529 | // Therefore, at the root, you're at LEFT_EXTREME | RIGHT_EXTREME. |
1530 | typedef char seqinsert_loc; |
1531 | static const seqinsert_loc NEITHER_EXTREME = 0; |
1532 | static const seqinsert_loc LEFT_EXTREME = 1; |
1533 | static const seqinsert_loc RIGHT_EXTREME = 2; |
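// Illustrative sketch (not part of the algorithm itself): a caller that
// descends from the root tracks its position by masking. This mirrors the
// next_loc computation in push_something_in_subtree() below.
//
//   seqinsert_loc next_loc = NEITHER_EXTREME;
//   if ((loc & LEFT_EXTREME) && childnum == 0)
//       next_loc = LEFT_EXTREME;
//   else if ((loc & RIGHT_EXTREME) && childnum == n_children - 1)
//       next_loc = RIGHT_EXTREME;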
1534 | |
1535 | static bool process_maybe_reactive_child(FT ft, FTNODE parent, FTNODE child, int childnum, seqinsert_loc loc) |
1536 | // Effect: |
1537 | // If child needs to be split or merged, do that. |
1538 | // parent and child will be unlocked if this happens |
1539 | // Requires: parent and child are read locked |
1540 | // Returns: |
1541 | // true if relocking is needed |
1542 | // false otherwise |
1543 | { |
1544 | enum reactivity re = toku_ftnode_get_reactivity(ft, child); |
1545 | enum reactivity newre; |
1546 | BLOCKNUM child_blocknum; |
1547 | uint32_t child_fullhash; |
1548 | switch (re) { |
1549 | case RE_STABLE: |
1550 | return false; |
1551 | case RE_FISSIBLE: |
1552 | { |
1553 | // We only have a read lock on the parent. We need to drop both locks, and get write locks. |
1554 | BLOCKNUM parent_blocknum = parent->blocknum; |
1555 | uint32_t parent_fullhash = toku_cachetable_hash(ft->cf, parent_blocknum); |
1556 | int parent_height = parent->height; |
1557 | int parent_n_children = parent->n_children; |
1558 | toku_unpin_ftnode_read_only(ft, child); |
1559 | toku_unpin_ftnode_read_only(ft, parent); |
1560 | ftnode_fetch_extra bfe; |
1561 | bfe.create_for_full_read(ft); |
1562 | FTNODE newparent, newchild; |
1563 | toku_pin_ftnode(ft, parent_blocknum, parent_fullhash, &bfe, PL_WRITE_CHEAP, &newparent, true); |
1564 | if (newparent->height != parent_height || newparent->n_children != parent_n_children || |
1565 | childnum >= newparent->n_children || toku_bnc_n_entries(BNC(newparent, childnum))) { |
1566 | // If the height changed or childnum is now off the end, something clearly got split or merged out from under us. |
1567 | // If something got injected in this node, then it got split or merged and we shouldn't be splitting it. |
1568 | // But we already unpinned the child so we need to have the caller re-try the pins. |
1569 | toku_unpin_ftnode_read_only(ft, newparent); |
1570 | return true; |
1571 | } |
1572 | // It's ok to reuse the same childnum because if we get something |
1573 | // else we need to split, well, that's crazy, but let's go ahead |
1574 | // and split it. |
1575 | child_blocknum = BP_BLOCKNUM(newparent, childnum); |
1576 | child_fullhash = compute_child_fullhash(ft->cf, newparent, childnum); |
1577 | toku_pin_ftnode_with_dep_nodes(ft, child_blocknum, child_fullhash, &bfe, PL_WRITE_CHEAP, 1, &newparent, &newchild, true); |
1578 | newre = toku_ftnode_get_reactivity(ft, newchild); |
1579 | if (newre == RE_FISSIBLE) { |
1580 | enum split_mode split_mode; |
1581 | if (newparent->height == 1 && (loc & LEFT_EXTREME) && childnum == 0) { |
1582 | split_mode = SPLIT_RIGHT_HEAVY; |
1583 | } else if (newparent->height == 1 && (loc & RIGHT_EXTREME) && childnum == newparent->n_children - 1) { |
1584 | split_mode = SPLIT_LEFT_HEAVY; |
1585 | } else { |
1586 | split_mode = SPLIT_EVENLY; |
1587 | } |
1588 | toku_ft_split_child(ft, newparent, childnum, newchild, split_mode); |
1589 | } else { |
1590 | // some other thread already got it, just unpin and tell the |
1591 | // caller to retry |
1592 | toku_unpin_ftnode_read_only(ft, newchild); |
1593 | toku_unpin_ftnode_read_only(ft, newparent); |
1594 | } |
1595 | return true; |
1596 | } |
1597 | case RE_FUSIBLE: |
1598 | { |
1599 | if (parent->height == 1) { |
1600 | // prevent re-merging of recently unevenly-split nodes |
1601 | if (((loc & LEFT_EXTREME) && childnum <= 1) || |
1602 | ((loc & RIGHT_EXTREME) && childnum >= parent->n_children - 2)) { |
1603 | return false; |
1604 | } |
1605 | } |
1606 | |
1607 | int parent_height = parent->height; |
1608 | BLOCKNUM parent_blocknum = parent->blocknum; |
1609 | uint32_t parent_fullhash = toku_cachetable_hash(ft->cf, parent_blocknum); |
1610 | toku_unpin_ftnode_read_only(ft, child); |
1611 | toku_unpin_ftnode_read_only(ft, parent); |
1612 | ftnode_fetch_extra bfe; |
1613 | bfe.create_for_full_read(ft); |
1614 | FTNODE newparent, newchild; |
1615 | toku_pin_ftnode(ft, parent_blocknum, parent_fullhash, &bfe, PL_WRITE_CHEAP, &newparent, true); |
1616 | if (newparent->height != parent_height || childnum >= newparent->n_children) { |
1617 | // looks like this is the root and it got merged, let's just start over (like in the split case above) |
1618 | toku_unpin_ftnode_read_only(ft, newparent); |
1619 | return true; |
1620 | } |
1621 | child_blocknum = BP_BLOCKNUM(newparent, childnum); |
1622 | child_fullhash = compute_child_fullhash(ft->cf, newparent, childnum); |
1623 | toku_pin_ftnode_with_dep_nodes(ft, child_blocknum, child_fullhash, &bfe, PL_READ, 1, &newparent, &newchild, true); |
1624 | newre = toku_ftnode_get_reactivity(ft, newchild); |
1625 | if (newre == RE_FUSIBLE && newparent->n_children >= 2) { |
1626 | toku_unpin_ftnode_read_only(ft, newchild); |
1627 | toku_ft_merge_child(ft, newparent, childnum); |
1628 | } else { |
1629 | // Could be a weird case where newparent has only one |
1630 | // child. In this case, we want to inject here but we've |
1631 | // already unpinned the caller's copy of parent so we have |
1632 | // to ask them to re-pin, or they could (very rarely) |
// dereference memory in a freed node. TODO: we could
1634 | // give them back the copy of the parent we pinned. |
1635 | // |
1636 | // Otherwise, some other thread already got it, just unpin |
1637 | // and tell the caller to retry |
1638 | toku_unpin_ftnode_read_only(ft, newchild); |
1639 | toku_unpin_ftnode_read_only(ft, newparent); |
1640 | } |
1641 | return true; |
1642 | } |
1643 | } |
1644 | abort(); |
1645 | } |
1646 | |
1647 | static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t fullhash, const ft_msg &msg, size_t flow_deltas[], txn_gc_info *gc_info) |
1648 | // Effect: |
1649 | // Inject message into the node at this blocknum (cachekey). |
1650 | // Gets a write lock on the node for you. |
1651 | { |
1652 | toku::context inject_ctx(CTX_MESSAGE_INJECTION); |
1653 | FTNODE node; |
1654 | ftnode_fetch_extra bfe; |
1655 | bfe.create_for_full_read(ft); |
1656 | toku_pin_ftnode(ft, cachekey, fullhash, &bfe, PL_WRITE_CHEAP, &node, true); |
1657 | toku_ftnode_assert_fully_in_memory(node); |
1658 | paranoid_invariant(node->fullhash==fullhash); |
1659 | ft_verify_flags(ft, node); |
1660 | inject_message_in_locked_node(ft, node, -1, msg, flow_deltas, gc_info); |
1661 | } |
1662 | |
1663 | __attribute__((const)) |
1664 | static inline bool should_inject_in_node(seqinsert_loc loc, int height, int depth) |
1665 | // We should inject directly in a node if: |
1666 | // - it's a leaf, or |
1667 | // - it's a height 1 node not at either extreme, or |
1668 | // - it's a depth 2 node not at either extreme |
1669 | { |
1670 | return (height == 0 || (loc == NEITHER_EXTREME && (height <= 1 || depth >= 2))); |
1671 | } |
1672 | |
1673 | static void ft_verify_or_set_rightmost_blocknum(FT ft, BLOCKNUM b) |
1674 | // Given: 'b', the _definitive_ and constant rightmost blocknum of 'ft' |
1675 | { |
1676 | if (toku_unsafe_fetch(&ft->rightmost_blocknum.b) == RESERVED_BLOCKNUM_NULL) { |
1677 | toku_ft_lock(ft); |
1678 | if (ft->rightmost_blocknum.b == RESERVED_BLOCKNUM_NULL) { |
1679 | toku_unsafe_set(&ft->rightmost_blocknum, b); |
1680 | } |
1681 | toku_ft_unlock(ft); |
1682 | } |
1683 | // The rightmost blocknum only transitions from RESERVED_BLOCKNUM_NULL to non-null. |
1684 | // If it's already set, verify that the stored value is consistent with 'b' |
1685 | invariant(toku_unsafe_fetch(&ft->rightmost_blocknum.b) == b.b); |
1686 | } |
1687 | |
1688 | bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) { |
1689 | static const double factor = 0.125; |
1690 | const uint64_t flow_threshold = ft->h->nodesize * factor; |
1691 | return bnc->flow[0] >= flow_threshold || bnc->flow[1] >= flow_threshold; |
1692 | } |
1693 | |
1694 | static void push_something_in_subtree( |
1695 | FT ft, |
1696 | FTNODE subtree_root, |
1697 | int target_childnum, |
1698 | const ft_msg &msg, |
1699 | size_t flow_deltas[], |
1700 | txn_gc_info *gc_info, |
1701 | int depth, |
1702 | seqinsert_loc loc, |
1703 | bool just_did_split_or_merge |
1704 | ) |
1705 | // Effects: |
1706 | // Assign message an MSN from ft->h. |
1707 | // Put message in the subtree rooted at node. Due to promotion the message may not be injected directly in this node. |
1708 | // Unlock node or schedule it to be unlocked (after a background flush). |
1709 | // Either way, the caller is not responsible for unlocking node. |
1710 | // Requires: |
1711 | // subtree_root is read locked and fully in memory. |
1712 | // Notes: |
1713 | // In Ming, the basic rules of promotion are as follows: |
1714 | // Don't promote broadcast messages. |
1715 | // Don't promote past non-empty buffers. |
1716 | // Otherwise, promote at most to height 1 or depth 2 (whichever is highest), as far as the birdie asks you to promote. |
1717 | // We don't promote to leaves because injecting into leaves is expensive, mostly because of #5605 and some of #5552. |
1718 | // We don't promote past depth 2 because we found that gives us enough parallelism without costing us too much pinning work. |
1719 | // |
1720 | // This is true with the following caveats: |
1721 | // We always promote all the way to the leaves on the rightmost and leftmost edges of the tree, for sequential insertions. |
1722 | // (That means we can promote past depth 2 near the edges of the tree.) |
1723 | // |
1724 | // When the birdie is still saying we should promote, we use get_and_pin so that we wait to get the node. |
1725 | // If the birdie doesn't say to promote, we try maybe_get_and_pin. If we get the node cheaply, and it's dirty, we promote anyway. |
1726 | { |
1727 | toku_ftnode_assert_fully_in_memory(subtree_root); |
1728 | if (should_inject_in_node(loc, subtree_root->height, depth)) { |
1729 | switch (depth) { |
1730 | case 0: |
1731 | FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_0, 1); break; |
1732 | case 1: |
1733 | FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_1, 1); break; |
1734 | case 2: |
1735 | FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_2, 1); break; |
1736 | case 3: |
1737 | FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_3, 1); break; |
1738 | default: |
1739 | FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break; |
1740 | } |
1741 | // If the target node is a non-root leaf node on the right extreme, |
1742 | // set the rightmost blocknum. We know there are no messages above us |
// because promotion would not choose to inject directly into this leaf
1744 | // otherwise. We explicitly skip the root node because then we don't have |
1745 | // to worry about changing the rightmost blocknum when the root splits. |
1746 | if (subtree_root->height == 0 && loc == RIGHT_EXTREME && subtree_root->blocknum.b != ft->h->root_blocknum.b) { |
1747 | ft_verify_or_set_rightmost_blocknum(ft, subtree_root->blocknum); |
1748 | } |
1749 | inject_message_in_locked_node(ft, subtree_root, target_childnum, msg, flow_deltas, gc_info); |
1750 | } else { |
1751 | int r; |
1752 | int childnum; |
1753 | NONLEAF_CHILDINFO bnc; |
1754 | |
1755 | // toku_ft_root_put_msg should not have called us otherwise. |
1756 | paranoid_invariant(ft_msg_type_applies_once(msg.type())); |
1757 | |
1758 | childnum = (target_childnum >= 0 ? target_childnum |
1759 | : toku_ftnode_which_child(subtree_root, msg.kdbt(), ft->cmp)); |
1760 | bnc = BNC(subtree_root, childnum); |
1761 | |
1762 | if (toku_bnc_n_entries(bnc) > 0) { |
1763 | // The buffer is non-empty, give up on promoting. |
1764 | FT_STATUS_INC(FT_PRO_NUM_STOP_NONEMPTY_BUF, 1); |
1765 | goto relock_and_push_here; |
1766 | } |
1767 | |
1768 | seqinsert_loc next_loc; |
1769 | if ((loc & LEFT_EXTREME) && childnum == 0) { |
1770 | next_loc = LEFT_EXTREME; |
1771 | } else if ((loc & RIGHT_EXTREME) && childnum == subtree_root->n_children - 1) { |
1772 | next_loc = RIGHT_EXTREME; |
1773 | } else { |
1774 | next_loc = NEITHER_EXTREME; |
1775 | } |
1776 | |
1777 | if (next_loc == NEITHER_EXTREME && subtree_root->height <= 1) { |
1778 | // Never promote to leaf nodes except on the edges |
1779 | FT_STATUS_INC(FT_PRO_NUM_STOP_H1, 1); |
1780 | goto relock_and_push_here; |
1781 | } |
1782 | |
1783 | { |
1784 | const BLOCKNUM child_blocknum = BP_BLOCKNUM(subtree_root, childnum); |
1785 | ft->blocktable.verify_blocknum_allocated(child_blocknum); |
1786 | const uint32_t child_fullhash = toku_cachetable_hash(ft->cf, child_blocknum); |
1787 | |
1788 | FTNODE child; |
1789 | { |
1790 | const int child_height = subtree_root->height - 1; |
1791 | const int child_depth = depth + 1; |
1792 | // If we're locking a leaf, or a height 1 node or depth 2 |
1793 | // node in the middle, we know we won't promote further |
1794 | // than that, so just get a write lock now. |
1795 | const pair_lock_type lock_type = (should_inject_in_node(next_loc, child_height, child_depth) |
1796 | ? PL_WRITE_CHEAP |
1797 | : PL_READ); |
1798 | if (next_loc != NEITHER_EXTREME || (toku_bnc_should_promote(ft, bnc) && depth <= 1)) { |
1799 | // If we're on either extreme, or the birdie wants to |
1800 | // promote and we're in the top two levels of the |
1801 | // tree, don't stop just because someone else has the |
1802 | // node locked. |
1803 | ftnode_fetch_extra bfe; |
1804 | bfe.create_for_full_read(ft); |
1805 | if (lock_type == PL_WRITE_CHEAP) { |
1806 | // We intend to take the write lock for message injection |
1807 | toku::context inject_ctx(CTX_MESSAGE_INJECTION); |
1808 | toku_pin_ftnode(ft, child_blocknum, child_fullhash, &bfe, lock_type, &child, true); |
1809 | } else { |
1810 | // We're going to keep promoting |
1811 | toku::context promo_ctx(CTX_PROMO); |
1812 | toku_pin_ftnode(ft, child_blocknum, child_fullhash, &bfe, lock_type, &child, true); |
1813 | } |
1814 | } else { |
1815 | r = toku_maybe_pin_ftnode_clean(ft, child_blocknum, child_fullhash, lock_type, &child); |
1816 | if (r != 0) { |
1817 | // We couldn't get the child cheaply, so give up on promoting. |
1818 | FT_STATUS_INC(FT_PRO_NUM_STOP_LOCK_CHILD, 1); |
1819 | goto relock_and_push_here; |
1820 | } |
1821 | if (toku_ftnode_fully_in_memory(child)) { |
1822 | // toku_pin_ftnode... touches the clock but toku_maybe_pin_ftnode... doesn't. |
1823 | // This prevents partial eviction. |
1824 | for (int i = 0; i < child->n_children; ++i) { |
1825 | BP_TOUCH_CLOCK(child, i); |
1826 | } |
1827 | } else { |
1828 | // We got the child, but it's not fully in memory. Give up on promoting. |
1829 | FT_STATUS_INC(FT_PRO_NUM_STOP_CHILD_INMEM, 1); |
1830 | goto unlock_child_and_push_here; |
1831 | } |
1832 | } |
1833 | } |
1834 | paranoid_invariant_notnull(child); |
1835 | |
1836 | if (!just_did_split_or_merge) { |
1837 | BLOCKNUM subtree_root_blocknum = subtree_root->blocknum; |
1838 | uint32_t subtree_root_fullhash = toku_cachetable_hash(ft->cf, subtree_root_blocknum); |
1839 | const bool did_split_or_merge = process_maybe_reactive_child(ft, subtree_root, child, childnum, loc); |
1840 | if (did_split_or_merge) { |
1841 | // Need to re-pin this node and try at this level again. |
1842 | FTNODE newparent; |
1843 | ftnode_fetch_extra bfe; |
1844 | bfe.create_for_full_read(ft); // should be fully in memory, we just split it |
1845 | toku_pin_ftnode(ft, subtree_root_blocknum, subtree_root_fullhash, &bfe, PL_READ, &newparent, true); |
1846 | push_something_in_subtree(ft, newparent, -1, msg, flow_deltas, gc_info, depth, loc, true); |
1847 | return; |
1848 | } |
1849 | } |
1850 | |
1851 | if (next_loc != NEITHER_EXTREME || child->dirty || toku_bnc_should_promote(ft, bnc)) { |
1852 | push_something_in_subtree(ft, child, -1, msg, flow_deltas, gc_info, depth + 1, next_loc, false); |
1853 | toku_sync_fetch_and_add(&bnc->flow[0], flow_deltas[0]); |
1854 | // The recursive call unpinned the child, but |
1855 | // we're responsible for unpinning subtree_root. |
1856 | toku_unpin_ftnode_read_only(ft, subtree_root); |
1857 | return; |
1858 | } |
1859 | |
1860 | FT_STATUS_INC(FT_PRO_NUM_DIDNT_WANT_PROMOTE, 1); |
1861 | unlock_child_and_push_here: |
1862 | // We locked the child, but we decided not to promote. |
1863 | // Unlock the child, and fall through to the next case. |
1864 | toku_unpin_ftnode_read_only(ft, child); |
1865 | } |
1866 | relock_and_push_here: |
1867 | // Give up on promoting. |
1868 | // We have subtree_root read-locked and we don't have a child locked. |
1869 | // Drop the read lock, grab a write lock, and inject here. |
1870 | { |
1871 | // Right now we have a read lock on subtree_root, but we want |
1872 | // to inject into it so we get a write lock instead. |
1873 | BLOCKNUM subtree_root_blocknum = subtree_root->blocknum; |
1874 | uint32_t subtree_root_fullhash = toku_cachetable_hash(ft->cf, subtree_root_blocknum); |
1875 | toku_unpin_ftnode_read_only(ft, subtree_root); |
1876 | switch (depth) { |
1877 | case 0: |
1878 | FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_0, 1); break; |
1879 | case 1: |
1880 | FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_1, 1); break; |
1881 | case 2: |
1882 | FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_2, 1); break; |
1883 | case 3: |
1884 | FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_3, 1); break; |
1885 | default: |
1886 | FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break; |
1887 | } |
1888 | inject_message_at_this_blocknum(ft, subtree_root_blocknum, subtree_root_fullhash, msg, flow_deltas, gc_info); |
1889 | } |
1890 | } |
1891 | } |
1892 | |
1893 | void toku_ft_root_put_msg( |
1894 | FT ft, |
1895 | const ft_msg &msg, |
1896 | txn_gc_info *gc_info |
1897 | ) |
1898 | // Effect: |
1899 | // - assign msn to message and update msn in the header |
1900 | // - push the message into the ft |
1901 | |
1902 | // As of Clayface, the root blocknum is a constant, so preventing a |
1903 | // race between message injection and the split of a root is the job |
1904 | // of the cachetable's locking rules. |
1905 | // |
1906 | // We also hold the MO lock for a number of reasons, but an important |
1907 | // one is to make sure that a begin_checkpoint may not start while |
1908 | // this code is executing. A begin_checkpoint does (at least) two things |
1909 | // that can interfere with the operations here: |
1910 | // - Copies the header to a checkpoint header. Because we may change |
1911 | // the max_msn_in_ft below, we don't want the header to be copied in |
1912 | // the middle of these operations. |
1913 | // - Takes note of the log's LSN. Because this put operation has |
1914 | // already been logged, this message injection must be included |
1915 | // in any checkpoint that contains this put's logentry. |
1916 | // Holding the mo lock throughout this function ensures that fact. |
1917 | { |
1918 | toku::context promo_ctx(CTX_PROMO); |
1919 | |
1920 | // blackhole fractal trees drop all messages, so do nothing. |
1921 | if (ft->blackhole) { |
1922 | return; |
1923 | } |
1924 | |
1925 | FTNODE node; |
1926 | |
1927 | uint32_t fullhash; |
1928 | CACHEKEY root_key; |
1929 | toku_calculate_root_offset_pointer(ft, &root_key, &fullhash); |
1930 | ftnode_fetch_extra bfe; |
1931 | bfe.create_for_full_read(ft); |
1932 | |
1933 | size_t flow_deltas[] = { message_buffer::msg_memsize_in_buffer(msg), 0 }; |
1934 | |
1935 | pair_lock_type lock_type; |
1936 | lock_type = PL_READ; // try first for a read lock |
1937 | // If we need to split the root, we'll have to change from a read lock |
1938 | // to a write lock and check again. We change the variable lock_type |
1939 | // and jump back to here. |
1940 | change_lock_type: |
1941 | // get the root node |
1942 | toku_pin_ftnode(ft, root_key, fullhash, &bfe, lock_type, &node, true); |
1943 | toku_ftnode_assert_fully_in_memory(node); |
1944 | paranoid_invariant(node->fullhash==fullhash); |
1945 | ft_verify_flags(ft, node); |
1946 | |
1947 | // First handle a reactive root. |
1948 | // This relocking for split algorithm will cause every message |
1949 | // injection thread to change lock type back and forth, when only one |
1950 | // of them needs to in order to handle the split. That's not great, |
1951 | // but root splits are incredibly rare. |
1952 | enum reactivity re = toku_ftnode_get_reactivity(ft, node); |
1953 | switch (re) { |
1954 | case RE_STABLE: |
1955 | case RE_FUSIBLE: // cannot merge anything at the root |
1956 | if (lock_type != PL_READ) { |
1957 | // We thought we needed to split, but someone else got to |
1958 | // it before us. Downgrade to a read lock. |
1959 | toku_unpin_ftnode_read_only(ft, node); |
1960 | lock_type = PL_READ; |
1961 | goto change_lock_type; |
1962 | } |
1963 | break; |
1964 | case RE_FISSIBLE: |
1965 | if (lock_type == PL_READ) { |
1966 | // Here, we only have a read lock on the root. In order |
1967 | // to split it, we need a write lock, but in the course of |
1968 | // gaining the write lock, someone else may have gotten in |
1969 | // before us and split it. So we upgrade to a write lock |
1970 | // and check again. |
1971 | toku_unpin_ftnode_read_only(ft, node); |
1972 | lock_type = PL_WRITE_CHEAP; |
1973 | goto change_lock_type; |
1974 | } else { |
1975 | // We have a write lock, now we can split. |
1976 | ft_init_new_root(ft, node, &node); |
1977 | // Then downgrade back to a read lock, and we can finally |
1978 | // do the injection. |
1979 | toku_unpin_ftnode(ft, node); |
1980 | lock_type = PL_READ; |
1981 | FT_STATUS_INC(FT_PRO_NUM_ROOT_SPLIT, 1); |
1982 | goto change_lock_type; |
1983 | } |
1984 | break; |
1985 | } |
1986 | // If we get to here, we have a read lock and the root doesn't |
1987 | // need to be split. It's safe to inject the message. |
1988 | paranoid_invariant(lock_type == PL_READ); |
1989 | // We cannot assert that we have the read lock because frwlock asserts |
1990 | // that its mutex is locked when we check if there are any readers. |
1991 | // That wouldn't give us a strong guarantee that we have the read lock |
1992 | // anyway. |
1993 | |
1994 | // Now, either inject here or promote. We decide based on a heuristic: |
1995 | if (node->height == 0 || !ft_msg_type_applies_once(msg.type())) { |
1996 | // If the root's a leaf or we're injecting a broadcast, drop the read lock and inject here. |
1997 | toku_unpin_ftnode_read_only(ft, node); |
1998 | FT_STATUS_INC(FT_PRO_NUM_ROOT_H0_INJECT, 1); |
1999 | inject_message_at_this_blocknum(ft, root_key, fullhash, msg, flow_deltas, gc_info); |
2000 | } else if (node->height > 1) { |
2001 | // If the root's above height 1, we are definitely eligible for promotion. |
2002 | push_something_in_subtree(ft, node, -1, msg, flow_deltas, gc_info, 0, LEFT_EXTREME | RIGHT_EXTREME, false); |
2003 | } else { |
2004 | // The root's height 1. We may be eligible for promotion here. |
2005 | // On the extremes, we want to promote, in the middle, we don't. |
2006 | int childnum = toku_ftnode_which_child(node, msg.kdbt(), ft->cmp); |
2007 | if (childnum == 0 || childnum == node->n_children - 1) { |
2008 | // On the extremes, promote. We know which childnum we're going to, so pass that down too. |
2009 | push_something_in_subtree(ft, node, childnum, msg, flow_deltas, gc_info, 0, LEFT_EXTREME | RIGHT_EXTREME, false); |
2010 | } else { |
2011 | // At height 1 in the middle, don't promote, drop the read lock and inject here. |
2012 | toku_unpin_ftnode_read_only(ft, node); |
2013 | FT_STATUS_INC(FT_PRO_NUM_ROOT_H1_INJECT, 1); |
2014 | inject_message_at_this_blocknum(ft, root_key, fullhash, msg, flow_deltas, gc_info); |
2015 | } |
2016 | } |
2017 | } |
2018 | |
2019 | // TODO: Remove me, I'm boring. |
2020 | static int ft_compare_keys(FT ft, const DBT *a, const DBT *b) |
2021 | // Effect: Compare two keys using the given fractal tree's comparator/descriptor |
2022 | { |
2023 | return ft->cmp(a, b); |
2024 | } |
2025 | |
2026 | static LEAFENTRY bn_get_le_and_key(BASEMENTNODE bn, int idx, DBT *key) |
// Effect: Gets the i'th leafentry from the given basement node and
// fills its key into *key
2029 | // Requires: The i'th leafentry exists. |
2030 | { |
2031 | LEAFENTRY le; |
2032 | uint32_t le_len; |
2033 | void *le_key; |
2034 | int r = bn->data_buffer.fetch_klpair(idx, &le, &le_len, &le_key); |
2035 | invariant_zero(r); |
2036 | toku_fill_dbt(key, le_key, le_len); |
2037 | return le; |
2038 | } |
2039 | |
2040 | static LEAFENTRY ft_leaf_leftmost_le_and_key(FTNODE leaf, DBT *leftmost_key) |
2041 | // Effect: If a leftmost key exists in the given leaf, toku_fill_dbt() |
2042 | // the key into *leftmost_key |
2043 | // Requires: Leaf is fully in memory and pinned for read or write. |
2044 | // Return: leafentry if it exists, nullptr otherwise |
2045 | { |
2046 | for (int i = 0; i < leaf->n_children; i++) { |
2047 | BASEMENTNODE bn = BLB(leaf, i); |
2048 | if (bn->data_buffer.num_klpairs() > 0) { |
2049 | // Get the first (leftmost) leafentry and its key |
2050 | return bn_get_le_and_key(bn, 0, leftmost_key); |
2051 | } |
2052 | } |
2053 | return nullptr; |
2054 | } |
2055 | |
2056 | static LEAFENTRY ft_leaf_rightmost_le_and_key(FTNODE leaf, DBT *rightmost_key) |
2057 | // Effect: If a rightmost key exists in the given leaf, toku_fill_dbt() |
2058 | // the key into *rightmost_key |
2059 | // Requires: Leaf is fully in memory and pinned for read or write. |
2060 | // Return: leafentry if it exists, nullptr otherwise |
2061 | { |
2062 | for (int i = leaf->n_children - 1; i >= 0; i--) { |
2063 | BASEMENTNODE bn = BLB(leaf, i); |
2064 | size_t num_les = bn->data_buffer.num_klpairs(); |
2065 | if (num_les > 0) { |
2066 | // Get the last (rightmost) leafentry and its key |
2067 | return bn_get_le_and_key(bn, num_les - 1, rightmost_key); |
2068 | } |
2069 | } |
2070 | return nullptr; |
2071 | } |
2072 | |
2073 | static int ft_leaf_get_relative_key_pos(FT ft, FTNODE leaf, const DBT *key, bool *nondeleted_key_found, int *target_childnum) |
2074 | // Effect: Determines what the relative position of the given key is with |
2075 | // respect to a leaf node, and if it exists. |
2076 | // Requires: Leaf is fully in memory and pinned for read or write. |
2077 | // Requires: target_childnum is non-null |
2078 | // Return: < 0 if key is less than the leftmost key in the leaf OR the relative position is unknown, for any reason. |
2079 | // 0 if key is in the bounds [leftmost_key, rightmost_key] for this leaf or the leaf is empty |
2080 | // > 0 if key is greater than the rightmost key in the leaf |
2081 | // *nondeleted_key_found is set (if non-null) if the target key was found and is not deleted, unmodified otherwise |
2082 | // *target_childnum is set to the child that (does or would) contain the key, if calculated, unmodified otherwise |
2083 | { |
2084 | DBT rightmost_key; |
2085 | LEAFENTRY rightmost_le = ft_leaf_rightmost_le_and_key(leaf, &rightmost_key); |
2086 | if (rightmost_le == nullptr) { |
2087 | // If we can't get a rightmost key then the leaf is empty. |
2088 | // In such a case, we don't have any information about what keys would be in this leaf. |
2089 | // We have to assume the leaf node that would contain this key is to the left. |
2090 | return -1; |
2091 | } |
2092 | // We have a rightmost leafentry, so it must exist in some child node |
2093 | invariant(leaf->n_children > 0); |
2094 | |
2095 | int relative_pos = 0; |
2096 | int c = ft_compare_keys(ft, key, &rightmost_key); |
2097 | if (c > 0) { |
2098 | relative_pos = 1; |
2099 | *target_childnum = leaf->n_children - 1; |
2100 | } else if (c == 0) { |
2101 | if (nondeleted_key_found != nullptr && !le_latest_is_del(rightmost_le)) { |
2102 | *nondeleted_key_found = true; |
2103 | } |
2104 | relative_pos = 0; |
2105 | *target_childnum = leaf->n_children - 1; |
2106 | } else { |
2107 | // The key is less than the rightmost. It may still be in bounds if it's >= the leftmost. |
2108 | DBT leftmost_key; |
2109 | LEAFENTRY leftmost_le = ft_leaf_leftmost_le_and_key(leaf, &leftmost_key); |
2110 | invariant_notnull(leftmost_le); // Must exist because a rightmost exists |
2111 | c = ft_compare_keys(ft, key, &leftmost_key); |
2112 | if (c > 0) { |
2113 | if (nondeleted_key_found != nullptr) { |
2114 | // The caller wants to know if a nondeleted key can be found. |
2115 | LEAFENTRY target_le; |
2116 | int childnum = toku_ftnode_which_child(leaf, key, ft->cmp); |
2117 | BASEMENTNODE bn = BLB(leaf, childnum); |
struct toku_msg_leafval_heaviside_extra extra(ft->cmp, key);
2119 | int r = bn->data_buffer.find_zero<decltype(extra), toku_msg_leafval_heaviside>( |
2120 | extra, |
2121 | &target_le, |
2122 | nullptr, nullptr, nullptr |
2123 | ); |
2124 | *target_childnum = childnum; |
2125 | if (r == 0 && !le_latest_is_del(target_le)) { |
2126 | *nondeleted_key_found = true; |
2127 | } |
2128 | } |
2129 | relative_pos = 0; |
2130 | } else if (c == 0) { |
2131 | if (nondeleted_key_found != nullptr && !le_latest_is_del(leftmost_le)) { |
2132 | *nondeleted_key_found = true; |
2133 | } |
2134 | relative_pos = 0; |
2135 | *target_childnum = 0; |
2136 | } else { |
2137 | relative_pos = -1; |
2138 | } |
2139 | } |
2140 | |
2141 | return relative_pos; |
2142 | } |
2143 | |
2144 | static void ft_insert_directly_into_leaf(FT ft, FTNODE leaf, int target_childnum, DBT *key, DBT *val, |
2145 | XIDS message_xids, enum ft_msg_type type, txn_gc_info *gc_info); |
2146 | static int getf_nothing(uint32_t, const void *, uint32_t, const void *, void *, bool); |
2147 | |
2148 | static int ft_maybe_insert_into_rightmost_leaf(FT ft, DBT *key, DBT *val, XIDS message_xids, enum ft_msg_type type, |
2149 | txn_gc_info *gc_info, bool unique) |
2150 | // Effect: Pins the rightmost leaf node and attempts to do an insert. |
2151 | // There are three reasons why we may not succeed. |
2152 | // - The rightmost leaf is too full and needs a split. |
2153 | // - The key to insert is not within the provable bounds of this leaf node. |
2154 | // - The key is within bounds, but it already exists. |
2155 | // Return: 0 if this function did insert, DB_KEYEXIST if a unique key constraint exists and |
2156 | // some nondeleted leafentry with the same key exists |
2157 | // < 0 if this function did not insert, for a reason other than DB_KEYEXIST. |
2158 | // Note: Treat this function as a possible, but not necessary, optimization for insert. |
2159 | // Rationale: We want O(1) insertions down the rightmost path of the tree. |
2160 | { |
2161 | int r = -1; |
2162 | |
2163 | uint32_t rightmost_fullhash; |
2164 | BLOCKNUM rightmost_blocknum; |
2165 | FTNODE rightmost_leaf = nullptr; |
2166 | |
// Don't do the optimization if our heuristic suggests that
// the insertion pattern is not sequential.
2169 | if (toku_unsafe_fetch(&ft->seqinsert_score) < FT_SEQINSERT_SCORE_THRESHOLD) { |
2170 | goto cleanup; |
2171 | } |
2172 | |
2173 | // We know the seqinsert score is high enough that we should |
2174 | // attempt to directly insert into the rightmost leaf. Because |
2175 | // the score is non-zero, the rightmost blocknum must have been |
2176 | // set. See inject_message_in_locked_node(), which only increases |
2177 | // the score if the target node blocknum == rightmost_blocknum |
2178 | rightmost_blocknum = ft->rightmost_blocknum; |
2179 | invariant(rightmost_blocknum.b != RESERVED_BLOCKNUM_NULL); |
2180 | |
2181 | // Pin the rightmost leaf with a write lock. |
2182 | rightmost_fullhash = toku_cachetable_hash(ft->cf, rightmost_blocknum); |
2183 | ftnode_fetch_extra bfe; |
2184 | bfe.create_for_full_read(ft); |
2185 | toku_pin_ftnode(ft, rightmost_blocknum, rightmost_fullhash, &bfe, PL_WRITE_CHEAP, &rightmost_leaf, true); |
2186 | |
// The rightmost blocknum never changes once it is initialized to something
// other than null. Verify that the pinned node has the correct blocknum.
2189 | invariant(rightmost_leaf->blocknum.b == rightmost_blocknum.b); |
2190 | |
// If the rightmost leaf is reactive, bail out and let the normal promotion pass
2192 | // take care of it. This also ensures that if any of our ancestors are reactive, |
2193 | // they'll be taken care of too. |
2194 | if (toku_ftnode_get_leaf_reactivity(rightmost_leaf, ft->h->nodesize) != RE_STABLE) { |
2195 | FT_STATUS_INC(FT_PRO_RIGHTMOST_LEAF_SHORTCUT_FAIL_REACTIVE, 1); |
2196 | goto cleanup; |
2197 | } |
2198 | |
2199 | // The groundwork has been laid for an insertion directly into the rightmost |
2200 | // leaf node. We know that it is pinned for write, fully in memory, has |
2201 | // no messages above it, and is not reactive. |
2202 | // |
2203 | // Now, two more things must be true for this insertion to actually happen: |
2204 | // 1. The key to insert is within the bounds of this leafnode, or to the right. |
2205 | // 2. If there is a uniqueness constraint, it passes. |
2206 | bool nondeleted_key_found; |
2207 | int relative_pos; |
2208 | int target_childnum; |
2209 | |
2210 | nondeleted_key_found = false; |
2211 | target_childnum = -1; |
2212 | relative_pos = ft_leaf_get_relative_key_pos(ft, rightmost_leaf, key, |
2213 | unique ? &nondeleted_key_found : nullptr, |
2214 | &target_childnum); |
2215 | if (relative_pos >= 0) { |
2216 | FT_STATUS_INC(FT_PRO_RIGHTMOST_LEAF_SHORTCUT_SUCCESS, 1); |
2217 | if (unique && nondeleted_key_found) { |
2218 | r = DB_KEYEXIST; |
2219 | } else { |
2220 | ft_insert_directly_into_leaf(ft, rightmost_leaf, target_childnum, |
2221 | key, val, message_xids, type, gc_info); |
2222 | r = 0; |
2223 | } |
2224 | } else { |
2225 | FT_STATUS_INC(FT_PRO_RIGHTMOST_LEAF_SHORTCUT_FAIL_POS, 1); |
2226 | r = -1; |
2227 | } |
2228 | |
2229 | cleanup: |
2230 | // If we did the insert, the rightmost leaf was unpinned for us. |
2231 | if (r != 0 && rightmost_leaf != nullptr) { |
2232 | toku_unpin_ftnode(ft, rightmost_leaf); |
2233 | } |
2234 | |
2235 | return r; |
2236 | } |
2237 | |
2238 | static void ft_txn_log_insert(FT ft, DBT *key, DBT *val, TOKUTXN txn, bool do_logging, enum ft_msg_type type); |
2239 | |
2240 | int toku_ft_insert_unique(FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool do_logging) { |
2241 | // Effect: Insert a unique key-val pair into the fractal tree. |
// Return: 0 on success, DB_KEYEXIST if the uniqueness constraint failed
2243 | XIDS message_xids = txn != nullptr ? toku_txn_get_xids(txn) : toku_xids_get_root_xids(); |
2244 | |
2245 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
2246 | txn_manager_state txn_state_for_gc(txn_manager); |
2247 | |
2248 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
2249 | txn_gc_info gc_info(&txn_state_for_gc, |
2250 | oldest_referenced_xid_estimate, |
2251 | // no messages above us, we can implicitly promote uxrs based on this xid |
2252 | oldest_referenced_xid_estimate, |
2253 | true); |
2254 | int r = ft_maybe_insert_into_rightmost_leaf(ft_h->ft, key, val, message_xids, FT_INSERT, &gc_info, true); |
2255 | if (r != 0 && r != DB_KEYEXIST) { |
2256 | // Default to a regular unique check + insert algorithm if we couldn't |
2257 | // do it based on the rightmost leaf alone. |
2258 | int lookup_r = toku_ft_lookup(ft_h, key, getf_nothing, nullptr); |
2259 | if (lookup_r == DB_NOTFOUND) { |
2260 | toku_ft_send_insert(ft_h, key, val, message_xids, FT_INSERT, &gc_info); |
2261 | r = 0; |
2262 | } else { |
2263 | r = DB_KEYEXIST; |
2264 | } |
2265 | } |
2266 | |
2267 | if (r == 0) { |
2268 | ft_txn_log_insert(ft_h->ft, key, val, txn, do_logging, FT_INSERT); |
2269 | toku_ft_adjust_logical_row_count(ft_h->ft, 1); |
2270 | } |
2271 | return r; |
2272 | } |
2273 | |
2274 | // Effect: Insert the key-val pair into an ft. |
2275 | void toku_ft_insert (FT_HANDLE ft_handle, DBT *key, DBT *val, TOKUTXN txn) { |
2276 | toku_ft_maybe_insert(ft_handle, key, val, txn, false, ZERO_LSN, true, FT_INSERT); |
2277 | } |
2278 | |
2279 | void toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) { |
2280 | paranoid_invariant(txn); |
2281 | toku_txn_force_fsync_on_commit(txn); //If the txn commits, the commit MUST be in the log |
2282 | //before the (old) file is actually unlinked |
2283 | TOKULOGGER logger = toku_txn_logger(txn); |
2284 | |
2285 | BYTESTRING new_iname_bs = {.len=(uint32_t) strlen(new_iname), .data=(char*)new_iname}; |
2286 | toku_logger_save_rollback_load(txn, old_filenum, &new_iname_bs); |
2287 | if (do_log && logger) { |
2288 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2289 | toku_log_load(logger, load_lsn, do_fsync, txn, xid, old_filenum, new_iname_bs); |
2290 | } |
2291 | } |
2292 | |
2293 | // 2954 |
2294 | // this function handles the tasks needed to be recoverable |
2295 | // - write to rollback log |
2296 | // - write to recovery log |
2297 | void toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn) |
2298 | { |
2299 | paranoid_invariant(txn); |
2300 | TOKULOGGER logger = toku_txn_logger(txn); |
2301 | |
2302 | // write to the rollback log |
2303 | toku_logger_save_rollback_hot_index(txn, &filenums); |
2304 | if (do_log && logger) { |
2305 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2306 | // write to the recovery log |
2307 | toku_log_hot_index(logger, hot_index_lsn, do_fsync, txn, xid, filenums); |
2308 | } |
2309 | } |
2310 | |
2311 | // Effect: Optimize the ft. |
2312 | void toku_ft_optimize (FT_HANDLE ft_h) { |
2313 | TOKULOGGER logger = toku_cachefile_logger(ft_h->ft->cf); |
2314 | if (logger) { |
2315 | TXNID oldest = toku_txn_manager_get_oldest_living_xid(logger->txn_manager); |
2316 | |
2317 | XIDS root_xids = toku_xids_get_root_xids(); |
2318 | XIDS message_xids; |
2319 | if (oldest == TXNID_NONE_LIVING) { |
2320 | message_xids = root_xids; |
2321 | } |
2322 | else { |
2323 | int r = toku_xids_create_child(root_xids, &message_xids, oldest); |
2324 | invariant(r == 0); |
2325 | } |
2326 | |
2327 | DBT key; |
2328 | DBT val; |
2329 | toku_init_dbt(&key); |
2330 | toku_init_dbt(&val); |
2331 | ft_msg msg(&key, &val, FT_OPTIMIZE, ZERO_MSN, message_xids); |
2332 | |
2333 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
2334 | txn_manager_state txn_state_for_gc(txn_manager); |
2335 | |
2336 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
2337 | txn_gc_info gc_info(&txn_state_for_gc, |
2338 | oldest_referenced_xid_estimate, |
2339 | // no messages above us, we can implicitly promote uxrs based on this xid |
2340 | oldest_referenced_xid_estimate, |
2341 | true); |
2342 | toku_ft_root_put_msg(ft_h->ft, msg, &gc_info); |
2343 | toku_xids_destroy(&message_xids); |
2344 | } |
2345 | } |
2346 | |
2347 | void toku_ft_load(FT_HANDLE ft_handle, TOKUTXN txn, char const * new_iname, int do_fsync, LSN *load_lsn) { |
2348 | FILENUM old_filenum = toku_cachefile_filenum(ft_handle->ft->cf); |
2349 | int do_log = 1; |
2350 | toku_ft_load_recovery(txn, old_filenum, new_iname, do_fsync, do_log, load_lsn); |
2351 | } |
2352 | |
2353 | // ft actions for logging hot index filenums |
2354 | void toku_ft_hot_index(FT_HANDLE ft_handle __attribute__ ((unused)), TOKUTXN txn, FILENUMS filenums, int do_fsync, LSN *lsn) { |
2355 | int do_log = 1; |
2356 | toku_ft_hot_index_recovery(txn, filenums, do_fsync, do_log, lsn); |
2357 | } |
2358 | |
2359 | void |
2360 | toku_ft_log_put (TOKUTXN txn, FT_HANDLE ft_handle, const DBT *key, const DBT *val) { |
2361 | TOKULOGGER logger = toku_txn_logger(txn); |
2362 | if (logger) { |
2363 | BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; |
2364 | BYTESTRING valbs = {.len=val->size, .data=(char *) val->data}; |
2365 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2366 | toku_log_enq_insert(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_handle->ft->cf), xid, keybs, valbs); |
2367 | } |
2368 | } |
2369 | |
2370 | void |
2371 | toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *fts, uint32_t num_fts, const DBT *key, const DBT *val) { |
2372 | assert(txn); |
2373 | assert(num_fts > 0); |
2374 | TOKULOGGER logger = toku_txn_logger(txn); |
2375 | if (logger) { |
2376 | FILENUM fnums[num_fts]; |
2377 | uint32_t i; |
2378 | for (i = 0; i < num_fts; i++) { |
2379 | fnums[i] = toku_cachefile_filenum(fts[i]->ft->cf); |
2380 | } |
2381 | FILENUMS filenums = {.num = num_fts, .filenums = fnums}; |
2382 | BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; |
2383 | BYTESTRING valbs = {.len=val->size, .data=(char *) val->data}; |
2384 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2385 | FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE; |
2386 | toku_log_enq_insert_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs); |
2387 | } |
2388 | } |
2389 | |
2390 | TXN_MANAGER toku_ft_get_txn_manager(FT_HANDLE ft_h) { |
2391 | TOKULOGGER logger = toku_cachefile_logger(ft_h->ft->cf); |
2392 | return logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr; |
2393 | } |
2394 | |
2395 | TXNID toku_ft_get_oldest_referenced_xid_estimate(FT_HANDLE ft_h) { |
2396 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
2397 | return txn_manager != nullptr ? toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager) : TXNID_NONE; |
2398 | } |
2399 | |
2400 | static void ft_txn_log_insert(FT ft, DBT *key, DBT *val, TOKUTXN txn, bool do_logging, enum ft_msg_type type) { |
2401 | paranoid_invariant(type == FT_INSERT || type == FT_INSERT_NO_OVERWRITE); |
2402 | |
2403 | //By default use committed messages |
2404 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2405 | if (txn) { |
2406 | BYTESTRING keybs = {key->size, (char *) key->data}; |
2407 | toku_logger_save_rollback_cmdinsert(txn, toku_cachefile_filenum(ft->cf), &keybs); |
2408 | toku_txn_maybe_note_ft(txn, ft); |
2409 | } |
2410 | TOKULOGGER logger = toku_txn_logger(txn); |
2411 | if (do_logging && logger) { |
2412 | BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; |
2413 | BYTESTRING valbs = {.len=val->size, .data=(char *) val->data}; |
2414 | if (type == FT_INSERT) { |
2415 | toku_log_enq_insert(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft->cf), xid, keybs, valbs); |
2416 | } |
2417 | else { |
2418 | toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft->cf), xid, keybs, valbs); |
2419 | } |
2420 | } |
2421 | } |
2422 | |
2423 | void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging, enum ft_msg_type type) { |
2424 | ft_txn_log_insert(ft_h->ft, key, val, txn, do_logging, type); |
2425 | |
2426 | LSN treelsn; |
2427 | if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) { |
2428 | // do nothing |
2429 | } else { |
2430 | XIDS message_xids = txn ? toku_txn_get_xids(txn) : toku_xids_get_root_xids(); |
2431 | |
2432 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
2433 | txn_manager_state txn_state_for_gc(txn_manager); |
2434 | |
2435 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
2436 | txn_gc_info gc_info(&txn_state_for_gc, |
2437 | oldest_referenced_xid_estimate, |
2438 | // no messages above us, we can implicitly promote uxrs based on this xid |
2439 | oldest_referenced_xid_estimate, |
2440 | txn != nullptr ? !txn->for_recovery : false); |
2441 | int r = ft_maybe_insert_into_rightmost_leaf(ft_h->ft, key, val, message_xids, FT_INSERT, &gc_info, false); |
2442 | if (r != 0) { |
2443 | toku_ft_send_insert(ft_h, key, val, message_xids, type, &gc_info); |
2444 | } |
2445 | toku_ft_adjust_logical_row_count(ft_h->ft, 1); |
2446 | } |
2447 | } |
2448 | |
2449 | static void ft_insert_directly_into_leaf(FT ft, FTNODE leaf, int target_childnum, DBT *key, DBT *val, |
2450 | XIDS message_xids, enum ft_msg_type type, txn_gc_info *gc_info) |
// Effect: Insert directly into a leaf node of a fractal tree. Does not do any logging.
2452 | // Requires: Leaf is fully in memory and pinned for write. |
2453 | // Requires: If this insertion were to happen through the root node, the promotion |
2454 | // algorithm would have selected the given leaf node as the point of injection. |
2455 | // That means this function relies on the current implementation of promotion. |
2456 | { |
2457 | ft_msg msg(key, val, type, ZERO_MSN, message_xids); |
2458 | size_t flow_deltas[] = { 0, 0 }; |
2459 | inject_message_in_locked_node(ft, leaf, target_childnum, msg, flow_deltas, gc_info); |
2460 | } |
2461 | |
2462 | static void |
2463 | ft_send_update_msg(FT_HANDLE ft_h, const ft_msg &msg, TOKUTXN txn) { |
2464 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
2465 | txn_manager_state txn_state_for_gc(txn_manager); |
2466 | |
2467 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
2468 | txn_gc_info gc_info(&txn_state_for_gc, |
2469 | oldest_referenced_xid_estimate, |
2470 | // no messages above us, we can implicitly promote uxrs based on this xid |
2471 | oldest_referenced_xid_estimate, |
2472 | txn != nullptr ? !txn->for_recovery : false); |
2473 | toku_ft_root_put_msg(ft_h->ft, msg, &gc_info); |
2474 | } |
2475 | |
2476 | void toku_ft_maybe_update(FT_HANDLE ft_h, |
2477 | const DBT *key, |
const DBT *update_function_extra,
2479 | TOKUTXN txn, |
2480 | bool oplsn_valid, |
2481 | LSN oplsn, |
2482 | bool do_logging) { |
2483 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2484 | if (txn) { |
2485 | BYTESTRING keybs = {key->size, (char *)key->data}; |
2486 | toku_logger_save_rollback_cmdupdate( |
2487 | txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs); |
2488 | toku_txn_maybe_note_ft(txn, ft_h->ft); |
2489 | } |
2490 | |
2491 | TOKULOGGER logger; |
2492 | logger = toku_txn_logger(txn); |
2493 | if (do_logging && logger) { |
2494 | BYTESTRING keybs = {.len = key->size, .data = (char *)key->data}; |
BYTESTRING extrabs = {.len = update_function_extra->size,
.data = (char *)update_function_extra->data};
2497 | toku_log_enq_update(logger, |
2498 | NULL, |
2499 | 0, |
2500 | txn, |
2501 | toku_cachefile_filenum(ft_h->ft->cf), |
2502 | xid, |
2503 | keybs, |
2504 | extrabs); |
2505 | } |
2506 | |
2507 | LSN treelsn; |
2508 | if (oplsn_valid && |
2509 | oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) { |
2510 | // do nothing |
2511 | } else { |
2512 | XIDS message_xids = |
2513 | txn ? toku_txn_get_xids(txn) : toku_xids_get_root_xids(); |
2514 | ft_msg msg( |
2515 | key, update_function_extra, FT_UPDATE, ZERO_MSN, message_xids); |
2516 | ft_send_update_msg(ft_h, msg, txn); |
2517 | } |
2518 | // updates get converted to insert messages, which should do a -1 on the |
2519 | // logical row count when the messages are permanently applied |
2520 | toku_ft_adjust_logical_row_count(ft_h->ft, 1); |
2521 | } |
2522 | |
void toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_extra,
2524 | TOKUTXN txn, bool oplsn_valid, LSN oplsn, |
2525 | bool do_logging, bool is_resetting_op) { |
2526 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2527 | uint8_t resetting = is_resetting_op ? 1 : 0; |
2528 | if (txn) { |
2529 | toku_logger_save_rollback_cmdupdatebroadcast(txn, toku_cachefile_filenum(ft_h->ft->cf), resetting); |
2530 | toku_txn_maybe_note_ft(txn, ft_h->ft); |
2531 | } |
2532 | |
2533 | TOKULOGGER logger; |
2534 | logger = toku_txn_logger(txn); |
2535 | if (do_logging && logger) { |
BYTESTRING extrabs = {.len=update_function_extra->size,
.data = (char *) update_function_extra->data};
2538 | toku_log_enq_updatebroadcast(logger, NULL, 0, txn, |
2539 | toku_cachefile_filenum(ft_h->ft->cf), |
2540 | xid, extrabs, resetting); |
2541 | } |
2542 | |
2543 | //TODO(yoni): remove treelsn here and similar calls (no longer being used) |
2544 | LSN treelsn; |
2545 | if (oplsn_valid && |
oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
// do nothing
} else {
2549 | DBT empty_dbt; |
2550 | XIDS message_xids = txn ? toku_txn_get_xids(txn) : toku_xids_get_root_xids(); |
2551 | ft_msg msg(toku_init_dbt(&empty_dbt), update_function_extra, FT_UPDATE_BROADCAST_ALL, ZERO_MSN, message_xids); |
2552 | ft_send_update_msg(ft_h, msg, txn); |
2553 | } |
2554 | } |
2555 | |
2556 | void toku_ft_send_insert(FT_HANDLE ft_handle, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type, txn_gc_info *gc_info) { |
2557 | ft_msg msg(key, val, type, ZERO_MSN, xids); |
2558 | toku_ft_root_put_msg(ft_handle->ft, msg, gc_info); |
2559 | } |
2560 | |
2561 | void toku_ft_send_commit_any(FT_HANDLE ft_handle, DBT *key, XIDS xids, txn_gc_info *gc_info) { |
2562 | DBT val; |
2563 | ft_msg msg(key, toku_init_dbt(&val), FT_COMMIT_ANY, ZERO_MSN, xids); |
2564 | toku_ft_root_put_msg(ft_handle->ft, msg, gc_info); |
2565 | } |
2566 | |
2567 | void toku_ft_delete(FT_HANDLE ft_handle, DBT *key, TOKUTXN txn) { |
2568 | toku_ft_maybe_delete(ft_handle, key, txn, false, ZERO_LSN, true); |
2569 | } |
2570 | |
2571 | void |
2572 | toku_ft_log_del(TOKUTXN txn, FT_HANDLE ft_handle, const DBT *key) { |
2573 | TOKULOGGER logger = toku_txn_logger(txn); |
2574 | if (logger) { |
2575 | BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; |
2576 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2577 | toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_handle->ft->cf), xid, keybs); |
2578 | } |
2579 | } |
2580 | |
2581 | void |
2582 | toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *fts, uint32_t num_fts, const DBT *key, const DBT *val) { |
2583 | assert(txn); |
2584 | assert(num_fts > 0); |
2585 | TOKULOGGER logger = toku_txn_logger(txn); |
2586 | if (logger) { |
2587 | FILENUM fnums[num_fts]; |
2588 | uint32_t i; |
2589 | for (i = 0; i < num_fts; i++) { |
2590 | fnums[i] = toku_cachefile_filenum(fts[i]->ft->cf); |
2591 | } |
2592 | FILENUMS filenums = {.num = num_fts, .filenums = fnums}; |
2593 | BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; |
2594 | BYTESTRING valbs = {.len=val->size, .data=(char *) val->data}; |
2595 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2596 | FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE; |
2597 | toku_log_enq_delete_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs); |
2598 | } |
2599 | } |
2600 | |
2601 | void toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging) { |
2602 | XIDS message_xids = toku_xids_get_root_xids(); //By default use committed messages |
2603 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2604 | if (txn) { |
2605 | BYTESTRING keybs = {key->size, (char *) key->data}; |
2606 | toku_logger_save_rollback_cmddelete(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs); |
2607 | toku_txn_maybe_note_ft(txn, ft_h->ft); |
2608 | message_xids = toku_txn_get_xids(txn); |
2609 | } |
2610 | TOKULOGGER logger = toku_txn_logger(txn); |
2611 | if (do_logging && logger) { |
2612 | BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; |
2613 | toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs); |
2614 | } |
2615 | |
2616 | LSN treelsn; |
2617 | if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) { |
2618 | // do nothing |
2619 | } else { |
2620 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
2621 | txn_manager_state txn_state_for_gc(txn_manager); |
2622 | |
2623 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
2624 | txn_gc_info gc_info(&txn_state_for_gc, |
2625 | oldest_referenced_xid_estimate, |
2626 | // no messages above us, we can implicitly promote uxrs based on this xid |
2627 | oldest_referenced_xid_estimate, |
2628 | txn != nullptr ? !txn->for_recovery : false); |
2629 | toku_ft_send_delete(ft_h, key, message_xids, &gc_info); |
2630 | toku_ft_adjust_logical_row_count(ft_h->ft, -1); |
2631 | } |
2632 | } |
2633 | |
2634 | void toku_ft_send_delete(FT_HANDLE ft_handle, DBT *key, XIDS xids, txn_gc_info *gc_info) { |
    DBT val;
    ft_msg msg(key, toku_init_dbt(&val), FT_DELETE_ANY, ZERO_MSN, xids);
2637 | toku_ft_root_put_msg(ft_handle->ft, msg, gc_info); |
2638 | } |
2639 | |
2640 | /* ******************** open,close and create ********************** */ |
2641 | |
// Test-only function (not used in the running system). This one has no env.
2643 | int toku_open_ft_handle (const char *fname, int is_create, FT_HANDLE *ft_handle_p, int nodesize, |
2644 | int basementnodesize, |
2645 | enum toku_compression_method compression_method, |
2646 | CACHETABLE cachetable, TOKUTXN txn, |
2647 | int (*compare_fun)(DB *, const DBT*,const DBT*)) { |
2648 | FT_HANDLE ft_handle; |
2649 | const int only_create = 0; |
2650 | |
2651 | toku_ft_handle_create(&ft_handle); |
2652 | toku_ft_handle_set_nodesize(ft_handle, nodesize); |
2653 | toku_ft_handle_set_basementnodesize(ft_handle, basementnodesize); |
2654 | toku_ft_handle_set_compression_method(ft_handle, compression_method); |
2655 | toku_ft_handle_set_fanout(ft_handle, 16); |
2656 | toku_ft_set_bt_compare(ft_handle, compare_fun); |
2657 | |
2658 | int r = toku_ft_handle_open(ft_handle, fname, is_create, only_create, cachetable, txn); |
2659 | if (r != 0) { |
2660 | return r; |
2661 | } |
2662 | |
2663 | *ft_handle_p = ft_handle; |
2664 | return r; |
2665 | } |
2666 | |
2667 | static bool use_direct_io = true; |
2668 | |
2669 | void toku_ft_set_direct_io (bool direct_io_on) { |
2670 | use_direct_io = direct_io_on; |
2671 | } |
2672 | |
2673 | static inline int ft_open_maybe_direct(const char *filename, |
2674 | int oflag, |
2675 | int mode) { |
2676 | if (use_direct_io) { |
2677 | return toku_os_open_direct( |
2678 | filename, oflag, mode, *tokudb_file_data_key); |
2679 | } else { |
2680 | return toku_os_open(filename, oflag, mode, *tokudb_file_data_key); |
2681 | } |
2682 | } |
2683 | |
static const mode_t file_mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH;
2685 | |
2686 | inline bool toku_file_is_root(const char *path, const char *last_slash) { |
2687 | return last_slash == path; |
2688 | } |
2689 | |
2690 | static std::unique_ptr<char[], decltype(&toku_free)> toku_file_get_parent_dir( |
2691 | const char *path) { |
2692 | std::unique_ptr<char[], decltype(&toku_free)> result(nullptr, &toku_free); |
2693 | |
2694 | bool has_trailing_slash = false; |
2695 | |
2696 | /* Find the offset of the last slash */ |
2697 | const char *last_slash = strrchr(path, OS_PATH_SEPARATOR); |
2698 | |
2699 | if (!last_slash) { |
2700 | /* No slash in the path, return NULL */ |
2701 | return result; |
2702 | } |
2703 | |
2704 | /* Ok, there is a slash. Is there anything after it? */ |
2705 | if (static_cast<size_t>(last_slash - path + 1) == strlen(path)) { |
2706 | has_trailing_slash = true; |
2707 | } |
2708 | |
    /* Reduce repetitive slashes. */
2710 | while (last_slash > path && last_slash[-1] == OS_PATH_SEPARATOR) { |
2711 | last_slash--; |
2712 | } |
2713 | |
2714 | /* Check for the root of a drive. */ |
2715 | if (toku_file_is_root(path, last_slash)) { |
2716 | return result; |
2717 | } |
2718 | |
2719 | /* If a trailing slash prevented the first strrchr() from trimming |
2720 | the last component of the path, trim that component now. */ |
2721 | if (has_trailing_slash) { |
2722 | /* Back up to the previous slash. */ |
2723 | last_slash--; |
2724 | while (last_slash > path && last_slash[0] != OS_PATH_SEPARATOR) { |
2725 | last_slash--; |
2726 | } |
2727 | |
        /* Reduce repetitive slashes. */
2729 | while (last_slash > path && last_slash[-1] == OS_PATH_SEPARATOR) { |
2730 | last_slash--; |
2731 | } |
2732 | } |
2733 | |
2734 | /* Check for the root of a drive. */ |
2735 | if (toku_file_is_root(path, last_slash)) { |
2736 | return result; |
2737 | } |
2738 | |
2739 | result.reset(toku_strndup(path, last_slash - path)); |
2740 | return result; |
2741 | } |
2742 | |
2743 | bool toku_create_subdirs_if_needed(const char *path) { |
2744 | static const mode_t dir_mode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | |
2745 | S_IWGRP | S_IXGRP | S_IROTH | S_IXOTH; |
2746 | |
2747 | toku_struct_stat stat; |
2748 | bool subdir_exists = true; |
2749 | auto subdir = toku_file_get_parent_dir(path); |
2750 | |
2751 | if (!subdir.get()) |
2752 | return true; |
2753 | |
2754 | if (toku_stat(subdir.get(), &stat, toku_uninstrumented) == -1) { |
2755 | if (ENOENT == get_error_errno()) |
2756 | subdir_exists = false; |
2757 | else |
2758 | return false; |
2759 | } |
2760 | |
2761 | if (subdir_exists) { |
2762 | if (!S_ISDIR(stat.st_mode)) |
2763 | return false; |
2764 | return true; |
2765 | } |
2766 | |
2767 | if (!toku_create_subdirs_if_needed(subdir.get())) |
2768 | return false; |
2769 | |
2770 | if (toku_os_mkdir(subdir.get(), dir_mode)) |
2771 | return false; |
2772 | |
2773 | return true; |
2774 | } |
2775 | |
2776 | // open a file for use by the ft |
2777 | // Requires: File does not exist. |
2778 | static int ft_create_file(FT_HANDLE UU(ft_handle), const char *fname, int *fdp) { |
2779 | int r; |
2780 | int fd; |
2781 | int er; |
2782 | if (!toku_create_subdirs_if_needed(fname)) |
2783 | return get_error_errno(); |
2784 | fd = ft_open_maybe_direct(fname, O_RDWR | O_BINARY, file_mode); |
2785 | assert(fd==-1); |
2786 | if ((er = get_maybe_error_errno()) != ENOENT) { |
2787 | return er; |
2788 | } |
2789 | fd = ft_open_maybe_direct(fname, O_RDWR | O_CREAT | O_BINARY, file_mode); |
2790 | if (fd==-1) { |
2791 | r = get_error_errno(); |
2792 | return r; |
2793 | } |
2794 | |
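    // fsync the parent directory so the new directory entry itself is
    // durable; otherwise a crash could lose the file creation even though
    // the fd is already open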
2795 | r = toku_fsync_directory(fname); |
2796 | if (r == 0) { |
2797 | *fdp = fd; |
2798 | } else { |
2799 | int rr = close(fd); |
2800 | assert_zero(rr); |
2801 | } |
2802 | return r; |
2803 | } |
2804 | |
2805 | // open a file for use by the ft. if the file does not exist, error |
2806 | static int ft_open_file(const char *fname, int *fdp) { |
2807 | int fd; |
2808 | fd = ft_open_maybe_direct(fname, O_RDWR | O_BINARY, file_mode); |
2809 | if (fd==-1) { |
2810 | return get_error_errno(); |
2811 | } |
2812 | *fdp = fd; |
2813 | return 0; |
2814 | } |
2815 | |
2816 | void |
2817 | toku_ft_handle_set_compression_method(FT_HANDLE t, enum toku_compression_method method) |
2818 | { |
2819 | if (t->ft) { |
2820 | toku_ft_set_compression_method(t->ft, method); |
2821 | } |
2822 | else { |
2823 | t->options.compression_method = method; |
2824 | } |
2825 | } |
2826 | |
2827 | void |
2828 | toku_ft_handle_get_compression_method(FT_HANDLE t, enum toku_compression_method *methodp) |
2829 | { |
2830 | if (t->ft) { |
2831 | toku_ft_get_compression_method(t->ft, methodp); |
2832 | } |
2833 | else { |
2834 | *methodp = t->options.compression_method; |
2835 | } |
2836 | } |
2837 | |
2838 | void |
2839 | toku_ft_handle_set_fanout(FT_HANDLE ft_handle, unsigned int fanout) |
2840 | { |
2841 | if (ft_handle->ft) { |
2842 | toku_ft_set_fanout(ft_handle->ft, fanout); |
2843 | } |
2844 | else { |
2845 | ft_handle->options.fanout = fanout; |
2846 | } |
2847 | } |
2848 | |
2849 | void |
2850 | toku_ft_handle_get_fanout(FT_HANDLE ft_handle, unsigned int *fanout) |
2851 | { |
2852 | if (ft_handle->ft) { |
2853 | toku_ft_get_fanout(ft_handle->ft, fanout); |
2854 | } |
2855 | else { |
2856 | *fanout = ft_handle->options.fanout; |
2857 | } |
2858 | } |
2859 | |
2860 | // The memcmp magic byte may be set on a per fractal tree basis to communicate |
2861 | // that if two keys begin with this byte, they may be compared with the builtin |
2862 | // key comparison function. This greatly optimizes certain in-memory workloads, |
2863 | // such as lookups by OID primary key in TokuMX. |
2864 | int toku_ft_handle_set_memcmp_magic(FT_HANDLE ft_handle, uint8_t magic) { |
2865 | if (magic == comparator::MEMCMP_MAGIC_NONE) { |
2866 | return EINVAL; |
2867 | } |
2868 | if (ft_handle->ft != nullptr) { |
2869 | // if the handle is already open, then we cannot set the memcmp magic |
2870 | // (because it may or may not have been set by someone else already) |
2871 | return EINVAL; |
2872 | } |
2873 | ft_handle->options.memcmp_magic = magic; |
2874 | return 0; |
2875 | } |
2876 | |
2877 | static int |
2878 | verify_builtin_comparisons_consistent(FT_HANDLE t, uint32_t flags) { |
2879 | if ((flags & TOKU_DB_KEYCMP_BUILTIN) && (t->options.compare_fun != toku_builtin_compare_fun)) { |
2880 | return EINVAL; |
2881 | } |
2882 | return 0; |
2883 | } |
2884 | |
2885 | // |
2886 | // See comments in toku_db_change_descriptor to understand invariants |
2887 | // in the system when this function is called |
2888 | // |
2889 | void toku_ft_change_descriptor( |
2890 | FT_HANDLE ft_h, |
2891 | const DBT* old_descriptor, |
2892 | const DBT* new_descriptor, |
2893 | bool do_log, |
2894 | TOKUTXN txn, |
2895 | bool update_cmp_descriptor |
2896 | ) |
2897 | { |
2898 | DESCRIPTOR_S new_d; |
2899 | |
2900 | // if running with txns, save to rollback + write to recovery log |
2901 | if (txn) { |
2902 | // put information into rollback file |
2903 | BYTESTRING old_desc_bs = { old_descriptor->size, (char *) old_descriptor->data }; |
2904 | BYTESTRING new_desc_bs = { new_descriptor->size, (char *) new_descriptor->data }; |
2905 | toku_logger_save_rollback_change_fdescriptor( |
2906 | txn, |
2907 | toku_cachefile_filenum(ft_h->ft->cf), |
2908 | &old_desc_bs |
2909 | ); |
2910 | toku_txn_maybe_note_ft(txn, ft_h->ft); |
2911 | |
2912 | if (do_log) { |
2913 | TOKULOGGER logger = toku_txn_logger(txn); |
2914 | TXNID_PAIR xid = toku_txn_get_txnid(txn); |
2915 | toku_log_change_fdescriptor( |
2916 | logger, NULL, 0, |
2917 | txn, |
2918 | toku_cachefile_filenum(ft_h->ft->cf), |
2919 | xid, |
2920 | old_desc_bs, |
2921 | new_desc_bs, |
2922 | update_cmp_descriptor |
2923 | ); |
2924 | } |
2925 | } |
2926 | |
2927 | // write new_descriptor to header |
2928 | new_d.dbt = *new_descriptor; |
2929 | toku_ft_update_descriptor(ft_h->ft, &new_d); |
2930 | // very infrequent operation, worth precise threadsafe count |
2931 | FT_STATUS_INC(FT_DESCRIPTOR_SET, 1); |
2932 | |
2933 | if (update_cmp_descriptor) { |
2934 | toku_ft_update_cmp_descriptor(ft_h->ft); |
2935 | } |
2936 | } |
2937 | |
2938 | static void |
2939 | toku_ft_handle_inherit_options(FT_HANDLE t, FT ft) { |
2940 | struct ft_options options = { |
2941 | .nodesize = ft->h->nodesize, |
2942 | .basementnodesize = ft->h->basementnodesize, |
2943 | .compression_method = ft->h->compression_method, |
2944 | .fanout = ft->h->fanout, |
2945 | .flags = ft->h->flags, |
2946 | .memcmp_magic = ft->cmp.get_memcmp_magic(), |
2947 | .compare_fun = ft->cmp.get_compare_func(), |
2948 | .update_fun = ft->update_fun |
2949 | }; |
2950 | t->options = options; |
2951 | t->did_set_flags = true; |
2952 | } |
2953 | |
2954 | // This is the actual open, used for various purposes, such as normal use, recovery, and redirect. |
2955 | // fname_in_env is the iname, relative to the env_dir (data_dir is already in iname as prefix). |
2956 | // The checkpointed version (checkpoint_lsn) of the dictionary must be no later than max_acceptable_lsn . |
// Requires: The multi-operation client lock must be held to prevent a checkpoint from occurring.
2958 | static int |
2959 | ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, DICTIONARY_ID use_dictionary_id, LSN max_acceptable_lsn) { |
2960 | int r; |
2961 | bool txn_created = false; |
2962 | char *fname_in_cwd = NULL; |
2963 | CACHEFILE cf = NULL; |
2964 | FT ft = NULL; |
2965 | bool did_create = false; |
2966 | bool was_already_open = false; |
2967 | |
2968 | toku_ft_open_close_lock(); |
2969 | |
2970 | if (ft_h->did_set_flags) { |
2971 | r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags); |
2972 | if (r!=0) { goto exit; } |
2973 | } |
2974 | |
2975 | assert(is_create || !only_create); |
2976 | FILENUM reserved_filenum; |
2977 | reserved_filenum = use_filenum; |
2978 | fname_in_cwd = toku_cachetable_get_fname_in_cwd(cachetable, fname_in_env); |
2979 | { |
2980 | int fd = -1; |
2981 | r = ft_open_file(fname_in_cwd, &fd); |
2982 | if (reserved_filenum.fileid == FILENUM_NONE.fileid) { |
2983 | reserved_filenum = toku_cachetable_reserve_filenum(cachetable); |
2984 | } |
2985 | if (r==ENOENT && is_create) { |
2986 | did_create = true; |
2987 | if (txn) { |
2988 | BYTESTRING bs = { .len=(uint32_t) strlen(fname_in_env), .data = (char*)fname_in_env }; |
2989 | toku_logger_save_rollback_fcreate(txn, reserved_filenum, &bs); // bs is a copy of the fname relative to the environment |
2990 | } |
2991 | txn_created = (bool)(txn!=NULL); |
2992 | toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, file_mode, ft_h->options.flags, ft_h->options.nodesize, ft_h->options.basementnodesize, ft_h->options.compression_method); |
2993 | r = ft_create_file(ft_h, fname_in_cwd, &fd); |
2994 | if (r) { goto exit; } |
2995 | } |
2996 | if (r) { goto exit; } |
2997 | r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum, &was_already_open); |
2998 | if (r) { goto exit; } |
2999 | } |
3000 | assert(ft_h->options.nodesize>0); |
3001 | if (is_create) { |
3002 | r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft); |
3003 | if (r==TOKUDB_DICTIONARY_NO_HEADER) { |
3004 | toku_ft_create(&ft, &ft_h->options, cf, txn); |
3005 | } |
3006 | else if (r!=0) { |
3007 | goto exit; |
3008 | } |
3009 | else if (only_create) { |
3010 | assert_zero(r); |
3011 | r = EEXIST; |
3012 | goto exit; |
3013 | } |
        // if we get here, then is_create was true but only_create was false,
        // so it is ok that toku_read_ft_and_store_in_cachefile found and
        // read an existing header
3017 | } else { |
3018 | r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft); |
3019 | if (r) { goto exit; } |
3020 | } |
3021 | if (!ft_h->did_set_flags) { |
3022 | r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags); |
3023 | if (r) { goto exit; } |
3024 | } else if (ft_h->options.flags != ft->h->flags) { /* if flags have been set then flags must match */ |
3025 | r = EINVAL; |
3026 | goto exit; |
3027 | } |
3028 | |
3029 | // Ensure that the memcmp magic bits are consistent, if set. |
3030 | if (ft->cmp.get_memcmp_magic() != toku::comparator::MEMCMP_MAGIC_NONE && |
3031 | ft_h->options.memcmp_magic != toku::comparator::MEMCMP_MAGIC_NONE && |
3032 | ft_h->options.memcmp_magic != ft->cmp.get_memcmp_magic()) { |
3033 | r = EINVAL; |
3034 | goto exit; |
3035 | } |
3036 | toku_ft_handle_inherit_options(ft_h, ft); |
3037 | |
3038 | if (!was_already_open) { |
3039 | if (!did_create) { //Only log the fopen that OPENs the file. If it was already open, don't log. |
3040 | toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(cf), ft_h->options.flags); |
3041 | } |
3042 | } |
    bool use_reserved_dict_id;
3044 | use_reserved_dict_id = use_dictionary_id.dictid != DICTIONARY_ID_NONE.dictid; |
3045 | if (!was_already_open) { |
3046 | DICTIONARY_ID dict_id; |
3047 | if (use_reserved_dict_id) { |
3048 | dict_id = use_dictionary_id; |
3049 | } |
3050 | else { |
3051 | dict_id = next_dict_id(); |
3052 | } |
3053 | ft->dict_id = dict_id; |
3054 | } |
3055 | else { |
3056 | // dict_id is already in header |
3057 | if (use_reserved_dict_id) { |
3058 | assert(ft->dict_id.dictid == use_dictionary_id.dictid); |
3059 | } |
3060 | } |
3061 | assert(ft); |
3062 | assert(ft->dict_id.dictid != DICTIONARY_ID_NONE.dictid); |
3063 | assert(ft->dict_id.dictid < dict_id_serial); |
3064 | |
    // Important: after this point, where we associate the header with the
    // ft_handle, the function is not allowed to fail. The failure-handling
    // code below the "exit" label depends on this.
3070 | toku_ft_note_ft_handle_open(ft, ft_h); |
3071 | if (txn_created) { |
3072 | assert(txn); |
3073 | toku_txn_maybe_note_ft(txn, ft); |
3074 | } |
3075 | |
3076 | // Opening an ft may restore to previous checkpoint. |
3077 | // Truncate if necessary. |
3078 | { |
3079 | int fd = toku_cachefile_get_fd (ft->cf); |
3080 | ft->blocktable.maybe_truncate_file_on_open(fd); |
3081 | } |
3082 | |
3083 | r = 0; |
3084 | exit: |
3085 | if (fname_in_cwd) { |
3086 | toku_free(fname_in_cwd); |
3087 | } |
3088 | if (r != 0 && cf) { |
3089 | if (ft) { |
3090 | // we only call toku_ft_note_ft_handle_open |
3091 | // when the function succeeds, so if we are here, |
3092 | // then that means we have a reference to the header |
3093 | // but we have not linked it to this ft. So, |
3094 | // we can simply try to remove the header. |
3095 | // We don't need to unlink this ft from the header |
3096 | toku_ft_grab_reflock(ft); |
3097 | bool needed = toku_ft_needed_unlocked(ft); |
3098 | toku_ft_release_reflock(ft); |
3099 | if (!needed) { |
3100 | // close immediately. |
3101 | toku_ft_evict_from_memory(ft, false, ZERO_LSN); |
3102 | } |
3103 | } |
3104 | else { |
3105 | toku_cachefile_close(&cf, false, ZERO_LSN); |
3106 | } |
3107 | } |
3108 | toku_ft_open_close_unlock(); |
3109 | return r; |
3110 | } |
3111 | |
3112 | // Open an ft for the purpose of recovery, which requires that the ft be open to a pre-determined FILENUM |
3113 | // and may require a specific checkpointed version of the file. |
3114 | // (dict_id is assigned by the ft_handle_open() function.) |
3115 | int |
3116 | toku_ft_handle_open_recovery(FT_HANDLE t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, LSN max_acceptable_lsn) { |
3117 | int r; |
3118 | assert(use_filenum.fileid != FILENUM_NONE.fileid); |
3119 | r = ft_handle_open(t, fname_in_env, is_create, only_create, cachetable, |
3120 | txn, use_filenum, DICTIONARY_ID_NONE, max_acceptable_lsn); |
3121 | return r; |
3122 | } |
3123 | |
3124 | // Open an ft in normal use. The FILENUM and dict_id are assigned by the ft_handle_open() function. |
// Requires: The multi-operation client lock must be held to prevent a checkpoint from occurring.
3126 | int |
3127 | toku_ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn) { |
3128 | int r; |
3129 | r = ft_handle_open(t, fname_in_env, is_create, only_create, cachetable, txn, FILENUM_NONE, DICTIONARY_ID_NONE, MAX_LSN); |
3130 | return r; |
3131 | } |
3132 | |
3133 | // clone an ft handle. the cloned handle has a new dict_id but refers to the same fractal tree |
3134 | int |
3135 | toku_ft_handle_clone(FT_HANDLE *cloned_ft_handle, FT_HANDLE ft_handle, TOKUTXN txn) { |
3136 | FT_HANDLE result_ft_handle; |
3137 | toku_ft_handle_create(&result_ft_handle); |
3138 | |
3139 | // we're cloning, so the handle better have an open ft and open cf |
3140 | invariant(ft_handle->ft); |
3141 | invariant(ft_handle->ft->cf); |
3142 | |
3143 | // inherit the options of the ft whose handle is being cloned. |
3144 | toku_ft_handle_inherit_options(result_ft_handle, ft_handle->ft); |
3145 | |
3146 | // we can clone the handle by creating a new handle with the same fname |
3147 | CACHEFILE cf = ft_handle->ft->cf; |
3148 | CACHETABLE ct = toku_cachefile_get_cachetable(cf); |
3149 | const char *fname_in_env = toku_cachefile_fname_in_env(cf); |
3150 | int r = toku_ft_handle_open(result_ft_handle, fname_in_env, false, false, ct, txn); |
3151 | if (r != 0) { |
3152 | toku_ft_handle_close(result_ft_handle); |
3153 | result_ft_handle = NULL; |
3154 | } |
3155 | *cloned_ft_handle = result_ft_handle; |
3156 | return r; |
3157 | } |
3158 | |
3159 | // Open an ft in normal use. The FILENUM and dict_id are assigned by the ft_handle_open() function. |
3160 | int |
3161 | toku_ft_handle_open_with_dict_id( |
3162 | FT_HANDLE t, |
3163 | const char *fname_in_env, |
3164 | int is_create, |
3165 | int only_create, |
3166 | CACHETABLE cachetable, |
3167 | TOKUTXN txn, |
3168 | DICTIONARY_ID use_dictionary_id |
3169 | ) |
3170 | { |
3171 | int r; |
3172 | r = ft_handle_open( |
3173 | t, |
3174 | fname_in_env, |
3175 | is_create, |
3176 | only_create, |
3177 | cachetable, |
3178 | txn, |
3179 | FILENUM_NONE, |
3180 | use_dictionary_id, |
3181 | MAX_LSN |
3182 | ); |
3183 | return r; |
3184 | } |
3185 | |
3186 | DICTIONARY_ID |
3187 | toku_ft_get_dictionary_id(FT_HANDLE ft_handle) { |
3188 | FT ft = ft_handle->ft; |
3189 | return ft->dict_id; |
3190 | } |
3191 | |
3192 | void toku_ft_set_flags(FT_HANDLE ft_handle, unsigned int flags) { |
3193 | ft_handle->did_set_flags = true; |
3194 | ft_handle->options.flags = flags; |
3195 | } |
3196 | |
3197 | void toku_ft_get_flags(FT_HANDLE ft_handle, unsigned int *flags) { |
3198 | *flags = ft_handle->options.flags; |
3199 | } |
3200 | |
3201 | void toku_ft_get_maximum_advised_key_value_lengths (unsigned int *max_key_len, unsigned int *max_val_len) |
3202 | // return the maximum advisable key value lengths. The ft doesn't enforce these. |
3203 | { |
3204 | *max_key_len = 32*1024; |
3205 | *max_val_len = 32*1024*1024; |
3206 | } |
3207 | |
3208 | |
3209 | void toku_ft_handle_set_nodesize(FT_HANDLE ft_handle, unsigned int nodesize) { |
3210 | if (ft_handle->ft) { |
3211 | toku_ft_set_nodesize(ft_handle->ft, nodesize); |
3212 | } |
3213 | else { |
3214 | ft_handle->options.nodesize = nodesize; |
3215 | } |
3216 | } |
3217 | |
3218 | void toku_ft_handle_get_nodesize(FT_HANDLE ft_handle, unsigned int *nodesize) { |
3219 | if (ft_handle->ft) { |
3220 | toku_ft_get_nodesize(ft_handle->ft, nodesize); |
3221 | } |
3222 | else { |
3223 | *nodesize = ft_handle->options.nodesize; |
3224 | } |
3225 | } |
3226 | |
3227 | void toku_ft_handle_set_basementnodesize(FT_HANDLE ft_handle, unsigned int basementnodesize) { |
3228 | if (ft_handle->ft) { |
3229 | toku_ft_set_basementnodesize(ft_handle->ft, basementnodesize); |
3230 | } |
3231 | else { |
3232 | ft_handle->options.basementnodesize = basementnodesize; |
3233 | } |
3234 | } |
3235 | |
3236 | void toku_ft_handle_get_basementnodesize(FT_HANDLE ft_handle, unsigned int *basementnodesize) { |
3237 | if (ft_handle->ft) { |
3238 | toku_ft_get_basementnodesize(ft_handle->ft, basementnodesize); |
3239 | } |
3240 | else { |
3241 | *basementnodesize = ft_handle->options.basementnodesize; |
3242 | } |
3243 | } |
3244 | |
3245 | void toku_ft_set_bt_compare(FT_HANDLE ft_handle, int (*bt_compare)(DB*, const DBT*, const DBT*)) { |
3246 | ft_handle->options.compare_fun = bt_compare; |
3247 | } |
3248 | |
void toku_ft_set_redirect_callback(FT_HANDLE ft_handle, on_redirect_callback redir_cb, void *extra) {
3250 | ft_handle->redirect_callback = redir_cb; |
3251 | ft_handle->redirect_callback_extra = extra; |
3252 | } |
3253 | |
3254 | void toku_ft_set_update(FT_HANDLE ft_handle, ft_update_func update_fun) { |
3255 | ft_handle->options.update_fun = update_fun; |
3256 | } |
3257 | |
3258 | const toku::comparator &toku_ft_get_comparator(FT_HANDLE ft_handle) { |
3259 | invariant_notnull(ft_handle->ft); |
3260 | return ft_handle->ft->cmp; |
3261 | } |
3262 | |
3263 | static void |
ft_remove_handle_ref_callback(FT UU(ft), void *extra) {
3265 | FT_HANDLE CAST_FROM_VOIDP(handle, extra); |
3266 | toku_list_remove(&handle->live_ft_handle_link); |
3267 | } |
3268 | |
3269 | static void ft_handle_close(FT_HANDLE ft_handle, bool oplsn_valid, LSN oplsn) { |
3270 | FT ft = ft_handle->ft; |
3271 | // There are error paths in the ft_handle_open that end with ft_handle->ft == nullptr. |
3272 | if (ft != nullptr) { |
3273 | toku_ft_remove_reference(ft, oplsn_valid, oplsn, ft_remove_handle_ref_callback, ft_handle); |
3274 | } |
3275 | toku_free(ft_handle); |
3276 | } |
3277 | |
3278 | // close an ft handle during normal operation. the underlying ft may or may not close, |
3279 | // depending if there are still references. an lsn for this close will come from the logger. |
3280 | void toku_ft_handle_close(FT_HANDLE ft_handle) { |
3281 | ft_handle_close(ft_handle, false, ZERO_LSN); |
3282 | } |
3283 | |
3284 | // close an ft handle during recovery. the underlying ft must close, and will use the given lsn. |
3285 | void toku_ft_handle_close_recovery(FT_HANDLE ft_handle, LSN oplsn) { |
3286 | // the ft must exist if closing during recovery. error paths during |
3287 | // open for recovery should close handles using toku_ft_handle_close() |
3288 | invariant_notnull(ft_handle->ft); |
3289 | ft_handle_close(ft_handle, true, oplsn); |
3290 | } |
3291 | |
3292 | // TODO: remove this, callers should instead just use toku_ft_handle_close() |
3293 | int toku_close_ft_handle_nolsn(FT_HANDLE ft_handle, char **UU(error_string)) { |
3294 | toku_ft_handle_close(ft_handle); |
3295 | return 0; |
3296 | } |
3297 | |
3298 | void toku_ft_handle_create(FT_HANDLE *ft_handle_ptr) { |
3299 | FT_HANDLE XMALLOC(ft_handle); |
3300 | memset(ft_handle, 0, sizeof *ft_handle); |
3301 | toku_list_init(&ft_handle->live_ft_handle_link); |
3302 | ft_handle->options.flags = 0; |
3303 | ft_handle->did_set_flags = false; |
3304 | ft_handle->options.nodesize = FT_DEFAULT_NODE_SIZE; |
3305 | ft_handle->options.basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE; |
3306 | ft_handle->options.compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; |
3307 | ft_handle->options.fanout = FT_DEFAULT_FANOUT; |
3308 | ft_handle->options.compare_fun = toku_builtin_compare_fun; |
3309 | ft_handle->options.update_fun = NULL; |
3310 | *ft_handle_ptr = ft_handle; |
3311 | } |
3312 | |
3313 | /******************************* search ***************************************/ |
3314 | |
3315 | // Return true if this key is within the search bound. If there is no search bound then the tree search continues. |
3316 | static bool search_continue(ft_search *search, void *key, uint32_t key_len) { |
3317 | bool result = true; |
3318 | if (search->direction == FT_SEARCH_LEFT && search->k_bound) { |
3319 | FT_HANDLE CAST_FROM_VOIDP(ft_handle, search->context); |
3320 | DBT this_key = { .data = key, .size = key_len }; |
3321 | // search continues if this key <= key bound |
3322 | result = (ft_handle->ft->cmp(&this_key, search->k_bound) <= 0); |
3323 | } |
3324 | return result; |
3325 | } |
3326 | |
3327 | static int heaviside_from_search_t(const DBT &kdbt, ft_search &search) { |
3328 | int cmp = search.compare(search, |
3329 | search.k ? &kdbt : 0); |
3330 | // The search->compare function returns only 0 or 1 |
3331 | switch (search.direction) { |
3332 | case FT_SEARCH_LEFT: return cmp==0 ? -1 : +1; |
3333 | case FT_SEARCH_RIGHT: return cmp==0 ? +1 : -1; // Because the comparison runs backwards for right searches. |
3334 | } |
3335 | abort(); return 0; |
3336 | } |
3337 | |
3338 | // This is a bottom layer of the search functions. |
3339 | static int |
3340 | ft_search_basement_node( |
3341 | BASEMENTNODE bn, |
3342 | ft_search *search, |
3343 | FT_GET_CALLBACK_FUNCTION getf, |
3344 | void *getf_v, |
3345 | bool *doprefetch, |
3346 | FT_CURSOR ftcursor, |
3347 | bool can_bulk_fetch |
3348 | ) |
3349 | { |
3350 | // Now we have to convert from ft_search to the heaviside function with a direction. What a pain... |
3351 | |
3352 | int direction; |
3353 | switch (search->direction) { |
3354 | case FT_SEARCH_LEFT: direction = +1; goto ok; |
3355 | case FT_SEARCH_RIGHT: direction = -1; goto ok; |
3356 | } |
3357 | return EINVAL; // This return and the goto are a hack to get both compile-time and run-time checking on enum |
3358 | ok: ; |
3359 | uint32_t idx = 0; |
3360 | LEAFENTRY le; |
3361 | uint32_t keylen; |
3362 | void *key; |
3363 | int r = bn->data_buffer.find<decltype(*search), heaviside_from_search_t>( |
3364 | *search, |
3365 | direction, |
3366 | &le, |
3367 | &key, |
3368 | &keylen, |
3369 | &idx |
3370 | ); |
3371 | if (r!=0) return r; |
3372 | |
3373 | if (toku_ft_cursor_is_leaf_mode(ftcursor)) |
3374 | goto got_a_good_value; // leaf mode cursors see all leaf entries |
3375 | if (le_val_is_del(le, ftcursor->read_type, ftcursor->ttxn)) { |
3376 | // Provisionally deleted stuff is gone. |
3377 | // So we need to scan in the direction to see if we can find something. |
3378 | // Every 64 deleted leaf entries check if the leaf's key is within the search bounds. |
3379 | for (uint64_t n_deleted = 1; ; n_deleted++) { |
3380 | switch (search->direction) { |
3381 | case FT_SEARCH_LEFT: |
3382 | idx++; |
3383 | if (idx >= bn->data_buffer.num_klpairs() || ((n_deleted % 64) == 0 && !search_continue(search, key, keylen))) { |
3384 | FT_STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted); |
3385 | if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted)) { |
3386 | return TOKUDB_INTERRUPTED; |
3387 | } |
3388 | return DB_NOTFOUND; |
3389 | } |
3390 | break; |
3391 | case FT_SEARCH_RIGHT: |
3392 | if (idx == 0) { |
3393 | FT_STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted); |
3394 | if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted)) { |
3395 | return TOKUDB_INTERRUPTED; |
3396 | } |
3397 | return DB_NOTFOUND; |
3398 | } |
3399 | idx--; |
3400 | break; |
3401 | default: |
3402 | abort(); |
3403 | } |
3404 | r = bn->data_buffer.fetch_klpair(idx, &le, &keylen, &key); |
3405 | assert_zero(r); // we just validated the index |
3406 | if (!le_val_is_del(le, ftcursor->read_type, ftcursor->ttxn)) { |
3407 | FT_STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted); |
3408 | if (ftcursor->interrupt_cb) |
3409 | ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted); |
3410 | goto got_a_good_value; |
3411 | } |
3412 | } |
3413 | } |
3414 | got_a_good_value: |
3415 | { |
3416 | uint32_t vallen; |
3417 | void *val; |
3418 | |
3419 | le_extract_val(le, toku_ft_cursor_is_leaf_mode(ftcursor), |
3420 | ftcursor->read_type, ftcursor->ttxn, |
3421 | &vallen, &val); |
3422 | r = toku_ft_cursor_check_restricted_range(ftcursor, key, keylen); |
3423 | if (r == 0) { |
3424 | r = getf(keylen, key, vallen, val, getf_v, false); |
3425 | } |
3426 | if (r == 0 || r == TOKUDB_CURSOR_CONTINUE) { |
3427 | // |
3428 | // IMPORTANT: bulk fetch CANNOT go past the current basement node, |
3429 | // because there is no guarantee that messages have been applied |
3430 | // to other basement nodes, as part of #5770 |
3431 | // |
3432 | if (r == TOKUDB_CURSOR_CONTINUE && can_bulk_fetch) { |
3433 | r = toku_ft_cursor_shortcut(ftcursor, direction, idx, &bn->data_buffer, |
3434 | getf, getf_v, &keylen, &key, &vallen, &val); |
3435 | } |
3436 | |
3437 | toku_destroy_dbt(&ftcursor->key); |
3438 | toku_destroy_dbt(&ftcursor->val); |
3439 | if (!ftcursor->is_temporary) { |
3440 | toku_memdup_dbt(&ftcursor->key, key, keylen); |
3441 | toku_memdup_dbt(&ftcursor->val, val, vallen); |
3442 | } |
3443 | // The search was successful. Prefetching can continue. |
3444 | *doprefetch = true; |
3445 | } |
3446 | } |
3447 | if (r == TOKUDB_CURSOR_CONTINUE) r = 0; |
3448 | return r; |
3449 | } |
3450 | |
3451 | static int |
3452 | ft_search_node ( |
3453 | FT_HANDLE ft_handle, |
3454 | FTNODE node, |
3455 | ft_search *search, |
3456 | int child_to_search, |
3457 | FT_GET_CALLBACK_FUNCTION getf, |
3458 | void *getf_v, |
3459 | bool *doprefetch, |
3460 | FT_CURSOR ftcursor, |
3461 | UNLOCKERS unlockers, |
3462 | ANCESTORS, |
3463 | const pivot_bounds &bounds, |
3464 | bool can_bulk_fetch |
3465 | ); |
3466 | |
3467 | static int |
ftnode_fetch_callback_and_free_bfe(CACHEFILE cf, PAIR p, int fd, BLOCKNUM blocknum, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int *dirtyp, void *extraargs)
3469 | { |
3470 | int r = toku_ftnode_fetch_callback(cf, p, fd, blocknum, fullhash, ftnode_pv, disk_data, sizep, dirtyp, extraargs); |
3471 | ftnode_fetch_extra *CAST_FROM_VOIDP(bfe, extraargs); |
3472 | bfe->destroy(); |
3473 | toku_free(bfe); |
3474 | return r; |
3475 | } |
3476 | |
3477 | static int |
ftnode_pf_callback_and_free_bfe(void *ftnode_pv, void* disk_data, void *read_extraargs, int fd, PAIR_ATTR *sizep)
3479 | { |
3480 | int r = toku_ftnode_pf_callback(ftnode_pv, disk_data, read_extraargs, fd, sizep); |
3481 | ftnode_fetch_extra *CAST_FROM_VOIDP(bfe, read_extraargs); |
3482 | bfe->destroy(); |
3483 | toku_free(bfe); |
3484 | return r; |
3485 | } |
3486 | |
3487 | CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT ft) { |
3488 | CACHETABLE_WRITE_CALLBACK wc; |
3489 | wc.flush_callback = toku_ftnode_flush_callback; |
3490 | wc.pe_est_callback = toku_ftnode_pe_est_callback; |
3491 | wc.pe_callback = toku_ftnode_pe_callback; |
3492 | wc.cleaner_callback = toku_ftnode_cleaner_callback; |
3493 | wc.clone_callback = toku_ftnode_clone_callback; |
3494 | wc.checkpoint_complete_callback = toku_ftnode_checkpoint_complete_callback; |
3495 | wc.write_extraargs = ft; |
3496 | return wc; |
3497 | } |
3498 | |
3499 | static void |
3500 | ft_node_maybe_prefetch(FT_HANDLE ft_handle, FTNODE node, int childnum, FT_CURSOR ftcursor, bool *doprefetch) { |
3501 | // the number of nodes to prefetch |
3502 | const int num_nodes_to_prefetch = 1; |
3503 | |
3504 | // if we want to prefetch in the tree |
3505 | // then prefetch the next children if there are any |
3506 | if (*doprefetch && toku_ft_cursor_prefetching(ftcursor) && !ftcursor->disable_prefetching) { |
3507 | int rc = ft_cursor_rightmost_child_wanted(ftcursor, ft_handle, node); |
3508 | for (int i = childnum + 1; (i <= childnum + num_nodes_to_prefetch) && (i <= rc); i++) { |
3509 | BLOCKNUM nextchildblocknum = BP_BLOCKNUM(node, i); |
3510 | uint32_t nextfullhash = compute_child_fullhash(ft_handle->ft->cf, node, i); |
3511 | ftnode_fetch_extra *XCALLOC(bfe); |
3512 | bfe->create_for_prefetch(ft_handle->ft, ftcursor); |
3513 | bool doing_prefetch = false; |
3514 | toku_cachefile_prefetch( |
3515 | ft_handle->ft->cf, |
3516 | nextchildblocknum, |
3517 | nextfullhash, |
3518 | get_write_callbacks_for_node(ft_handle->ft), |
3519 | ftnode_fetch_callback_and_free_bfe, |
3520 | toku_ftnode_pf_req_callback, |
3521 | ftnode_pf_callback_and_free_bfe, |
3522 | bfe, |
3523 | &doing_prefetch |
3524 | ); |
3525 | if (!doing_prefetch) { |
3526 | bfe->destroy(); |
3527 | toku_free(bfe); |
3528 | } |
3529 | *doprefetch = false; |
3530 | } |
3531 | } |
3532 | } |
3533 | |
struct unlock_ftnode_extra {
    FT_HANDLE ft_handle;
    FTNODE node;
    bool msgs_applied;
};
3539 | |
3540 | // When this is called, the cachetable lock is held |
3541 | static void |
3542 | unlock_ftnode_fun (void *v) { |
3543 | struct unlock_ftnode_extra *x = NULL; |
3544 | CAST_FROM_VOIDP(x, v); |
3545 | FT_HANDLE ft_handle = x->ft_handle; |
3546 | FTNODE node = x->node; |
3547 | // CT lock is held |
3548 | int r = toku_cachetable_unpin_ct_prelocked_no_flush( |
3549 | ft_handle->ft->cf, |
3550 | node->ct_pair, |
3551 | (enum cachetable_dirty) node->dirty, |
3552 | x->msgs_applied ? make_ftnode_pair_attr(node) : make_invalid_pair_attr() |
3553 | ); |
3554 | assert_zero(r); |
3555 | } |
3556 | |
3557 | /* search in a node's child */ |
3558 | static int |
3559 | ft_search_child(FT_HANDLE ft_handle, FTNODE node, int childnum, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, bool *doprefetch, FT_CURSOR ftcursor, UNLOCKERS unlockers, |
3560 | ANCESTORS ancestors, const pivot_bounds &bounds, bool can_bulk_fetch) |
3561 | // Effect: Search in a node's child. Searches are read-only now (at least as far as the hardcopy is concerned). |
3562 | { |
3563 | struct ancestors next_ancestors = {node, childnum, ancestors}; |
3564 | |
3565 | BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum); |
3566 | uint32_t fullhash = compute_child_fullhash(ft_handle->ft->cf, node, childnum); |
3567 | FTNODE childnode = nullptr; |
3568 | |
3569 | // If the current node's height is greater than 1, then its child is an internal node. |
3570 | // Therefore, to warm the cache better (#5798), we want to read all the partitions off disk in one shot. |
3571 | bool read_all_partitions = node->height > 1; |
3572 | ftnode_fetch_extra bfe; |
3573 | bfe.create_for_subset_read( |
3574 | ft_handle->ft, |
3575 | search, |
3576 | &ftcursor->range_lock_left_key, |
3577 | &ftcursor->range_lock_right_key, |
3578 | ftcursor->left_is_neg_infty, |
3579 | ftcursor->right_is_pos_infty, |
3580 | ftcursor->disable_prefetching, |
3581 | read_all_partitions |
3582 | ); |
3583 | bool msgs_applied = false; |
3584 | { |
3585 | int rr = toku_pin_ftnode_for_query(ft_handle, childblocknum, fullhash, |
3586 | unlockers, |
3587 | &next_ancestors, bounds, |
3588 | &bfe, |
3589 | true, |
3590 | &childnode, |
3591 | &msgs_applied); |
3592 | if (rr==TOKUDB_TRY_AGAIN) { |
3593 | return rr; |
3594 | } |
3595 | invariant_zero(rr); |
3596 | } |
3597 | |
    struct unlock_ftnode_extra unlock_extra = { ft_handle, childnode, msgs_applied };
3599 | struct unlockers next_unlockers = { true, unlock_ftnode_fun, (void *) &unlock_extra, unlockers }; |
3600 | int r = ft_search_node(ft_handle, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, ftcursor, &next_unlockers, &next_ancestors, bounds, can_bulk_fetch); |
3601 | if (r!=TOKUDB_TRY_AGAIN) { |
3602 | // maybe prefetch the next child |
3603 | if (r == 0 && node->height == 1) { |
3604 | ft_node_maybe_prefetch(ft_handle, node, childnum, ftcursor, doprefetch); |
3605 | } |
3606 | |
3607 | assert(next_unlockers.locked); |
3608 | if (msgs_applied) { |
3609 | toku_unpin_ftnode(ft_handle->ft, childnode); |
3610 | } |
3611 | else { |
3612 | toku_unpin_ftnode_read_only(ft_handle->ft, childnode); |
3613 | } |
3614 | } else { |
3615 | // try again. |
3616 | |
3617 | // there are two cases where we get TOKUDB_TRY_AGAIN |
3618 | // case 1 is when some later call to toku_pin_ftnode returned |
3619 | // that value and unpinned all the nodes anyway. case 2 |
3620 | // is when ft_search_node had to stop its search because |
3621 | // some piece of a node that it needed was not in memory. In this case, |
3622 | // the node was not unpinned, so we unpin it here |
3623 | if (next_unlockers.locked) { |
3624 | if (msgs_applied) { |
3625 | toku_unpin_ftnode(ft_handle->ft, childnode); |
3626 | } |
3627 | else { |
3628 | toku_unpin_ftnode_read_only(ft_handle->ft, childnode); |
3629 | } |
3630 | } |
3631 | } |
3632 | |
3633 | return r; |
3634 | } |
3635 | |
3636 | static inline int |
3637 | search_which_child_cmp_with_bound(const toku::comparator &cmp, FTNODE node, int childnum, |
3638 | ft_search *search, DBT *dbt) { |
3639 | return cmp(toku_copyref_dbt(dbt, node->pivotkeys.get_pivot(childnum)), &search->pivot_bound); |
3640 | } |
3641 | |
3642 | int |
3643 | toku_ft_search_which_child(const toku::comparator &cmp, FTNODE node, ft_search *search) { |
3644 | if (node->n_children <= 1) return 0; |
3645 | |
3646 | DBT pivotkey; |
3647 | toku_init_dbt(&pivotkey); |
3648 | int lo = 0; |
3649 | int hi = node->n_children - 1; |
3650 | int mi; |
3651 | while (lo < hi) { |
3652 | mi = (lo + hi) / 2; |
3653 | node->pivotkeys.fill_pivot(mi, &pivotkey); |
3654 | // search->compare is really strange, and only works well with a |
3655 | // linear search, it makes binary search a pita. |
3656 | // |
3657 | // if you are searching left to right, it returns |
3658 | // "0" for pivots that are < the target, and |
3659 | // "1" for pivots that are >= the target |
3660 | // if you are searching right to left, it's the opposite. |
3661 | // |
3662 | // so if we're searching from the left and search->compare says |
3663 | // "1", we want to go left from here, if it says "0" we want to go |
3664 | // right. searching from the right does the opposite. |
3665 | bool c = search->compare(*search, &pivotkey); |
3666 | if (((search->direction == FT_SEARCH_LEFT) && c) || |
3667 | ((search->direction == FT_SEARCH_RIGHT) && !c)) { |
3668 | hi = mi; |
3669 | } else { |
3670 | assert(((search->direction == FT_SEARCH_LEFT) && !c) || |
3671 | ((search->direction == FT_SEARCH_RIGHT) && c)); |
3672 | lo = mi + 1; |
3673 | } |
3674 | } |
3675 | // ready to return something, if the pivot is bounded, we have to move |
3676 | // over a bit to get away from what we've already searched |
3677 | if (search->pivot_bound.data != nullptr) { |
3678 | if (search->direction == FT_SEARCH_LEFT) { |
3679 | while (lo < node->n_children - 1 && |
3680 | search_which_child_cmp_with_bound(cmp, node, lo, search, &pivotkey) <= 0) { |
3681 | // searching left to right, if the comparison says the |
3682 | // current pivot (lo) is left of or equal to our bound, |
3683 | // don't search that child again |
3684 | lo++; |
3685 | } |
3686 | } else { |
3687 | while (lo > 0 && |
3688 | search_which_child_cmp_with_bound(cmp, node, lo - 1, search, &pivotkey) >= 0) { |
3689 | // searching right to left, same argument as just above |
3690 | // (but we had to pass lo - 1 because the pivot between lo |
3691 | // and the thing just less than it is at that position in |
3692 | // the pivot keys array) |
3693 | lo--; |
3694 | } |
3695 | } |
3696 | } |
3697 | return lo; |
3698 | } |
3699 | |
3700 | static void |
3701 | maybe_search_save_bound( |
3702 | FTNODE node, |
3703 | int child_searched, |
3704 | ft_search *search) |
3705 | { |
3706 | int p = (search->direction == FT_SEARCH_LEFT) ? child_searched : child_searched - 1; |
3707 | if (p >= 0 && p < node->n_children-1) { |
3708 | toku_destroy_dbt(&search->pivot_bound); |
3709 | toku_clone_dbt(&search->pivot_bound, node->pivotkeys.get_pivot(p)); |
3710 | } |
3711 | } |
3712 | |
3713 | // Returns true if there are still children left to search in this node within the search bound (if any). |
3714 | static bool search_try_again(FTNODE node, int child_to_search, ft_search *search) { |
3715 | bool try_again = false; |
3716 | if (search->direction == FT_SEARCH_LEFT) { |
3717 | if (child_to_search < node->n_children-1) { |
3718 | try_again = true; |
3719 | // if there is a search bound and the bound is within the search pivot then continue the search |
3720 | if (search->k_bound) { |
3721 | FT_HANDLE CAST_FROM_VOIDP(ft_handle, search->context); |
3722 | try_again = (ft_handle->ft->cmp(search->k_bound, &search->pivot_bound) > 0); |
3723 | } |
3724 | } |
3725 | } else if (search->direction == FT_SEARCH_RIGHT) { |
3726 | if (child_to_search > 0) |
3727 | try_again = true; |
3728 | } |
3729 | return try_again; |
3730 | } |
3731 | |
3732 | static int |
3733 | ft_search_node( |
3734 | FT_HANDLE ft_handle, |
3735 | FTNODE node, |
3736 | ft_search *search, |
3737 | int child_to_search, |
3738 | FT_GET_CALLBACK_FUNCTION getf, |
3739 | void *getf_v, |
3740 | bool *doprefetch, |
3741 | FT_CURSOR ftcursor, |
3742 | UNLOCKERS unlockers, |
3743 | ANCESTORS ancestors, |
3744 | const pivot_bounds &bounds, |
3745 | bool can_bulk_fetch |
3746 | ) |
3747 | { |
3748 | int r = 0; |
3749 | // assert that we got a valid child_to_search |
3750 | invariant(child_to_search >= 0); |
3751 | invariant(child_to_search < node->n_children); |
3752 | // |
3753 | // At this point, we must have the necessary partition available to continue the search |
3754 | // |
3755 | assert(BP_STATE(node,child_to_search) == PT_AVAIL); |
3756 | const pivot_bounds next_bounds = bounds.next_bounds(node, child_to_search); |
3757 | if (node->height > 0) { |
3758 | r = ft_search_child( |
3759 | ft_handle, |
3760 | node, |
3761 | child_to_search, |
3762 | search, |
3763 | getf, |
3764 | getf_v, |
3765 | doprefetch, |
3766 | ftcursor, |
3767 | unlockers, |
3768 | ancestors, |
3769 | next_bounds, |
3770 | can_bulk_fetch |
3771 | ); |
3772 | } |
3773 | else { |
3774 | r = ft_search_basement_node( |
3775 | BLB(node, child_to_search), |
3776 | search, |
3777 | getf, |
3778 | getf_v, |
3779 | doprefetch, |
3780 | ftcursor, |
3781 | can_bulk_fetch |
3782 | ); |
3783 | } |
3784 | if (r == 0) { |
3785 | return r; //Success |
3786 | } |
3787 | |
3788 | if (r != DB_NOTFOUND) { |
3789 | return r; //Error (or message to quit early, such as TOKUDB_FOUND_BUT_REJECTED or TOKUDB_TRY_AGAIN) |
3790 | } |
    // not strictly necessary, but it makes the code simpler to read: at this
    // point we know we got DB_NOTFOUND and have to continue
3794 | assert(r == DB_NOTFOUND); |
3795 | // we have a new pivotkey |
3796 | if (node->height == 0) { |
3797 | // when we run off the end of a basement, try to lock the range up to the pivot. solves #3529 |
3798 | const DBT *pivot = search->direction == FT_SEARCH_LEFT ? next_bounds.ubi() : // left -> right |
3799 | next_bounds.lbe(); // right -> left |
3800 | if (pivot != nullptr) { |
3801 | int rr = getf(pivot->size, pivot->data, 0, nullptr, getf_v, true); |
3802 | if (rr != 0) { |
3803 | return rr; // lock was not granted |
3804 | } |
3805 | } |
3806 | } |
3807 | |
3808 | // If we got a DB_NOTFOUND then we have to search the next record. Possibly everything present is not visible. |
3809 | // This way of doing DB_NOTFOUND is a kludge, and ought to be simplified. Something like this is needed for DB_NEXT, but |
3810 | // for point queries, it's overkill. If we got a DB_NOTFOUND on a point query then we should just stop looking. |
3811 | // When releasing locks on I/O we must not search the same subtree again, or we won't be guaranteed to make forward progress. |
3812 | // If we got a DB_NOTFOUND, then the pivot is too small if searching from left to right (too large if searching from right to left). |
3813 | // So save the pivot key in the search object. |
3814 | maybe_search_save_bound(node, child_to_search, search); |
3815 | |
3816 | // as part of #5770, if we can continue searching, |
3817 | // we MUST return TOKUDB_TRY_AGAIN, |
3818 | // because there is no guarantee that messages have been applied |
3819 | // on any other path. |
3820 | if (search_try_again(node, child_to_search, search)) { |
3821 | r = TOKUDB_TRY_AGAIN; |
3822 | } |
3823 | |
3824 | return r; |
3825 | } |
3826 | |
3827 | int toku_ft_search(FT_HANDLE ft_handle, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, FT_CURSOR ftcursor, bool can_bulk_fetch) |
3828 | // Effect: Perform a search. Associate cursor with a leaf if possible. |
3829 | // All searches are performed through this function. |
3830 | { |
3831 | int r; |
3832 | uint trycount = 0; // How many tries did it take to get the result? |
3833 | FT ft = ft_handle->ft; |
3834 | |
3835 | toku::context search_ctx(CTX_SEARCH); |
3836 | |
3837 | try_again: |
3838 | |
3839 | trycount++; |
3840 | |
3841 | // |
3842 | // Here is how searches work |
3843 | // At a high level, we descend down the tree, using the search parameter |
3844 | // to guide us towards where to look. But the search parameter is not |
3845 | // used here to determine which child of a node to read (regardless |
3846 | // of whether that child is another node or a basement node) |
3847 | // The search parameter is used while we are pinning the node into |
3848 | // memory, because that is when the system needs to ensure that |
3849 | // the appropriate partition of the child we are using is in memory. |
3850 | // So, here are the steps for a search (and this applies to this function |
    // as well as ft_search_child):
3852 | // - Take the search parameter, and create a ftnode_fetch_extra, that will be used by toku_pin_ftnode |
3853 | // - Call toku_pin_ftnode with the bfe as the extra for the fetch callback (in case the node is not at all in memory) |
3854 | // and the partial fetch callback (in case the node is perhaps partially in memory) to the fetch the node |
3855 | // - This eventually calls either toku_ftnode_fetch_callback or toku_ftnode_pf_req_callback depending on whether the node is in |
3856 | // memory at all or not. |
3857 | // - Within these functions, the "ft_search search" parameter is used to evaluate which child the search is interested in. |
3858 | // If the node is not in memory at all, toku_ftnode_fetch_callback will read the node and decompress only the partition for the |
3859 | // relevant child, be it a message buffer or basement node. If the node is in memory, then toku_ftnode_pf_req_callback |
3860 | // will tell the cachetable that a partial fetch is required if and only if the relevant child is not in memory. If the relevant child |
3861 | // is not in memory, then toku_ftnode_pf_callback is called to fetch the partition. |
3862 | // - These functions set bfe->child_to_read so that the search code does not need to reevaluate it. |
3863 | // - Just to reiterate, all of the last item happens within toku_ftnode_pin(_holding_lock) |
3864 | // - At this point, toku_ftnode_pin_holding_lock has returned, with bfe.child_to_read set, |
3865 | // - ft_search_node is called, assuming that the node and its relevant partition are in memory. |
3866 | // |
3867 | ftnode_fetch_extra bfe; |
3868 | bfe.create_for_subset_read( |
3869 | ft, |
3870 | search, |
3871 | &ftcursor->range_lock_left_key, |
3872 | &ftcursor->range_lock_right_key, |
3873 | ftcursor->left_is_neg_infty, |
3874 | ftcursor->right_is_pos_infty, |
3875 | ftcursor->disable_prefetching, |
        true // we may as well always read the whole root into memory; if the root is a leaf, the tree is tiny anyway
3877 | ); |
3878 | FTNODE node = NULL; |
3879 | { |
3880 | uint32_t fullhash; |
3881 | CACHEKEY root_key; |
3882 | toku_calculate_root_offset_pointer(ft, &root_key, &fullhash); |
3883 | toku_pin_ftnode( |
3884 | ft, |
3885 | root_key, |
3886 | fullhash, |
3887 | &bfe, |
3888 | PL_READ, // may_modify_node set to false, because root cannot change during search |
3889 | &node, |
3890 | true |
3891 | ); |
3892 | } |
3893 | |
3894 | uint tree_height = node->height + 1; // How high is the tree? This is the height of the root node plus one (leaf is at height 0). |
3895 | |
3896 | |
    struct unlock_ftnode_extra unlock_extra = {ft_handle, node, false};
3898 | struct unlockers unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, (UNLOCKERS)NULL}; |
3899 | |
3900 | { |
3901 | bool doprefetch = false; |
3903 | r = ft_search_node(ft_handle, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, ftcursor, &unlockers, (ANCESTORS)NULL, pivot_bounds::infinite_bounds(), can_bulk_fetch); |
3904 | if (r==TOKUDB_TRY_AGAIN) { |
3905 | // there are two cases where we get TOKUDB_TRY_AGAIN |
3906 | // case 1 is when some later call to toku_pin_ftnode returned |
3907 | // that value and unpinned all the nodes anyway. case 2 |
3908 | // is when ft_search_node had to stop its search because |
3909 | // some piece of a node that it needed was not in memory. |
3910 | // In this case, the node was not unpinned, so we unpin it here |
3911 | if (unlockers.locked) { |
3912 | toku_unpin_ftnode_read_only(ft_handle->ft, node); |
3913 | } |
3914 | goto try_again; |
3915 | } else { |
3916 | assert(unlockers.locked); |
3917 | } |
3918 | } |
3919 | |
3920 | assert(unlockers.locked); |
3921 | toku_unpin_ftnode_read_only(ft_handle->ft, node); |
3922 | |
3923 | |
3924 | //Heaviside function (+direction) queries define only a lower or upper |
3925 | //bound. Some queries require both an upper and lower bound. |
3926 | //They do this by wrapping the FT_GET_CALLBACK_FUNCTION with another |
3927 | //test that checks for the other bound. If the other bound fails, |
3928 | //it returns TOKUDB_FOUND_BUT_REJECTED which means not found, but |
3929 | //stop searching immediately, as opposed to DB_NOTFOUND |
3930 | //which can mean not found, but keep looking in another leaf. |
3931 | if (r==TOKUDB_FOUND_BUT_REJECTED) r = DB_NOTFOUND; |
3932 | else if (r==DB_NOTFOUND) { |
3933 | //We truly did not find an answer to the query. |
3934 | //Therefore, the FT_GET_CALLBACK_FUNCTION has NOT been called. |
3935 | //The contract specifies that the callback function must be called |
3936 | //for 'r= (0|DB_NOTFOUND|TOKUDB_FOUND_BUT_REJECTED)' |
3937 | //TODO: #1378 This is not the ultimate location of this call to the |
3938 | //callback. It is surely wrong for node-level locking, and probably |
3939 | //wrong for the STRADDLE callback for heaviside function(two sets of key/vals) |
3940 | int r2 = getf(0,NULL, 0,NULL, getf_v, false); |
3941 | if (r2!=0) r = r2; |
3942 | } |
3943 | { // accounting (to detect and measure thrashing) |
3944 | uint retrycount = trycount - 1; // how many retries were needed? |
3945 | if (retrycount) { |
3946 | FT_STATUS_INC(FT_TOTAL_RETRIES, retrycount); |
3947 | } |
3948 | if (retrycount > tree_height) { // if at least one node was read from disk more than once |
3949 | FT_STATUS_INC(FT_SEARCH_TRIES_GT_HEIGHT, 1); |
3950 | if (retrycount > (tree_height+3)) |
3951 | FT_STATUS_INC(FT_SEARCH_TRIES_GT_HEIGHTPLUS3, 1); |
3952 | } |
3953 | } |
3954 | return r; |
3955 | } |
3956 | |
3957 | /* ********************************* delete **************************************/ |
3958 | static int |
3959 | getf_nothing (uint32_t UU(keylen), const void *UU(key), uint32_t UU(vallen), const void *UU(val), void *UU(pair_v), bool UU(lock_only)) { |
3960 | return 0; |
3961 | } |
3962 | |
3963 | int toku_ft_cursor_delete(FT_CURSOR cursor, int flags, TOKUTXN txn) { |
3964 | int r; |
3965 | |
3966 | int unchecked_flags = flags; |
3967 | bool error_if_missing = (bool) !(flags&DB_DELETE_ANY); |
3968 | unchecked_flags &= ~DB_DELETE_ANY; |
3969 | if (unchecked_flags!=0) r = EINVAL; |
3970 | else if (toku_ft_cursor_not_set(cursor)) r = EINVAL; |
3971 | else { |
3972 | r = 0; |
3973 | if (error_if_missing) { |
3974 | r = toku_ft_cursor_current(cursor, DB_CURRENT, getf_nothing, NULL); |
3975 | } |
3976 | if (r == 0) { |
3977 | toku_ft_delete(cursor->ft_handle, &cursor->key, txn); |
3978 | } |
3979 | } |
3980 | return r; |
3981 | } |
3982 | |
3983 | /* ********************* keyrange ************************ */ |
3984 | |
3985 | struct keyrange_compare_s { |
3986 | FT ft; |
3987 | const DBT *key; |
3988 | }; |
3989 | |
3990 | // TODO: Remove me, I'm boring |
3991 | static int keyrange_compare(DBT const &kdbt, |
3992 | const struct keyrange_compare_s &s) { |
3993 | return s.ft->cmp(&kdbt, s.key); |
3994 | } |
3995 | |
3996 | static void keysrange_in_leaf_partition(FT_HANDLE ft_handle, |
3997 | FTNODE node, |
3998 | DBT *key_left, |
3999 | DBT *key_right, |
4000 | int left_child_number, |
4001 | int right_child_number, |
4002 | uint64_t estimated_num_rows, |
4003 | uint64_t *less, |
4004 | uint64_t *equal_left, |
4005 | uint64_t *middle, |
4006 | uint64_t *equal_right, |
4007 | uint64_t *greater, |
4008 | bool *single_basement_node) |
// If the partition is in main memory then get an exact count, otherwise estimate
4010 | // Treat key_left == NULL as negative infinity |
4011 | // Treat key_right == NULL as positive infinity |
4012 | { |
4013 | paranoid_invariant(node->height == 0); // we are in a leaf |
4014 | paranoid_invariant(!(key_left == NULL && key_right != NULL)); |
4015 | paranoid_invariant(left_child_number <= right_child_number); |
4016 | bool single_basement = left_child_number == right_child_number; |
4017 | paranoid_invariant(!single_basement || |
4018 | (BP_STATE(node, left_child_number) == PT_AVAIL)); |
4019 | if (BP_STATE(node, left_child_number) == PT_AVAIL) { |
4020 | int r; |
        // The partition is in main memory, so get an exact count.
4022 | struct keyrange_compare_s s_left = {ft_handle->ft, key_left}; |
4023 | BASEMENTNODE bn = BLB(node, left_child_number); |
4024 | uint32_t idx_left = 0; |
4025 | // if key_left is NULL then set r==-1 and idx==0. |
4026 | r = key_left |
4027 | ? bn->data_buffer.find_zero<decltype(s_left), keyrange_compare>( |
4028 | s_left, nullptr, nullptr, nullptr, &idx_left) |
4029 | : -1; |
4030 | *less = idx_left; |
4031 | *equal_left = (r == 0) ? 1 : 0; |
4032 | |
4033 | uint32_t size = bn->data_buffer.num_klpairs(); |
4034 | uint32_t idx_right = size; |
4035 | r = -1; |
4036 | if (single_basement && key_right) { |
4037 | struct keyrange_compare_s s_right = {ft_handle->ft, key_right}; |
4038 | r = bn->data_buffer.find_zero<decltype(s_right), keyrange_compare>( |
4039 | s_right, nullptr, nullptr, nullptr, &idx_right); |
4040 | } |
4041 | *middle = idx_right - idx_left - *equal_left; |
4042 | *equal_right = (r == 0) ? 1 : 0; |
4043 | *greater = size - idx_right - *equal_right; |
4044 | } else { |
4045 | paranoid_invariant(!single_basement); |
4046 | uint32_t idx_left = estimated_num_rows / 2; |
4047 | if (!key_left) { |
4048 | // Both nullptr, assume key_left belongs before leftmost entry, |
4049 | // key_right belongs after rightmost entry |
4050 | idx_left = 0; |
4051 | paranoid_invariant(!key_right); |
4052 | } |
4053 | // Assume idx_left and idx_right point to where key_left and key_right |
4054 | // belong, (but are not there). |
4055 | *less = idx_left; |
4056 | *equal_left = 0; |
4057 | *middle = estimated_num_rows - idx_left; |
4058 | *equal_right = 0; |
4059 | *greater = 0; |
4060 | } |
4061 | *single_basement_node = single_basement; |
4062 | } |
4063 | |
4064 | static int toku_ft_keysrange_internal( |
4065 | FT_HANDLE ft_handle, |
4066 | FTNODE node, |
4067 | DBT *key_left, |
4068 | DBT *key_right, |
4069 | bool may_find_right, |
4070 | uint64_t *less, |
4071 | uint64_t *equal_left, |
4072 | uint64_t *middle, |
4073 | uint64_t *equal_right, |
4074 | uint64_t *greater, |
4075 | bool *single_basement_node, |
4076 | uint64_t estimated_num_rows, |
4077 | ftnode_fetch_extra *min_bfe, // set up to read a minimal read. |
4078 | ftnode_fetch_extra |
4079 | *match_bfe, // set up to read a basement node iff both keys in it |
4080 | struct unlockers *unlockers, |
4081 | ANCESTORS ancestors, |
4082 | const pivot_bounds &bounds) |
4083 | // Implementation note: Assign values to less, equal, and greater, and then on |
4084 | // the way out (returning up the stack) we add more values in. |
4085 | { |
4086 | int r = 0; |
    // if key_left is NULL then use the leftmost child.
4088 | int left_child_number = |
4089 | key_left ? toku_ftnode_which_child(node, key_left, ft_handle->ft->cmp) |
4090 | : 0; |
4091 | int right_child_number = |
4092 | node->n_children; // Sentinel that does not equal left_child_number. |
4093 | if (may_find_right) { |
4094 | right_child_number = |
4095 | key_right |
4096 | ? toku_ftnode_which_child(node, key_right, ft_handle->ft->cmp) |
4097 | : node->n_children - 1; |
4098 | } |
4099 | |
4100 | uint64_t rows_per_child = estimated_num_rows / node->n_children; |
4101 | if (node->height == 0) { |
4102 | keysrange_in_leaf_partition(ft_handle, |
4103 | node, |
4104 | key_left, |
4105 | key_right, |
4106 | left_child_number, |
4107 | right_child_number, |
4108 | rows_per_child, |
4109 | less, |
4110 | equal_left, |
4111 | middle, |
4112 | equal_right, |
4113 | greater, |
4114 | single_basement_node); |
4115 | |
4116 | *less += rows_per_child * left_child_number; |
4117 | if (*single_basement_node) { |
4118 | *greater += |
4119 | rows_per_child * (node->n_children - left_child_number - 1); |
4120 | } else { |
4121 | *middle += |
4122 | rows_per_child * (node->n_children - left_child_number - 1); |
4123 | } |
4124 | } else { |
4125 | // do the child. |
4126 | struct ancestors next_ancestors = {node, left_child_number, ancestors}; |
4127 | BLOCKNUM childblocknum = BP_BLOCKNUM(node, left_child_number); |
4128 | uint32_t fullhash = |
4129 | compute_child_fullhash(ft_handle->ft->cf, node, left_child_number); |
4130 | FTNODE childnode; |
4131 | bool msgs_applied = false; |
4132 | bool child_may_find_right = |
4133 | may_find_right && left_child_number == right_child_number; |
4134 | r = toku_pin_ftnode_for_query( |
4135 | ft_handle, |
4136 | childblocknum, |
4137 | fullhash, |
4138 | unlockers, |
4139 | &next_ancestors, |
4140 | bounds, |
4141 | child_may_find_right ? match_bfe : min_bfe, |
4142 | false, |
4143 | &childnode, |
4144 | &msgs_applied); |
4145 | paranoid_invariant(!msgs_applied); |
4146 | if (r != TOKUDB_TRY_AGAIN) { |
4147 | assert_zero(r); |
4148 | |
            struct unlock_ftnode_extra unlock_extra = {
                ft_handle, childnode, false};
4151 | struct unlockers next_unlockers = { |
4152 | true, unlock_ftnode_fun, (void *)&unlock_extra, unlockers}; |
4153 | const pivot_bounds next_bounds = |
4154 | bounds.next_bounds(node, left_child_number); |
4155 | |
4156 | r = toku_ft_keysrange_internal(ft_handle, |
4157 | childnode, |
4158 | key_left, |
4159 | key_right, |
4160 | child_may_find_right, |
4161 | less, |
4162 | equal_left, |
4163 | middle, |
4164 | equal_right, |
4165 | greater, |
4166 | single_basement_node, |
4167 | rows_per_child, |
4168 | min_bfe, |
4169 | match_bfe, |
4170 | &next_unlockers, |
4171 | &next_ancestors, |
4172 | next_bounds); |
4173 | if (r != TOKUDB_TRY_AGAIN) { |
4174 | assert_zero(r); |
4175 | |
4176 | *less += rows_per_child * left_child_number; |
4177 | if (*single_basement_node) { |
4178 | *greater += rows_per_child * |
4179 | (node->n_children - left_child_number - 1); |
4180 | } else { |
4181 | *middle += rows_per_child * |
4182 | (node->n_children - left_child_number - 1); |
4183 | } |
4184 | |
4185 | assert(unlockers->locked); |
4186 | toku_unpin_ftnode_read_only(ft_handle->ft, childnode); |
4187 | } |
4188 | } |
4189 | } |
4190 | return r; |
4191 | } |
4192 | |
4193 | void toku_ft_keysrange(FT_HANDLE ft_handle, |
4194 | DBT *key_left, |
4195 | DBT *key_right, |
4196 | uint64_t *less_p, |
4197 | uint64_t *equal_left_p, |
4198 | uint64_t *middle_p, |
4199 | uint64_t *equal_right_p, |
4200 | uint64_t *greater_p, |
4201 | bool *middle_3_exact_p) |
4202 | // Effect: Return an estimate of the number of keys to the left, the number |
4203 | // equal (to left key), number between keys, number equal to right key, and the |
4204 | // number to the right of both keys. |
4205 | // The values are an estimate. |
4206 | // If you perform a keyrange on two keys that are in the same basement, |
// equal_left, middle, and equal_right will be exact.
4208 | // 4184: What to do with a NULL key? |
4209 | // key_left==NULL is treated as -infinity |
4210 | // key_right==NULL is treated as +infinity |
4211 | // If KEY is NULL then the system picks an arbitrary key and returns it. |
4212 | // key_right can be non-null only if key_left is non-null; |
4213 | { |
4214 | if (!key_left && key_right) { |
4215 | // Simplify internals by only supporting key_right != null when key_left |
4216 | // != null |
4217 | // If key_right != null and key_left == null, then swap them and fix up |
4218 | // numbers. |
4219 | uint64_t less = 0, equal_left = 0, middle = 0, equal_right = 0, |
4220 | greater = 0; |
4221 | toku_ft_keysrange(ft_handle, |
4222 | key_right, |
4223 | nullptr, |
4224 | &less, |
4225 | &equal_left, |
4226 | &middle, |
4227 | &equal_right, |
4228 | &greater, |
4229 | middle_3_exact_p); |
4230 | *less_p = 0; |
4231 | *equal_left_p = 0; |
4232 | *middle_p = less; |
4233 | *equal_right_p = equal_left; |
4234 | *greater_p = middle; |
4235 | invariant_zero(equal_right); |
4236 | invariant_zero(greater); |
4237 | return; |
4238 | } |
4239 | paranoid_invariant(!(!key_left && key_right)); |
4240 | ftnode_fetch_extra min_bfe; |
4241 | ftnode_fetch_extra match_bfe; |
4242 | min_bfe.create_for_min_read( |
4243 | ft_handle->ft); // read pivot keys but not message buffers |
4244 | match_bfe.create_for_keymatch( |
4245 | ft_handle->ft, |
4246 | key_left, |
4247 | key_right, |
4248 | false, |
4249 | false); // read basement node only if both keys in it. |
try_again: {
4251 | uint64_t less = 0, equal_left = 0, middle = 0, equal_right = 0, greater = 0; |
4252 | bool single_basement_node = false; |
4253 | FTNODE node = NULL; |
4254 | { |
4255 | uint32_t fullhash; |
4256 | CACHEKEY root_key; |
4257 | toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &fullhash); |
4258 | toku_pin_ftnode( |
4259 | ft_handle->ft, |
4260 | root_key, |
4261 | fullhash, |
4262 | &match_bfe, |
4263 | PL_READ, // may_modify_node, cannot change root during keyrange |
4264 | &node, |
4265 | true); |
4266 | } |
4267 | |
    struct unlock_ftnode_extra unlock_extra = {ft_handle, node, false};
4269 | struct unlockers unlockers = { |
4270 | true, unlock_ftnode_fun, (void *)&unlock_extra, (UNLOCKERS)NULL}; |
4271 | |
4272 | { |
4273 | int r; |
4274 | int64_t numrows = ft_handle->ft->in_memory_logical_rows; |
4275 | if (numrows < 0) |
4276 | numrows = 0; // prevent appearance of a negative number |
4277 | r = toku_ft_keysrange_internal(ft_handle, |
4278 | node, |
4279 | key_left, |
4280 | key_right, |
4281 | true, |
4282 | &less, |
4283 | &equal_left, |
4284 | &middle, |
4285 | &equal_right, |
4286 | &greater, |
4287 | &single_basement_node, |
4288 | numrows, |
4289 | &min_bfe, |
4290 | &match_bfe, |
4291 | &unlockers, |
4292 | (ANCESTORS)NULL, |
4293 | pivot_bounds::infinite_bounds()); |
4294 | assert(r == 0 || r == TOKUDB_TRY_AGAIN); |
4295 | if (r == TOKUDB_TRY_AGAIN) { |
4296 | assert(!unlockers.locked); |
4297 | goto try_again; |
4298 | } |
4299 | // May need to do a second query. |
4300 | if (!single_basement_node && key_right != nullptr) { |
4301 | // "greater" is stored in "middle" |
4302 | invariant_zero(equal_right); |
4303 | invariant_zero(greater); |
4304 | uint64_t less2 = 0, equal_left2 = 0, middle2 = 0, equal_right2 = 0, |
4305 | greater2 = 0; |
4306 | bool ignore; |
4307 | r = toku_ft_keysrange_internal(ft_handle, |
4308 | node, |
4309 | key_right, |
4310 | nullptr, |
4311 | false, |
4312 | &less2, |
4313 | &equal_left2, |
4314 | &middle2, |
4315 | &equal_right2, |
4316 | &greater2, |
4317 | &ignore, |
4318 | numrows, |
4319 | &min_bfe, |
4320 | &match_bfe, |
4321 | &unlockers, |
4322 | (ANCESTORS) nullptr, |
4323 | pivot_bounds::infinite_bounds()); |
4324 | assert(r == 0 || r == TOKUDB_TRY_AGAIN); |
4325 | if (r == TOKUDB_TRY_AGAIN) { |
4326 | assert(!unlockers.locked); |
4327 | goto try_again; |
4328 | } |
4329 | invariant_zero(equal_right2); |
4330 | invariant_zero(greater2); |
4331 | // Update numbers. |
4332 | // less is already correct. |
4333 | // equal_left is already correct. |
4334 | |
4335 | // "middle" currently holds everything greater than left_key in |
4336 | // first query |
4337 | // 'middle2' currently holds everything greater than right_key in |
4338 | // second query |
4339 | // 'equal_left2' is how many match right_key |
4340 | |
4341 | // Prevent underflow. |
4342 | if (middle >= equal_left2 + middle2) { |
4343 | middle -= equal_left2 + middle2; |
4344 | } else { |
4345 | middle = 0; |
4346 | } |
4347 | equal_right = equal_left2; |
4348 | greater = middle2; |
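            // Numeric sketch (hypothetical): if the first query left middle ==
            // 100 (everything greater than key_left) and the second query
            // returned equal_left2 == 2 and middle2 == 30, then middle becomes
            // 100 - (2 + 30) == 68 keys strictly between the bounds, with
            // equal_right == 2 and greater == 30. The clamp above exists
            // because the two per-level estimates can disagree.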
4349 | } |
4350 | } |
4351 | assert(unlockers.locked); |
4352 | toku_unpin_ftnode_read_only(ft_handle->ft, node); |
4353 | if (!key_right) { |
4354 | paranoid_invariant_zero(equal_right); |
4355 | paranoid_invariant_zero(greater); |
4356 | } |
4357 | if (!key_left) { |
4358 | paranoid_invariant_zero(less); |
4359 | paranoid_invariant_zero(equal_left); |
4360 | } |
4361 | *less_p = less; |
4362 | *equal_left_p = equal_left; |
4363 | *middle_p = middle; |
4364 | *equal_right_p = equal_right; |
4365 | *greater_p = greater; |
4366 | *middle_3_exact_p = single_basement_node; |
4367 | } |
4368 | } |
4369 | |
struct get_key_after_bytes_iterate_extra {
    uint64_t skip_len;
    uint64_t *skipped;
    void (*callback)(const DBT *, uint64_t, void *);
    void *cb_extra;
};
4376 | |
static int get_key_after_bytes_iterate(const void* key, const uint32_t keylen, const LEAFENTRY & le, const uint32_t UU(idx), struct get_key_after_bytes_iterate_extra * const e) {
4378 | // only checking the latest val, mvcc will make this inaccurate |
4379 | uint64_t pairlen = keylen + le_latest_vallen(le); |
4380 | if (*e->skipped + pairlen > e->skip_len) { |
4381 | // found our key! |
4382 | DBT end_key; |
4383 | toku_fill_dbt(&end_key, key, keylen); |
4384 | e->callback(&end_key, *e->skipped, e->cb_extra); |
4385 | return 1; |
4386 | } else { |
4387 | *e->skipped += pairlen; |
4388 | return 0; |
4389 | } |
4390 | } |
4391 | |
static int get_key_after_bytes_in_basementnode(FT ft, BASEMENTNODE bn, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
4393 | int r; |
4394 | uint32_t idx_left = 0; |
4395 | if (start_key != nullptr) { |
4396 | struct keyrange_compare_s cmp = {ft, start_key}; |
4397 | r = bn->data_buffer.find_zero<decltype(cmp), keyrange_compare>(cmp, nullptr, nullptr, nullptr, &idx_left); |
4398 | assert(r == 0 || r == DB_NOTFOUND); |
4399 | } |
    struct get_key_after_bytes_iterate_extra iter_extra = {skip_len, skipped, callback, cb_extra};
4401 | r = bn->data_buffer.iterate_on_range<get_key_after_bytes_iterate_extra, get_key_after_bytes_iterate>(idx_left, bn->data_buffer.num_klpairs(), &iter_extra); |
4402 | |
4403 | // Invert the sense of r == 0 (meaning the iterate finished, which means we didn't find what we wanted) |
4404 | if (r == 1) { |
4405 | r = 0; |
4406 | } else { |
4407 | r = DB_NOTFOUND; |
4408 | } |
4409 | return r; |
4410 | } |
4411 | |
static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, ftnode_fetch_extra *bfe, ft_search *search, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped);
4413 | |
static int get_key_after_bytes_in_child(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, ftnode_fetch_extra *bfe, ft_search *search, int childnum, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
4415 | int r; |
4416 | struct ancestors next_ancestors = {node, childnum, ancestors}; |
4417 | BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum); |
4418 | uint32_t fullhash = compute_child_fullhash(ft->cf, node, childnum); |
4419 | FTNODE child; |
4420 | bool msgs_applied = false; |
4421 | r = toku_pin_ftnode_for_query(ft_h, childblocknum, fullhash, unlockers, &next_ancestors, bounds, bfe, false, &child, &msgs_applied); |
4422 | paranoid_invariant(!msgs_applied); |
4423 | if (r == TOKUDB_TRY_AGAIN) { |
4424 | return r; |
4425 | } |
4426 | assert_zero(r); |
    struct unlock_ftnode_extra unlock_extra = {ft_h, child, false};
4428 | struct unlockers next_unlockers = {true, unlock_ftnode_fun, (void *) &unlock_extra, unlockers}; |
4429 | const pivot_bounds next_bounds = bounds.next_bounds(node, childnum); |
4430 | return get_key_after_bytes_in_subtree(ft_h, ft, child, &next_unlockers, &next_ancestors, next_bounds, bfe, search, subtree_bytes, start_key, skip_len, callback, cb_extra, skipped); |
4431 | } |
4432 | |
static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, ftnode_fetch_extra *bfe, ft_search *search, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
4434 | int r; |
4435 | int childnum = toku_ft_search_which_child(ft->cmp, node, search); |
4436 | const uint64_t child_subtree_bytes = subtree_bytes / node->n_children; |
4437 | if (node->height == 0) { |
4438 | r = DB_NOTFOUND; |
4439 | for (int i = childnum; r == DB_NOTFOUND && i < node->n_children; ++i) { |
4440 | // The theory here is that a leaf node could only be very |
4441 | // unbalanced if it's dirty, which means all its basements are |
4442 | // available. So if a basement node is available, we should |
4443 | // check it as carefully as possible, but if it's compressed |
4444 | // or on disk, then it should be fairly well balanced so we |
4445 | // can trust the fanout calculation. |
4446 | if (BP_STATE(node, i) == PT_AVAIL) { |
4447 | r = get_key_after_bytes_in_basementnode(ft, BLB(node, i), (i == childnum) ? start_key : nullptr, skip_len, callback, cb_extra, skipped); |
4448 | } else { |
4449 | *skipped += child_subtree_bytes; |
4450 | if (*skipped >= skip_len && i < node->n_children - 1) { |
4451 | DBT pivot; |
4452 | callback(node->pivotkeys.fill_pivot(i, &pivot), *skipped, cb_extra); |
4453 | r = 0; |
4454 | } |
4455 | // Otherwise, r is still DB_NOTFOUND. If this is the last |
4456 | // basement node, we'll return DB_NOTFOUND and that's ok. |
4457 | // Some ancestor in the call stack will check the next |
4458 | // node over and that will call the callback, or if no |
4459 | // such node exists, we're at the max key and we should |
4460 | // return DB_NOTFOUND up to the top. |
4461 | } |
4462 | } |
4463 | } else { |
4464 | r = get_key_after_bytes_in_child(ft_h, ft, node, unlockers, ancestors, bounds, bfe, search, childnum, child_subtree_bytes, start_key, skip_len, callback, cb_extra, skipped); |
4465 | for (int i = childnum + 1; r == DB_NOTFOUND && i < node->n_children; ++i) { |
4466 | if (*skipped + child_subtree_bytes < skip_len) { |
4467 | *skipped += child_subtree_bytes; |
4468 | } else { |
4469 | r = get_key_after_bytes_in_child(ft_h, ft, node, unlockers, ancestors, bounds, bfe, search, i, child_subtree_bytes, nullptr, skip_len, callback, cb_extra, skipped); |
4470 | } |
4471 | } |
4472 | } |
4473 | |
4474 | if (r != TOKUDB_TRY_AGAIN) { |
4475 | assert(unlockers->locked); |
4476 | toku_unpin_ftnode_read_only(ft, node); |
4477 | unlockers->locked = false; |
4478 | } |
4479 | return r; |
4480 | } |
4481 | |
int toku_ft_get_key_after_bytes(FT_HANDLE ft_h, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *end_key, uint64_t actually_skipped, void *extra), void *cb_extra)
4483 | // Effect: |
4484 | // Call callback with end_key set to the largest key such that the sum of the sizes of the key/val pairs in the range [start_key, end_key) is <= skip_len. |
4485 | // Call callback with actually_skipped set to the sum of the sizes of the key/val pairs in the range [start_key, end_key). |
4486 | // Notes: |
4487 | // start_key == nullptr is interpreted as negative infinity. |
4488 | // end_key == nullptr is interpreted as positive infinity. |
4489 | // Only the latest val is counted toward the size, in the case of MVCC data. |
4490 | // Implementation: |
4491 | // This is an estimated calculation. We assume for a node that each of its subtrees have equal size. If the tree is a single basement node, then we will be accurate, but otherwise we could be quite off. |
4492 | // Returns: |
4493 | // 0 on success |
4494 | // an error code otherwise |
4495 | { |
4496 | FT ft = ft_h->ft; |
4497 | ftnode_fetch_extra bfe; |
4498 | bfe.create_for_min_read(ft); |
4499 | while (true) { |
4500 | FTNODE root; |
4501 | { |
4502 | uint32_t fullhash; |
4503 | CACHEKEY root_key; |
4504 | toku_calculate_root_offset_pointer(ft, &root_key, &fullhash); |
4505 | toku_pin_ftnode(ft, root_key, fullhash, &bfe, PL_READ, &root, true); |
4506 | } |
        struct unlock_ftnode_extra unlock_extra = {ft_h, root, false};
4508 | struct unlockers unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, (UNLOCKERS) nullptr}; |
4509 | ft_search search; |
4510 | ft_search_init(&search, (start_key == nullptr ? toku_ft_cursor_compare_one : toku_ft_cursor_compare_set_range), FT_SEARCH_LEFT, start_key, nullptr, ft_h); |
4511 | |
4512 | int r; |
4513 | // We can't do this because of #5768, there may be dictionaries in the wild that have negative stats. This won't affect mongo so it's ok: |
4514 | //paranoid_invariant(ft->in_memory_stats.numbytes >= 0); |
4515 | int64_t numbytes = ft->in_memory_stats.numbytes; |
4516 | if (numbytes < 0) { |
4517 | numbytes = 0; |
4518 | } |
4519 | uint64_t skipped = 0; |
4520 | r = get_key_after_bytes_in_subtree(ft_h, ft, root, &unlockers, nullptr, pivot_bounds::infinite_bounds(), &bfe, &search, (uint64_t) numbytes, start_key, skip_len, callback, cb_extra, &skipped); |
4521 | assert(!unlockers.locked); |
4522 | if (r != TOKUDB_TRY_AGAIN) { |
4523 | if (r == DB_NOTFOUND) { |
4524 | callback(nullptr, skipped, cb_extra); |
4525 | r = 0; |
4526 | } |
4527 | return r; |
4528 | } |
4529 | } |
4530 | } |
4531 | |
// Test-only wrapper for the old one-key range function.
4533 | void toku_ft_keyrange(FT_HANDLE ft_handle, DBT *key, uint64_t *less, uint64_t *equal, uint64_t *greater) { |
4534 | uint64_t zero_equal_right, zero_greater; |
4535 | bool ignore; |
4536 | toku_ft_keysrange(ft_handle, key, nullptr, less, equal, greater, &zero_equal_right, &zero_greater, &ignore); |
4537 | invariant_zero(zero_equal_right); |
4538 | invariant_zero(zero_greater); |
4539 | } |
4540 | |
4541 | void toku_ft_handle_stat64 (FT_HANDLE ft_handle, TOKUTXN UU(txn), struct ftstat64_s *s) { |
4542 | toku_ft_stat64(ft_handle->ft, s); |
4543 | } |
4544 | |
4545 | void toku_ft_handle_get_fractal_tree_info64(FT_HANDLE ft_h, struct ftinfo64 *s) { |
4546 | toku_ft_get_fractal_tree_info64(ft_h->ft, s); |
4547 | } |
4548 | |
int toku_ft_handle_iterate_fractal_tree_block_map(FT_HANDLE ft_h, int (*iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*), void *iter_extra) {
4550 | return toku_ft_iterate_fractal_tree_block_map(ft_h->ft, iter, iter_extra); |
4551 | } |
4552 | |
4553 | /* ********************* debugging dump ************************ */ |
4554 | static int |
4555 | toku_dump_ftnode (FILE *file, FT_HANDLE ft_handle, BLOCKNUM blocknum, int depth, const DBT *lorange, const DBT *hirange) { |
4556 | int result=0; |
4557 | FTNODE node; |
4558 | toku_get_node_for_verify(blocknum, ft_handle, &node); |
4559 | result=toku_verify_ftnode(ft_handle, ft_handle->ft->h->max_msn_in_ft, ft_handle->ft->h->max_msn_in_ft, false, node, -1, lorange, hirange, NULL, NULL, 0, 1, 0); |
4560 | uint32_t fullhash = toku_cachetable_hash(ft_handle->ft->cf, blocknum); |
4561 | ftnode_fetch_extra bfe; |
4562 | bfe.create_for_full_read(ft_handle->ft); |
4563 | toku_pin_ftnode( |
4564 | ft_handle->ft, |
4565 | blocknum, |
4566 | fullhash, |
4567 | &bfe, |
4568 | PL_WRITE_EXPENSIVE, |
4569 | &node, |
4570 | true |
4571 | ); |
4572 | assert(node->fullhash==fullhash); |
4573 | fprintf(file, "%*sNode=%p\n" , depth, "" , node); |
4574 | |
4575 | fprintf(file, "%*sNode %" PRId64 " height=%d n_children=%d keyrange=%s %s\n" , |
4576 | depth, "" , blocknum.b, node->height, node->n_children, (char*)(lorange ? lorange->data : 0), (char*)(hirange ? hirange->data : 0)); |
4577 | { |
4578 | int i; |
4579 | for (i=0; i+1< node->n_children; i++) { |
4580 | fprintf(file, "%*spivotkey %d =" , depth+1, "" , i); |
4581 | toku_print_BYTESTRING(file, node->pivotkeys.get_pivot(i).size, (char *) node->pivotkeys.get_pivot(i).data); |
4582 | fprintf(file, "\n" ); |
4583 | } |
4584 | for (i=0; i< node->n_children; i++) { |
4585 | if (node->height > 0) { |
4586 | NONLEAF_CHILDINFO bnc = BNC(node, i); |
4587 | fprintf(file, "%*schild %d buffered (%d entries):" , depth+1, "" , i, toku_bnc_n_entries(bnc)); |
4588 | struct print_msg_fn { |
4589 | FILE *file; |
4590 | int depth; |
4591 | print_msg_fn(FILE *f, int d) : file(f), depth(d) { } |
4592 | int operator()(const ft_msg &msg, bool UU(is_fresh)) { |
4593 | fprintf(file, "%*s xid=%" PRIu64 " %u (type=%d) msn=0x%" PRIu64 "\n" , |
4594 | depth+2, "" , |
4595 | toku_xids_get_innermost_xid(msg.xids()), |
4596 | static_cast<unsigned>(toku_dtoh32(*(int*)msg.kdbt()->data)), |
4597 | msg.type(), msg.msn().msn); |
4598 | return 0; |
4599 | } |
4600 | } print_fn(file, depth); |
4601 | bnc->msg_buffer.iterate(print_fn); |
4602 | } |
4603 | else { |
4604 | int size = BLB_DATA(node, i)->num_klpairs(); |
                if (0)  // disabled by default; change to (1) to dump each leaf entry
4606 | for (int j=0; j<size; j++) { |
4607 | LEAFENTRY le; |
4608 | void* keyp = NULL; |
4609 | uint32_t keylen = 0; |
4610 | int r = BLB_DATA(node,i)->fetch_klpair(j, &le, &keylen, &keyp); |
4611 | assert_zero(r); |
4612 | fprintf(file, " [%d]=" , j); |
4613 | print_klpair(file, keyp, keylen, le); |
4614 | fprintf(file, "\n" ); |
4615 | } |
4616 | fprintf(file, "\n" ); |
4617 | } |
4618 | } |
4619 | if (node->height > 0) { |
4620 | for (i=0; i<node->n_children; i++) { |
4621 | fprintf(file, "%*schild %d\n" , depth, "" , i); |
4622 | if (i>0) { |
4623 | char *CAST_FROM_VOIDP(key, node->pivotkeys.get_pivot(i - 1).data); |
4624 | fprintf(file, "%*spivot %d len=%u %u\n" , depth+1, "" , i-1, node->pivotkeys.get_pivot(i - 1).size, (unsigned)toku_dtoh32(*(int*)key)); |
4625 | } |
4626 | DBT x, y; |
4627 | toku_dump_ftnode(file, ft_handle, BP_BLOCKNUM(node, i), depth+4, |
4628 | (i==0) ? lorange : node->pivotkeys.fill_pivot(i - 1, &x), |
4629 | (i==node->n_children-1) ? hirange : node->pivotkeys.fill_pivot(i, &y)); |
4630 | } |
4631 | } |
4632 | } |
4633 | toku_unpin_ftnode(ft_handle->ft, node); |
4634 | return result; |
4635 | } |
4636 | |
4637 | int toku_dump_ft(FILE *f, FT_HANDLE ft_handle) { |
4638 | FT ft = ft_handle->ft; |
4639 | invariant_notnull(ft); |
4640 | ft->blocktable.dump_translation_table(f); |
4641 | |
4642 | uint32_t fullhash = 0; |
4643 | CACHEKEY root_key; |
4644 | toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &fullhash); |
4645 | return toku_dump_ftnode(f, ft_handle, root_key, 0, 0, 0); |
4646 | } |
4647 | |
4648 | |
4649 | static void toku_pfs_keys_init(const char *toku_instr_group_name) { |
4650 | kibbutz_mutex_key = new toku_instr_key( |
4651 | toku_instr_object_type::mutex, toku_instr_group_name, "kibbutz_mutex" ); |
4652 | minicron_p_mutex_key = new toku_instr_key( |
4653 | toku_instr_object_type::mutex, toku_instr_group_name, |
4654 | "minicron_p_mutex" ); |
4655 | queue_result_mutex_key = new toku_instr_key( |
4656 | toku_instr_object_type::mutex, toku_instr_group_name, |
4657 | "queue_result_mutex" ); |
4658 | tpool_lock_mutex_key = new toku_instr_key( |
4659 | toku_instr_object_type::mutex, toku_instr_group_name, |
4660 | "tpool_lock_mutex" ); |
4661 | workset_lock_mutex_key = new toku_instr_key( |
4662 | toku_instr_object_type::mutex, toku_instr_group_name, |
4663 | "workset_lock_mutex" ); |
4664 | bjm_jobs_lock_mutex_key = new toku_instr_key( |
4665 | toku_instr_object_type::mutex, toku_instr_group_name, |
4666 | "bjm_jobs_lock_mutex" ); |
4667 | log_internal_lock_mutex_key = new toku_instr_key( |
4668 | toku_instr_object_type::mutex, toku_instr_group_name, |
4669 | "log_internal_lock_mutex" ); |
4670 | cachetable_ev_thread_lock_mutex_key = |
4671 | new toku_instr_key(toku_instr_object_type::mutex, |
4672 | toku_instr_group_name, |
4673 | "cachetable_ev_thread_lock_mutex" ); |
4674 | cachetable_disk_nb_mutex_key = new toku_instr_key( |
4675 | toku_instr_object_type::mutex, toku_instr_group_name, |
4676 | "cachetable_disk_nb_mutex" ); |
4677 | safe_file_size_lock_mutex_key = new toku_instr_key( |
4678 | toku_instr_object_type::mutex, toku_instr_group_name, |
4679 | "safe_file_size_lock_mutex" ); |
4680 | cachetable_m_mutex_key = new toku_instr_key( |
4681 | toku_instr_object_type::mutex, toku_instr_group_name, |
4682 | "cachetable_m_mutex_key" ); |
4683 | checkpoint_safe_mutex_key = new toku_instr_key( |
4684 | toku_instr_object_type::mutex, toku_instr_group_name, |
4685 | "checkpoint_safe_mutex" ); |
4686 | ft_ref_lock_mutex_key = new toku_instr_key( |
4687 | toku_instr_object_type::mutex, toku_instr_group_name, |
4688 | "ft_ref_lock_mutex" ); |
4689 | ft_open_close_lock_mutex_key = new toku_instr_key( |
4690 | toku_instr_object_type::mutex, toku_instr_group_name, |
4691 | "ft_open_close_lock_mutex" ); |
4692 | loader_error_mutex_key = new toku_instr_key( |
4693 | toku_instr_object_type::mutex, toku_instr_group_name, |
4694 | "loader_error_mutex" ); |
4695 | bfs_mutex_key = |
4696 | new toku_instr_key(toku_instr_object_type::mutex, toku_instr_group_name, |
4697 | "bfs_mutex" ); |
4698 | loader_bl_mutex_key = new toku_instr_key( |
4699 | toku_instr_object_type::mutex, toku_instr_group_name, |
4700 | "loader_bl_mutex" ); |
4701 | loader_fi_lock_mutex_key = new toku_instr_key( |
4702 | toku_instr_object_type::mutex, toku_instr_group_name, |
4703 | "loader_fi_lock_mutex" ); |
4704 | loader_out_mutex_key = new toku_instr_key( |
4705 | toku_instr_object_type::mutex, toku_instr_group_name, |
4706 | "loader_out_mutex" ); |
4707 | result_output_condition_lock_mutex_key = |
4708 | new toku_instr_key(toku_instr_object_type::mutex, |
4709 | toku_instr_group_name, |
4710 | "result_output_condition_lock_mutex" ); |
4711 | block_table_mutex_key = new toku_instr_key( |
4712 | toku_instr_object_type::mutex, toku_instr_group_name, |
4713 | "block_table_mutex" ); |
4714 | rollback_log_node_cache_mutex_key = new toku_instr_key( |
4715 | toku_instr_object_type::mutex, toku_instr_group_name, |
4716 | "rollback_log_node_cache_mutex" ); |
4717 | txn_lock_mutex_key = new toku_instr_key( |
4718 | toku_instr_object_type::mutex, toku_instr_group_name, "txn_lock_mutex" ); |
4719 | txn_state_lock_mutex_key = new toku_instr_key( |
4720 | toku_instr_object_type::mutex, toku_instr_group_name, |
4721 | "txn_state_lock_mutex" ); |
4722 | txn_child_manager_mutex_key = new toku_instr_key( |
4723 | toku_instr_object_type::mutex, toku_instr_group_name, |
4724 | "txn_child_manager_mutex" ); |
4725 | txn_manager_lock_mutex_key = new toku_instr_key( |
4726 | toku_instr_object_type::mutex, toku_instr_group_name, |
4727 | "txn_manager_lock_mutex" ); |
4728 | treenode_mutex_key = new toku_instr_key( |
4729 | toku_instr_object_type::mutex, toku_instr_group_name, "treenode_mutex" ); |
4730 | locktree_request_info_mutex_key = new toku_instr_key( |
4731 | toku_instr_object_type::mutex, toku_instr_group_name, |
4732 | "locktree_request_info_mutex" ); |
4733 | locktree_request_info_retry_mutex_key = new toku_instr_key( |
4734 | toku_instr_object_type::mutex, toku_instr_group_name, |
4735 | "locktree_request_info_retry_mutex_key" ); |
4736 | manager_mutex_key = new toku_instr_key( |
4737 | toku_instr_object_type::mutex, toku_instr_group_name, "manager_mutex" ); |
4738 | manager_escalation_mutex_key = new toku_instr_key( |
4739 | toku_instr_object_type::mutex, toku_instr_group_name, |
4740 | "manager_escalation_mutex" ); |
4741 | db_txn_struct_i_txn_mutex_key = new toku_instr_key( |
4742 | toku_instr_object_type::mutex, toku_instr_group_name, |
4743 | "db_txn_struct_i_txn_mutex" ); |
4744 | manager_escalator_mutex_key = new toku_instr_key( |
4745 | toku_instr_object_type::mutex, toku_instr_group_name, |
4746 | "manager_escalator_mutex" ); |
4747 | indexer_i_indexer_lock_mutex_key = new toku_instr_key( |
4748 | toku_instr_object_type::mutex, toku_instr_group_name, |
4749 | "indexer_i_indexer_lock_mutex" ); |
4750 | indexer_i_indexer_estimate_lock_mutex_key = |
4751 | new toku_instr_key(toku_instr_object_type::mutex, |
4752 | toku_instr_group_name, |
4753 | "indexer_i_indexer_estimate_lock_mutex" ); |
4754 | |
4755 | tokudb_file_data_key = new toku_instr_key( |
4756 | toku_instr_object_type::file, toku_instr_group_name, "tokudb_data_file" ); |
4757 | tokudb_file_load_key = new toku_instr_key( |
4758 | toku_instr_object_type::file, toku_instr_group_name, "tokudb_load_file" ); |
4759 | tokudb_file_tmp_key = new toku_instr_key( |
4760 | toku_instr_object_type::file, toku_instr_group_name, "tokudb_tmp_file" ); |
4761 | tokudb_file_log_key = new toku_instr_key( |
4762 | toku_instr_object_type::file, toku_instr_group_name, "tokudb_log_file" ); |
4763 | |
4764 | fti_probe_1_key = |
4765 | new toku_instr_key(toku_instr_object_type::mutex, toku_instr_group_name, |
4766 | "fti_probe_1" ); |
4767 | |
4768 | extractor_thread_key = new toku_instr_key( |
4769 | toku_instr_object_type::thread, toku_instr_group_name, |
4770 | "extractor_thread" ); |
4771 | fractal_thread_key = new toku_instr_key( |
4772 | toku_instr_object_type::thread, toku_instr_group_name, "fractal_thread" ); |
4773 | io_thread_key = |
4774 | new toku_instr_key(toku_instr_object_type::thread, toku_instr_group_name, |
4775 | "io_thread" ); |
4776 | eviction_thread_key = new toku_instr_key( |
4777 | toku_instr_object_type::thread, toku_instr_group_name, |
4778 | "eviction_thread" ); |
4779 | kibbutz_thread_key = new toku_instr_key( |
4780 | toku_instr_object_type::thread, toku_instr_group_name, "kibbutz_thread" ); |
4781 | minicron_thread_key = new toku_instr_key( |
4782 | toku_instr_object_type::thread, toku_instr_group_name, |
4783 | "minicron_thread" ); |
4784 | tp_internal_thread_key = new toku_instr_key( |
4785 | toku_instr_object_type::thread, toku_instr_group_name, |
4786 | "tp_internal_thread" ); |
4787 | |
4788 | result_state_cond_key = new toku_instr_key( |
4789 | toku_instr_object_type::cond, toku_instr_group_name, |
4790 | "result_state_cond" ); |
4791 | bjm_jobs_wait_key = new toku_instr_key( |
4792 | toku_instr_object_type::cond, toku_instr_group_name, "bjm_jobs_wait" ); |
4793 | cachetable_p_refcount_wait_key = new toku_instr_key( |
4794 | toku_instr_object_type::cond, toku_instr_group_name, |
4795 | "cachetable_p_refcount_wait" ); |
4796 | cachetable_m_flow_control_cond_key = new toku_instr_key( |
4797 | toku_instr_object_type::cond, toku_instr_group_name, |
4798 | "cachetable_m_flow_control_cond" ); |
4799 | cachetable_m_ev_thread_cond_key = new toku_instr_key( |
4800 | toku_instr_object_type::cond, toku_instr_group_name, |
4801 | "cachetable_m_ev_thread_cond" ); |
4802 | bfs_cond_key = |
4803 | new toku_instr_key(toku_instr_object_type::cond, toku_instr_group_name, |
4804 | "bfs_cond" ); |
4805 | result_output_condition_key = new toku_instr_key( |
4806 | toku_instr_object_type::cond, toku_instr_group_name, |
4807 | "result_output_condition" ); |
4808 | manager_m_escalator_done_key = new toku_instr_key( |
4809 | toku_instr_object_type::cond, toku_instr_group_name, |
4810 | "manager_m_escalator_done" ); |
4811 | lock_request_m_wait_cond_key = new toku_instr_key( |
4812 | toku_instr_object_type::cond, toku_instr_group_name, |
4813 | "lock_request_m_wait_cond" ); |
4814 | queue_result_cond_key = new toku_instr_key( |
4815 | toku_instr_object_type::cond, toku_instr_group_name, |
4816 | "queue_result_cond" ); |
4817 | ws_worker_wait_key = new toku_instr_key( |
4818 | toku_instr_object_type::cond, toku_instr_group_name, "ws_worker_wait" ); |
4819 | rwlock_wait_read_key = new toku_instr_key( |
4820 | toku_instr_object_type::cond, toku_instr_group_name, "rwlock_wait_read" ); |
4821 | rwlock_wait_write_key = new toku_instr_key( |
4822 | toku_instr_object_type::cond, toku_instr_group_name, |
4823 | "rwlock_wait_write" ); |
4824 | rwlock_cond_key = |
4825 | new toku_instr_key(toku_instr_object_type::cond, toku_instr_group_name, |
4826 | "rwlock_cond" ); |
4827 | tp_thread_wait_key = new toku_instr_key( |
4828 | toku_instr_object_type::cond, toku_instr_group_name, "tp_thread_wait" ); |
4829 | tp_pool_wait_free_key = new toku_instr_key( |
4830 | toku_instr_object_type::cond, toku_instr_group_name, |
4831 | "tp_pool_wait_free" ); |
4832 | frwlock_m_wait_read_key = new toku_instr_key( |
4833 | toku_instr_object_type::cond, toku_instr_group_name, |
4834 | "frwlock_m_wait_read" ); |
4835 | kibbutz_k_cond_key = new toku_instr_key( |
4836 | toku_instr_object_type::cond, toku_instr_group_name, "kibbutz_k_cond" ); |
4837 | minicron_p_condvar_key = new toku_instr_key( |
4838 | toku_instr_object_type::cond, toku_instr_group_name, |
4839 | "minicron_p_condvar" ); |
4840 | locktree_request_info_retry_cv_key = new toku_instr_key( |
4841 | toku_instr_object_type::cond, toku_instr_group_name, |
4842 | "locktree_request_info_retry_cv_key" ); |
4843 | |
4844 | multi_operation_lock_key = new toku_instr_key( |
4845 | toku_instr_object_type::rwlock, toku_instr_group_name, |
4846 | "multi_operation_lock" ); |
4847 | low_priority_multi_operation_lock_key = |
4848 | new toku_instr_key(toku_instr_object_type::rwlock, |
4849 | toku_instr_group_name, |
4850 | "low_priority_multi_operation_lock" ); |
4851 | cachetable_m_list_lock_key = new toku_instr_key( |
4852 | toku_instr_object_type::rwlock, toku_instr_group_name, |
4853 | "cachetable_m_list_lock" ); |
4854 | cachetable_m_pending_lock_expensive_key = |
4855 | new toku_instr_key(toku_instr_object_type::rwlock, |
4856 | toku_instr_group_name, |
4857 | "cachetable_m_pending_lock_expensive" ); |
4858 | cachetable_m_pending_lock_cheap_key = |
4859 | new toku_instr_key(toku_instr_object_type::rwlock, |
4860 | toku_instr_group_name, |
4861 | "cachetable_m_pending_lock_cheap" ); |
4862 | cachetable_m_lock_key = new toku_instr_key( |
4863 | toku_instr_object_type::rwlock, toku_instr_group_name, |
4864 | "cachetable_m_lock" ); |
4865 | result_i_open_dbs_rwlock_key = new toku_instr_key( |
4866 | toku_instr_object_type::rwlock, toku_instr_group_name, |
4867 | "result_i_open_dbs_rwlock" ); |
4868 | checkpoint_safe_rwlock_key = new toku_instr_key( |
4869 | toku_instr_object_type::rwlock, toku_instr_group_name, |
4870 | "checkpoint_safe_rwlock" ); |
4871 | cachetable_value_key = new toku_instr_key( |
4872 | toku_instr_object_type::rwlock, toku_instr_group_name, |
4873 | "cachetable_value" ); |
4874 | safe_file_size_lock_rwlock_key = new toku_instr_key( |
4875 | toku_instr_object_type::rwlock, toku_instr_group_name, |
4876 | "safe_file_size_lock_rwlock" ); |
4877 | cachetable_disk_nb_rwlock_key = new toku_instr_key( |
4878 | toku_instr_object_type::rwlock, toku_instr_group_name, |
4879 | "cachetable_disk_nb_rwlock" ); |
4880 | |
4881 | toku_instr_probe_1 = new toku_instr_probe(*fti_probe_1_key); |
4882 | } |
4883 | |
4884 | static void toku_pfs_keys_destroy(void) { |
4885 | delete kibbutz_mutex_key; |
4886 | delete minicron_p_mutex_key; |
4887 | delete queue_result_mutex_key; |
4888 | delete tpool_lock_mutex_key; |
4889 | delete workset_lock_mutex_key; |
4890 | delete bjm_jobs_lock_mutex_key; |
4891 | delete log_internal_lock_mutex_key; |
4892 | delete cachetable_ev_thread_lock_mutex_key; |
4893 | delete cachetable_disk_nb_mutex_key; |
4894 | delete safe_file_size_lock_mutex_key; |
4895 | delete cachetable_m_mutex_key; |
4896 | delete checkpoint_safe_mutex_key; |
4897 | delete ft_ref_lock_mutex_key; |
4898 | delete ft_open_close_lock_mutex_key; |
4899 | delete loader_error_mutex_key; |
4900 | delete bfs_mutex_key; |
4901 | delete loader_bl_mutex_key; |
4902 | delete loader_fi_lock_mutex_key; |
4903 | delete loader_out_mutex_key; |
4904 | delete result_output_condition_lock_mutex_key; |
4905 | delete block_table_mutex_key; |
4906 | delete rollback_log_node_cache_mutex_key; |
4907 | delete txn_lock_mutex_key; |
4908 | delete txn_state_lock_mutex_key; |
4909 | delete txn_child_manager_mutex_key; |
4910 | delete txn_manager_lock_mutex_key; |
4911 | delete treenode_mutex_key; |
4912 | delete locktree_request_info_mutex_key; |
4913 | delete locktree_request_info_retry_mutex_key; |
4914 | delete manager_mutex_key; |
4915 | delete manager_escalation_mutex_key; |
4916 | delete db_txn_struct_i_txn_mutex_key; |
4917 | delete manager_escalator_mutex_key; |
4918 | delete indexer_i_indexer_lock_mutex_key; |
4919 | delete indexer_i_indexer_estimate_lock_mutex_key; |
4920 | |
4921 | delete tokudb_file_data_key; |
4922 | delete tokudb_file_load_key; |
4923 | delete tokudb_file_tmp_key; |
4924 | delete tokudb_file_log_key; |
4925 | |
4926 | delete fti_probe_1_key; |
4927 | |
4928 | delete extractor_thread_key; |
4929 | delete fractal_thread_key; |
4930 | delete io_thread_key; |
4931 | delete eviction_thread_key; |
4932 | delete kibbutz_thread_key; |
4933 | delete minicron_thread_key; |
4934 | delete tp_internal_thread_key; |
4935 | |
4936 | delete result_state_cond_key; |
4937 | delete bjm_jobs_wait_key; |
4938 | delete cachetable_p_refcount_wait_key; |
4939 | delete cachetable_m_flow_control_cond_key; |
4940 | delete cachetable_m_ev_thread_cond_key; |
4941 | delete bfs_cond_key; |
4942 | delete result_output_condition_key; |
4943 | delete manager_m_escalator_done_key; |
4944 | delete lock_request_m_wait_cond_key; |
4945 | delete queue_result_cond_key; |
4946 | delete ws_worker_wait_key; |
4947 | delete rwlock_wait_read_key; |
4948 | delete rwlock_wait_write_key; |
4949 | delete rwlock_cond_key; |
4950 | delete tp_thread_wait_key; |
4951 | delete tp_pool_wait_free_key; |
4952 | delete frwlock_m_wait_read_key; |
4953 | delete kibbutz_k_cond_key; |
4954 | delete minicron_p_condvar_key; |
4955 | delete locktree_request_info_retry_cv_key; |
4956 | |
4957 | delete multi_operation_lock_key; |
4958 | delete low_priority_multi_operation_lock_key; |
4959 | delete cachetable_m_list_lock_key; |
4960 | delete cachetable_m_pending_lock_expensive_key; |
4961 | delete cachetable_m_pending_lock_cheap_key; |
4962 | delete cachetable_m_lock_key; |
4963 | delete result_i_open_dbs_rwlock_key; |
4964 | delete checkpoint_safe_rwlock_key; |
4965 | delete cachetable_value_key; |
4966 | delete safe_file_size_lock_rwlock_key; |
4967 | |
4968 | delete cachetable_disk_nb_rwlock_key; |
4969 | delete toku_instr_probe_1; |
4970 | } |
4971 | |
4972 | int toku_ft_layer_init(void) { |
4973 | int r = 0; |
4974 | |
4975 | // Portability must be initialized first |
4976 | r = toku_portability_init(); |
4977 | if (r) { |
4978 | goto exit; |
4979 | } |
4980 | |
4981 | toku_pfs_keys_init("fti" ); |
4982 | |
4983 | r = db_env_set_toku_product_name("tokudb" ); |
4984 | if (r) { |
4985 | goto exit; |
4986 | } |
4987 | |
4988 | partitioned_counters_init(); |
4989 | toku_status_init(); |
4990 | toku_context_status_init(); |
4991 | toku_checkpoint_init(); |
4992 | toku_ft_serialize_layer_init(); |
4993 | toku_mutex_init( |
4994 | *ft_open_close_lock_mutex_key, &ft_open_close_lock, nullptr); |
4995 | toku_scoped_malloc_init(); |
4996 | exit: |
4997 | return r; |
4998 | } |
4999 | |
5000 | void toku_ft_layer_destroy(void) { |
5001 | toku_mutex_destroy(&ft_open_close_lock); |
5002 | toku_ft_serialize_layer_destroy(); |
5003 | toku_checkpoint_destroy(); |
5004 | toku_context_status_destroy(); |
5005 | toku_status_destroy(); |
5006 | partitioned_counters_destroy(); |
5007 | toku_scoped_malloc_destroy(); |
5008 | toku_pfs_keys_destroy(); |
5009 | |
5010 | // Portability must be cleaned up last |
5011 | toku_portability_destroy(); |
5012 | } |
5013 | |
5014 | // This lock serializes all opens and closes because the cachetable requires that clients do not try to open or close a cachefile in parallel. We made |
5015 | // it coarser by not allowing any cachefiles to be open or closed in parallel. |
5016 | void toku_ft_open_close_lock(void) { |
5017 | toku_mutex_lock(&ft_open_close_lock); |
5018 | } |
5019 | |
5020 | void toku_ft_open_close_unlock(void) { |
5021 | toku_mutex_unlock(&ft_open_close_lock); |
5022 | } |
5023 | |
5024 | // Prepare to remove a dictionary from the database when this transaction is committed: |
5025 | // - mark transaction as NEED fsync on commit |
5026 | // - make entry in rollback log |
5027 | // - make fdelete entry in recovery log |
5028 | // |
5029 | // Effect: when the txn commits, the ft's cachefile will be marked as unlink |
5030 | // on close. see toku_commit_fdelete and how unlink on close works |
5031 | // in toku_cachefile_close(); |
5032 | // Requires: serialized with begin checkpoint |
5033 | // this does not need to take the open close lock because |
5034 | // 1.) the ft/cf cannot go away because we have a live handle. |
5035 | // 2.) we're not setting the unlink on close bit _here_. that |
5036 | // happens on txn commit (as the name suggests). |
5037 | // 3.) we're already holding the multi operation lock to |
5038 | // synchronize with begin checkpoint. |
5039 | // Contract: the iname of the ft should never be reused. |
5040 | void toku_ft_unlink_on_commit(FT_HANDLE handle, TOKUTXN txn) { |
5041 | assert(txn); |
5042 | |
5043 | CACHEFILE cf = handle->ft->cf; |
5044 | FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf)); |
5045 | |
5046 | toku_txn_maybe_note_ft(txn, ft); |
5047 | |
5048 | // If the txn commits, the commit MUST be in the log before the file is actually unlinked |
5049 | toku_txn_force_fsync_on_commit(txn); |
5050 | // make entry in rollback log |
5051 | FILENUM filenum = toku_cachefile_filenum(cf); |
5052 | toku_logger_save_rollback_fdelete(txn, filenum); |
5053 | // make entry in recovery log |
5054 | toku_logger_log_fdelete(txn, filenum); |
5055 | } |
5056 | |
5057 | // Non-transactional version of fdelete |
5058 | // |
// Effect: The ft file is unlinked when the handle closes and its ft is not
5060 | // pinned by checkpoint. see toku_remove_ft_ref() and how unlink on |
5061 | // close works in toku_cachefile_close(); |
5062 | // Requires: serialized with begin checkpoint |
5063 | void toku_ft_unlink(FT_HANDLE handle) { |
5064 | CACHEFILE cf; |
5065 | cf = handle->ft->cf; |
5066 | toku_cachefile_unlink_on_close(cf); |
5067 | } |
5068 | |
5069 | int toku_ft_rename_iname(DB_TXN *txn, |
5070 | const char *data_dir, |
5071 | const char *old_iname, |
5072 | const char *new_iname, |
5073 | CACHETABLE ct) { |
5074 | int r = 0; |
5075 | |
5076 | std::unique_ptr<char[], decltype(&toku_free)> new_iname_full(nullptr, |
5077 | &toku_free); |
5078 | std::unique_ptr<char[], decltype(&toku_free)> old_iname_full(nullptr, |
5079 | &toku_free); |
5080 | |
5081 | new_iname_full.reset(toku_construct_full_name(2, data_dir, new_iname)); |
5082 | old_iname_full.reset(toku_construct_full_name(2, data_dir, old_iname)); |
5083 | |
5084 | if (txn) { |
5085 | BYTESTRING bs_old_name = {static_cast<uint32_t>(strlen(old_iname) + 1), |
5086 | const_cast<char *>(old_iname)}; |
5087 | BYTESTRING bs_new_name = {static_cast<uint32_t>(strlen(new_iname) + 1), |
5088 | const_cast<char *>(new_iname)}; |
5089 | FILENUM filenum = FILENUM_NONE; |
5090 | { |
5091 | CACHEFILE cf; |
5092 | r = toku_cachefile_of_iname_in_env(ct, old_iname, &cf); |
5093 | if (r != ENOENT) { |
5094 | char *old_fname_in_cf = toku_cachefile_fname_in_env(cf); |
5095 | toku_cachefile_set_fname_in_env(cf, toku_xstrdup(new_iname)); |
5096 | toku_free(old_fname_in_cf); |
5097 | filenum = toku_cachefile_filenum(cf); |
5098 | } |
5099 | } |
5100 | toku_logger_save_rollback_frename( |
5101 | db_txn_struct_i(txn)->tokutxn, &bs_old_name, &bs_new_name); |
5102 | toku_log_frename(db_txn_struct_i(txn)->tokutxn->logger, |
5103 | (LSN *)0, |
5104 | 0, |
5105 | toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn), |
5106 | bs_old_name, |
5107 | filenum, |
5108 | bs_new_name); |
5109 | } |
5110 | |
5111 | if (!toku_create_subdirs_if_needed(new_iname_full.get())) |
5112 | return get_error_errno(); |
5113 | r = toku_os_rename(old_iname_full.get(), new_iname_full.get()); |
5114 | if (r != 0) |
5115 | return r; |
5116 | r = toku_fsync_directory(new_iname_full.get()); |
5117 | return r; |
5118 | } |
5119 | |
5120 | int toku_ft_get_fragmentation(FT_HANDLE ft_handle, TOKU_DB_FRAGMENTATION report) { |
5121 | int fd = toku_cachefile_get_fd(ft_handle->ft->cf); |
5122 | toku_ft_lock(ft_handle->ft); |
5123 | |
5124 | int64_t file_size; |
5125 | int r = toku_os_get_file_size(fd, &file_size); |
5126 | if (r == 0) { |
5127 | report->file_size_bytes = file_size; |
5128 | ft_handle->ft->blocktable.get_fragmentation_unlocked(report); |
5129 | } |
5130 | toku_ft_unlock(ft_handle->ft); |
5131 | return r; |
5132 | } |
5133 | |
5134 | static bool is_empty_fast_iter (FT_HANDLE ft_handle, FTNODE node) { |
5135 | if (node->height > 0) { |
5136 | for (int childnum=0; childnum<node->n_children; childnum++) { |
5137 | if (toku_bnc_nbytesinbuf(BNC(node, childnum)) != 0) { |
                return false; // it's not empty if there are bytes in buffers
5139 | } |
5140 | FTNODE childnode; |
5141 | { |
5142 | BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum); |
5143 | uint32_t fullhash = compute_child_fullhash(ft_handle->ft->cf, node, childnum); |
5144 | ftnode_fetch_extra bfe; |
5145 | bfe.create_for_full_read(ft_handle->ft); |
5146 | // don't need to pass in dependent nodes as we are not |
5147 | // modifying nodes we are pinning |
5148 | toku_pin_ftnode( |
5149 | ft_handle->ft, |
5150 | childblocknum, |
5151 | fullhash, |
5152 | &bfe, |
5153 | PL_READ, // may_modify_node set to false, as nodes not modified |
5154 | &childnode, |
5155 | true |
5156 | ); |
5157 | } |
            bool child_is_empty = is_empty_fast_iter(ft_handle, childnode);
            toku_unpin_ftnode(ft_handle->ft, childnode);
            if (!child_is_empty) return false;
        }
        return true;
5163 | } else { |
5164 | // leaf: If the dmt is empty, we are happy. |
5165 | for (int i = 0; i < node->n_children; i++) { |
5166 | if (BLB_DATA(node, i)->num_klpairs()) { |
5167 | return false; |
5168 | } |
5169 | } |
5170 | return true; |
5171 | } |
5172 | } |
5173 | |
5174 | bool toku_ft_is_empty_fast (FT_HANDLE ft_handle) |
5175 | // A fast check to see if the tree is empty. If there are any messages or leafentries, we consider the tree to be nonempty. It's possible that those |
5176 | // messages and leafentries would all optimize away and that the tree is empty, but we'll say it is nonempty. |
5177 | { |
5178 | uint32_t fullhash; |
5179 | FTNODE node; |
5180 | { |
5181 | CACHEKEY root_key; |
5182 | toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &fullhash); |
5183 | ftnode_fetch_extra bfe; |
5184 | bfe.create_for_full_read(ft_handle->ft); |
5185 | toku_pin_ftnode( |
5186 | ft_handle->ft, |
5187 | root_key, |
5188 | fullhash, |
5189 | &bfe, |
5190 | PL_READ, // may_modify_node set to false, node does not change |
5191 | &node, |
5192 | true |
5193 | ); |
5194 | } |
5195 | bool r = is_empty_fast_iter(ft_handle, node); |
5196 | toku_unpin_ftnode(ft_handle->ft, node); |
5197 | return r; |
5198 | } |
5199 | |
5200 | // test-only |
5201 | int toku_ft_strerror_r(int error, char *buf, size_t buflen) |
5202 | { |
5203 | if (error>=0) { |
5204 | return (long) strerror_r(error, buf, buflen); |
5205 | } else { |
5206 | switch (error) { |
5207 | case DB_KEYEXIST: |
5208 | snprintf(buf, buflen, "Key exists" ); |
5209 | return 0; |
5210 | case TOKUDB_CANCELED: |
5211 | snprintf(buf, buflen, "User canceled operation" ); |
5212 | return 0; |
5213 | default: |
5214 | snprintf(buf, buflen, "Unknown error %d" , error); |
5215 | return EINVAL; |
5216 | } |
5217 | } |
5218 | } |
5219 | |
5220 | int toku_keycompare(const void *key1, uint32_t key1len, const void *key2, uint32_t key2len) { |
5221 | int comparelen = key1len < key2len ? key1len : key2len; |
5222 | int c = memcmp(key1, key2, comparelen); |
5223 | if (__builtin_expect(c != 0, 1)) { |
5224 | return c; |
5225 | } else { |
5226 | if (key1len < key2len) { |
5227 | return -1; |
5228 | } else if (key1len > key2len) { |
5229 | return 1; |
5230 | } else { |
5231 | return 0; |
5232 | } |
5233 | } |
5234 | } |
5235 | |
5236 | int toku_builtin_compare_fun(DB *db __attribute__((__unused__)), const DBT *a, const DBT*b) { |
5237 | return toku_keycompare(a->data, a->size, b->data, b->size); |
5238 | } |
5239 | |
5240 | #include <toku_race_tools.h> |
5241 | void __attribute__((__constructor__)) toku_ft_helgrind_ignore(void); |
5242 | void |
5243 | toku_ft_helgrind_ignore(void) { |
5244 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&ft_status, sizeof ft_status); |
5245 | } |
5246 | |