/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
/*

Managing the tree shape: How insertion, deletion, and querying work

When we insert a message into the FT_HANDLE, here's what happens.

to insert a message at the root

    - find the root node
    - capture the next msn of the root node and assign it to the message
    - split the root if it needs to be split
    - insert the message into the root buffer
    - if the root is too full, then toku_ft_flush_some_child() of the root on a flusher thread

flusher functions use an advice struct which provides some functions to
call that tell it what to do based on the context of the flush. see ft-flusher.h
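
a rough sketch of the shape of that interface (names here are approximate;
ft-flusher.h has the authoritative definition):

    flusher_advice {
        pick_child()                 // which child's buffer to flush
        should_recursively_flush()   // keep flushing down the tree?
        maybe_merge_child()          // fix up an underfull child afterward
        ... plus status-update hooks and an extra context pointer
    }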

to flush some child, given a parent and some advice
    - pick the child using advice->pick_child()
    - remove that child's buffer from the parent
    - flush the buffer to the child
    - if the child has stable reactivity and
      advice->should_recursively_flush() is true, then
      toku_ft_flush_some_child() of the child
    - otherwise split the child if it needs to be split
    - otherwise maybe merge the child if it needs to be merged

flusher threads:

    flusher threads are created on demand as the result of internal nodes
    becoming gorged by insertions. this allows flushing to be done somewhere
    other than the client thread. these work items are enqueued onto
    the cachetable kibbutz and are done in a first-in, first-out order.

cleaner threads:

    the cleaner thread wakes up every so often (say, 1 second) and chooses
    a small number (say, 5) of nodes as candidates for a flush. the one
    with the largest cache pressure is chosen to be flushed. cache pressure
    is a function of the size of the node in the cachetable plus the work done.
    the cleaner thread need not actually do a flush when awoken, so only
    nodes that have sufficient cache pressure are flushed.
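
    (concretely, ftnode_cachepressure_size() later in this file computes that
    pressure for an internal node as the sum, over its in-memory partitions,
    of the buffer size plus the work done (BP_WORKDONE); leaves and totally
    empty nodes report zero pressure.)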

checkpointing:

    the checkpoint thread wakes up every minute to checkpoint dirty nodes
    to disk. at the time of this writing, nodes during checkpoint are
    locked and cannot be queried or flushed to. a design in which nodes
    are copied before checkpoint is being considered as a way to reduce
    the performance variability caused by a checkpoint locking too
    many nodes and preventing other threads from traversing down the tree,
    for a query or otherwise.

To shrink a file: Let X be the size of the reachable data.
    We define an acceptable bloat constant of C. For example, we set C=2 if we are willing to allow the file to be as much as 2X in size.
    The goal is to find the smallest amount of stuff we can move to get the file down to size CX.
    That seems like a difficult problem, so we use the following heuristics:
       If we can relocate the last block to a lower location, then do so immediately. (The file gets smaller right away, so even though the new location
         may not even be in the first CX bytes, we are making the file smaller.)
       Otherwise all of the earlier blocks are smaller than the last block (of size L). So find the smallest region that has L free bytes in it.
         (This can be computed in one pass.)
         Move the first allocated block in that region to some location not in the interior of the region.
         (Outside of the region is OK, and reallocating the block at the edge of the region is OK.)
         This has the effect of creating a smaller region with at least L free bytes in it.
         Go back to the top (because by now some other block may have been allocated or freed).
    Claim: if there are no other allocations going on concurrently, then this algorithm will shrink the file reasonably efficiently. By this I mean that
       each block of shrinkage does the smallest amount of work possible. That doesn't mean that the work overall is minimized.
    Note: If there are other allocations and deallocations going on concurrently, we might never get enough space to move the last block. But it takes a lot
       of allocations and deallocations to make that happen, and it's probably reasonable for the file not to shrink in this case.
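
    Worked example: suppose C=2, the reachable data is X=1GB inside a 3GB file,
    and the last block is 16MB. If any free slot below it can hold 16MB, the
    block is relocated and the file shrinks right away. Otherwise, find the
    smallest region containing 16MB of free space, move the first allocated
    block out of that region's interior, and loop; each pass consolidates free
    space until the last block can be relocated.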

To split or merge a child of a node:
Split_or_merge (node, childnum) {
    If the child needs to be split (it's a leaf with too much stuff or a nonleaf with too much fanout)
        fetch the node and the child into main memory.
        split the child, producing two nodes A and B, and also a pivot. Don't worry if the resulting child is still too big or too small. Fix it on the next pass.
        fixup node to point at the two new children. Don't worry about the node getting too much fanout.
        return;
    If the child needs to be merged (it's a leaf with too little stuff (less than 1/4 full) or a nonleaf with too little fanout (less than 1/4))
        fetch node, the child and a sibling of the child into main memory.
        move all messages from the node to the two children (so that the message buffers are empty)
        If the two siblings together fit into one node then
            merge the two siblings.
            fixup the node to point at one child
        Otherwise
            load balance the content of the two nodes
        Don't worry about the resulting children having too many messages or otherwise being too big or too small. Fix it on the next pass.
    }
}

Here's how querying works:

lookups:
    - As of Dr. No, we don't do any tree shaping on lookup.
    - We don't promote eagerly or use aggressive promotion or passive-aggressive
      promotion. We just push messages down according to the traditional FT_HANDLE
      algorithm on insertions.
    - when a node is brought into memory, we apply ancestor messages above it.

basement nodes, bulk fetch, and partial fetch:
    - leaf nodes are made up of N basement nodes, each of nominal size. when
      a query hits a leaf node, it may require one or more basement nodes to be in memory.
    - for point queries, we do not read the entire node into memory. instead,
      we only read in the required basement node (see the illustration below).
    - for range queries, cursors may return 'cursor continue' in their callback
      to take the shortcut path until the end of the basement node.
    - for range queries, cursors may prelock a range of keys (with or without a txn).
      the fractal tree will prefetch nodes aggressively until the end of the range.
    - without a prelocked range, range queries behave like successive point queries.
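
    illustration of partial fetch: if a leaf has basement nodes [bn0 | bn1 | bn2]
    and a point query's key falls in bn1's pivot range, only bn1 needs to be
    brought to PT_AVAIL; bn0 and bn2 can stay PT_ON_DISK (or PT_COMPRESSED)
    until something else needs them.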

*/

#include <my_global.h>
#include "ft/cachetable/checkpoint.h"
#include "ft/cursor.h"
#include "ft/ft-cachetable-wrappers.h"
#include "ft/ft-flusher.h"
#include "ft/ft-internal.h"
#include "ft/ft.h"
#include "ft/leafentry.h"
#include "ft/logger/log-internal.h"
#include "ft/msg.h"
#include "ft/node.h"
#include "ft/serialize/block_table.h"
#include "ft/serialize/ft-serialize.h"
#include "ft/serialize/ft_layout_version.h"
#include "ft/serialize/ft_node-serialize.h"
#include "ft/serialize/sub_block.h"
#include "ft/txn/txn_manager.h"
#include "ft/txn/xids.h"
#include "ft/ule.h"
#include "src/ydb-internal.h"

#include <toku_race_tools.h>

#include <portability/toku_atomic.h>

#include <util/context.h>
#include <util/mempool.h>
#include <util/status.h>
#include <util/rwlock.h>
#include <util/sort.h>
#include <util/scoped_malloc.h>

#include <stdint.h>

#include <memory>

/* Status is intended for display to humans to help understand system behavior.
 * It does not need to be perfectly thread-safe.
 */

static toku_mutex_t ft_open_close_lock;
static toku_instr_key *ft_open_close_lock_mutex_key;
// FIXME: the instrumentation keys below are defined here even though they
// belong to other modules, because they are registered here. If desired, they
// can be moved to their proper modules and registration done there in a
// one-time init function
// locktree
toku_instr_key *treenode_mutex_key;
toku_instr_key *manager_mutex_key;
toku_instr_key *manager_escalation_mutex_key;
toku_instr_key *manager_escalator_mutex_key;
// src
toku_instr_key *db_txn_struct_i_txn_mutex_key;
toku_instr_key *indexer_i_indexer_lock_mutex_key;
toku_instr_key *indexer_i_indexer_estimate_lock_mutex_key;
toku_instr_key *result_i_open_dbs_rwlock_key;
// locktree
toku_instr_key *lock_request_m_wait_cond_key;
toku_instr_key *manager_m_escalator_done_key;
toku_instr_key *locktree_request_info_mutex_key;
toku_instr_key *locktree_request_info_retry_mutex_key;
toku_instr_key *locktree_request_info_retry_cv_key;

// this is a sample probe for custom instrumentation
static toku_instr_key *fti_probe_1_key;

// This is a sample probe for custom instrumentation
toku_instr_probe *toku_instr_probe_1;

void toku_ft_get_status(FT_STATUS s) {
    ft_status.init();
    *s = ft_status;

    // Calculate compression ratios for leaf and nonleaf nodes
    const double compressed_leaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_BYTES) +
        FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_BYTES_FOR_CHECKPOINT);
    const double uncompressed_leaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES) +
        FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT);
    const double compressed_nonleaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_BYTES) +
        FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_BYTES_FOR_CHECKPOINT);
    const double uncompressed_nonleaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES) +
        FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT);

    if (compressed_leaf_bytes > 0) {
        s->status[FT_STATUS_S::FT_DISK_FLUSH_LEAF_COMPRESSION_RATIO].value.dnum
            = uncompressed_leaf_bytes / compressed_leaf_bytes;
    }
    if (compressed_nonleaf_bytes > 0) {
        s->status[FT_STATUS_S::FT_DISK_FLUSH_NONLEAF_COMPRESSION_RATIO].value.dnum
            = uncompressed_nonleaf_bytes / compressed_nonleaf_bytes;
    }
    if (compressed_leaf_bytes > 0 || compressed_nonleaf_bytes > 0) {
        s->status[FT_STATUS_S::FT_DISK_FLUSH_OVERALL_COMPRESSION_RATIO].value.dnum
            = (uncompressed_leaf_bytes + uncompressed_nonleaf_bytes) /
              (compressed_leaf_bytes + compressed_nonleaf_bytes);
    }
}

void toku_note_deserialized_basement_node(bool fixed_key_size) {
    if (fixed_key_size) {
        FT_STATUS_INC(FT_BASEMENT_DESERIALIZE_FIXED_KEYSIZE, 1);
    } else {
        FT_STATUS_INC(FT_BASEMENT_DESERIALIZE_VARIABLE_KEYSIZE, 1);
    }
}

static void ft_verify_flags(FT UU(ft), FTNODE UU(node)) {
    paranoid_invariant(ft->h->flags == node->flags);
}

int toku_ft_debug_mode = 0;

uint32_t compute_child_fullhash(CACHEFILE cf, FTNODE node, int childnum) {
    paranoid_invariant(node->height > 0);
    paranoid_invariant(childnum < node->n_children);
    return toku_cachetable_hash(cf, BP_BLOCKNUM(node, childnum));
}

//
// pivot bounds
// TODO: move me to ft/node.cc?
//

pivot_bounds::pivot_bounds(const DBT &lbe_dbt, const DBT &ubi_dbt) :
    _lower_bound_exclusive(lbe_dbt), _upper_bound_inclusive(ubi_dbt) {
}

pivot_bounds pivot_bounds::infinite_bounds() {
    DBT dbt;
    toku_init_dbt(&dbt);

    // infinity is represented by an empty dbt
    invariant(toku_dbt_is_empty(&dbt));
    return pivot_bounds(dbt, dbt);
}

const DBT *pivot_bounds::lbe() const {
    return &_lower_bound_exclusive;
}

const DBT *pivot_bounds::ubi() const {
    return &_upper_bound_inclusive;
}

DBT pivot_bounds::_prepivotkey(FTNODE node, int childnum, const DBT &lbe_dbt) const {
    if (childnum == 0) {
        return lbe_dbt;
    } else {
        return node->pivotkeys.get_pivot(childnum - 1);
    }
}

DBT pivot_bounds::_postpivotkey(FTNODE node, int childnum, const DBT &ubi_dbt) const {
    if (childnum + 1 == node->n_children) {
        return ubi_dbt;
    } else {
        return node->pivotkeys.get_pivot(childnum);
    }
}

pivot_bounds pivot_bounds::next_bounds(FTNODE node, int childnum) const {
    return pivot_bounds(_prepivotkey(node, childnum, _lower_bound_exclusive),
                        _postpivotkey(node, childnum, _upper_bound_inclusive));
}
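
// An illustrative sketch (not compiled in) of how these bounds are used: a
// descent starts from infinite bounds and narrows them at each internal node,
// tracking the key range the chosen child covers. fetch_child() here is a
// hypothetical stand-in for the real pin/fetch machinery.
//
//     pivot_bounds bounds = pivot_bounds::infinite_bounds();
//     while (node->height > 0) {
//         int childnum = toku_ftnode_which_child(node, key, ft->cmp);
//         bounds = bounds.next_bounds(node, childnum);
//         node = fetch_child(node, childnum);  // hypothetical helper
//     }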

////////////////////////////////////////////////////////////////////////////////

static long get_avail_internal_node_partition_size(FTNODE node, int i) {
    paranoid_invariant(node->height > 0);
    return toku_bnc_memory_size(BNC(node, i));
}

static long ftnode_cachepressure_size(FTNODE node) {
    long retval = 0;
    bool totally_empty = true;
    if (node->height == 0) {
        goto exit;
    }
    else {
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node, i) == PT_INVALID || BP_STATE(node, i) == PT_ON_DISK) {
                continue;
            }
            else if (BP_STATE(node, i) == PT_COMPRESSED) {
                SUB_BLOCK sb = BSB(node, i);
                totally_empty = false;
                retval += sb->compressed_size;
            }
            else if (BP_STATE(node, i) == PT_AVAIL) {
                totally_empty = totally_empty && (toku_bnc_n_entries(BNC(node, i)) == 0);
                retval += get_avail_internal_node_partition_size(node, i);
                retval += BP_WORKDONE(node, i);
            }
            else {
                abort();
            }
        }
    }
exit:
    if (totally_empty) {
        return 0;
    }
    return retval;
}

static long
ftnode_memory_size(FTNODE node)
// Effect: Estimate how much main memory a node requires.
{
    long retval = 0;
    int n_children = node->n_children;
    retval += sizeof(*node);
    retval += (n_children) * (sizeof(node->bp[0]));
    retval += node->pivotkeys.total_size();

    // now calculate the sizes of the partitions
    for (int i = 0; i < n_children; i++) {
        if (BP_STATE(node, i) == PT_INVALID || BP_STATE(node, i) == PT_ON_DISK) {
            continue;
        }
        else if (BP_STATE(node, i) == PT_COMPRESSED) {
            SUB_BLOCK sb = BSB(node, i);
            retval += sizeof(*sb);
            retval += sb->compressed_size;
        }
        else if (BP_STATE(node, i) == PT_AVAIL) {
            if (node->height > 0) {
                retval += get_avail_internal_node_partition_size(node, i);
            }
            else {
                BASEMENTNODE bn = BLB(node, i);
                retval += sizeof(*bn);
                retval += BLB_DATA(node, i)->get_memory_size();
            }
        }
        else {
            abort();
        }
    }
    return retval;
}

PAIR_ATTR make_ftnode_pair_attr(FTNODE node) {
    long size = ftnode_memory_size(node);
    long cachepressure_size = ftnode_cachepressure_size(node);
    PAIR_ATTR result = {
        .size = size,
        .nonleaf_size = (node->height > 0) ? size : 0,
        .leaf_size = (node->height > 0) ? 0 : size,
        .rollback_size = 0,
        .cache_pressure_size = cachepressure_size,
        .is_valid = true
    };
    return result;
}

PAIR_ATTR make_invalid_pair_attr(void) {
    PAIR_ATTR result = {
        .size = 0,
        .nonleaf_size = 0,
        .leaf_size = 0,
        .rollback_size = 0,
        .cache_pressure_size = 0,
        .is_valid = false
    };
    return result;
}


// assign unique dictionary id
static uint64_t dict_id_serial = 1;
static DICTIONARY_ID
next_dict_id(void) {
    uint64_t i = toku_sync_fetch_and_add(&dict_id_serial, 1);
    assert(i);  // guarantee unique dictionary id by asserting 64-bit counter never wraps
    DICTIONARY_ID d = {.dictid = i};
    return d;
}

// TODO: This isn't so pretty
void ftnode_fetch_extra::_create_internal(FT ft_) {
    ft = ft_;
    type = ftnode_fetch_none;
    search = nullptr;

    toku_init_dbt(&range_lock_left_key);
    toku_init_dbt(&range_lock_right_key);
    left_is_neg_infty = false;
    right_is_pos_infty = false;

    // -1 means 'unknown', which is the correct default state
    child_to_read = -1;
    disable_prefetching = false;
    read_all_partitions = false;

    bytes_read = 0;
    io_time = 0;
    deserialize_time = 0;
    decompress_time = 0;
}

void ftnode_fetch_extra::create_for_full_read(FT ft_) {
    _create_internal(ft_);

    type = ftnode_fetch_all;
}

void ftnode_fetch_extra::create_for_keymatch(FT ft_, const DBT *left, const DBT *right,
                                             bool disable_prefetching_, bool read_all_partitions_) {
    _create_internal(ft_);
    invariant(ft->h->type == FT_CURRENT);

    type = ftnode_fetch_keymatch;
    if (left != nullptr) {
        toku_copyref_dbt(&range_lock_left_key, *left);
    }
    if (right != nullptr) {
        toku_copyref_dbt(&range_lock_right_key, *right);
    }
    left_is_neg_infty = left == nullptr;
    right_is_pos_infty = right == nullptr;
    disable_prefetching = disable_prefetching_;
    read_all_partitions = read_all_partitions_;
}

void ftnode_fetch_extra::create_for_subset_read(FT ft_, ft_search *search_,
                                                const DBT *left, const DBT *right,
                                                bool left_is_neg_infty_, bool right_is_pos_infty_,
                                                bool disable_prefetching_, bool read_all_partitions_) {
    _create_internal(ft_);
    invariant(ft->h->type == FT_CURRENT);

    type = ftnode_fetch_subset;
    search = search_;
    if (left != nullptr) {
        toku_copyref_dbt(&range_lock_left_key, *left);
    }
    if (right != nullptr) {
        toku_copyref_dbt(&range_lock_right_key, *right);
    }
    left_is_neg_infty = left_is_neg_infty_;
    right_is_pos_infty = right_is_pos_infty_;
    disable_prefetching = disable_prefetching_;
    read_all_partitions = read_all_partitions_;
}

void ftnode_fetch_extra::create_for_min_read(FT ft_) {
    _create_internal(ft_);
    invariant(ft->h->type == FT_CURRENT);

    type = ftnode_fetch_none;
}

void ftnode_fetch_extra::create_for_prefetch(FT ft_, struct ft_cursor *cursor) {
    _create_internal(ft_);
    invariant(ft->h->type == FT_CURRENT);

    type = ftnode_fetch_prefetch;
    const DBT *left = &cursor->range_lock_left_key;
    if (left->data) {
        toku_clone_dbt(&range_lock_left_key, *left);
    }
    const DBT *right = &cursor->range_lock_right_key;
    if (right->data) {
        toku_clone_dbt(&range_lock_right_key, *right);
    }
    left_is_neg_infty = cursor->left_is_neg_infty;
    right_is_pos_infty = cursor->right_is_pos_infty;
    disable_prefetching = cursor->disable_prefetching;
}

void ftnode_fetch_extra::destroy(void) {
    toku_destroy_dbt(&range_lock_left_key);
    toku_destroy_dbt(&range_lock_right_key);
}

// Requires: child_to_read to have been set
bool ftnode_fetch_extra::wants_child_available(int childnum) const {
    return type == ftnode_fetch_all ||
        (child_to_read == childnum &&
         (type == ftnode_fetch_subset || type == ftnode_fetch_keymatch));
}

int ftnode_fetch_extra::leftmost_child_wanted(FTNODE node) const {
    paranoid_invariant(type == ftnode_fetch_subset ||
                       type == ftnode_fetch_prefetch ||
                       type == ftnode_fetch_keymatch);
    if (left_is_neg_infty) {
        return 0;
    } else if (range_lock_left_key.data == nullptr) {
        return -1;
    } else {
        return toku_ftnode_which_child(node, &range_lock_left_key, ft->cmp);
    }
}

int ftnode_fetch_extra::rightmost_child_wanted(FTNODE node) const {
    paranoid_invariant(type == ftnode_fetch_subset ||
                       type == ftnode_fetch_prefetch ||
                       type == ftnode_fetch_keymatch);
    if (right_is_pos_infty) {
        return node->n_children - 1;
    } else if (range_lock_right_key.data == nullptr) {
        return -1;
    } else {
        return toku_ftnode_which_child(node, &range_lock_right_key, ft->cmp);
    }
}

static int
ft_cursor_rightmost_child_wanted(FT_CURSOR cursor, FT_HANDLE ft_handle, FTNODE node)
{
    if (cursor->right_is_pos_infty) {
        return node->n_children - 1;
    } else if (cursor->range_lock_right_key.data == nullptr) {
        return -1;
    } else {
        return toku_ftnode_which_child(node, &cursor->range_lock_right_key, ft_handle->ft->cmp);
    }
}

STAT64INFO_S
toku_get_and_clear_basement_stats(FTNODE leafnode) {
    invariant(leafnode->height == 0);
    STAT64INFO_S deltas = ZEROSTATS;
    for (int i = 0; i < leafnode->n_children; i++) {
        BASEMENTNODE bn = BLB(leafnode, i);
        invariant(BP_STATE(leafnode, i) == PT_AVAIL);
        deltas.numrows += bn->stat64_delta.numrows;
        deltas.numbytes += bn->stat64_delta.numbytes;
        bn->stat64_delta = ZEROSTATS;
    }
    return deltas;
}

void toku_ft_status_update_flush_reason(FTNODE node,
                                        uint64_t uncompressed_bytes_flushed, uint64_t bytes_written,
                                        tokutime_t write_time, bool for_checkpoint) {
    if (node->height == 0) {
        if (for_checkpoint) {
            FT_STATUS_INC(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT, 1);
            FT_STATUS_INC(FT_DISK_FLUSH_LEAF_BYTES_FOR_CHECKPOINT, bytes_written);
            FT_STATUS_INC(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, uncompressed_bytes_flushed);
            FT_STATUS_INC(FT_DISK_FLUSH_LEAF_TOKUTIME_FOR_CHECKPOINT, write_time);
        }
        else {
            FT_STATUS_INC(FT_DISK_FLUSH_LEAF, 1);
            FT_STATUS_INC(FT_DISK_FLUSH_LEAF_BYTES, bytes_written);
            FT_STATUS_INC(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES, uncompressed_bytes_flushed);
            FT_STATUS_INC(FT_DISK_FLUSH_LEAF_TOKUTIME, write_time);
        }
    }
    else {
        if (for_checkpoint) {
            FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT, 1);
            FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_BYTES_FOR_CHECKPOINT, bytes_written);
            FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, uncompressed_bytes_flushed);
            FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_TOKUTIME_FOR_CHECKPOINT, write_time);
        }
        else {
            FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF, 1);
            FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_BYTES, bytes_written);
            FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES, uncompressed_bytes_flushed);
            FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_TOKUTIME, write_time);
        }
    }
}

void toku_ftnode_checkpoint_complete_callback(void *value_data) {
    FTNODE node = static_cast<FTNODE>(value_data);
    if (node->height > 0) {
        for (int i = 0; i < node->n_children; ++i) {
            if (BP_STATE(node, i) == PT_AVAIL) {
                NONLEAF_CHILDINFO bnc = BNC(node, i);
                bnc->flow[1] = bnc->flow[0];
                bnc->flow[0] = 0;
            }
        }
    }
}

void toku_ftnode_clone_callback(void *value_data,
                                void **cloned_value_data,
                                long *clone_size,
                                PAIR_ATTR *new_attr,
                                bool for_checkpoint,
                                void *write_extraargs) {
    FTNODE node = static_cast<FTNODE>(value_data);
    toku_ftnode_assert_fully_in_memory(node);
    FT ft = static_cast<FT>(write_extraargs);
    FTNODE XCALLOC(cloned_node);
    if (node->height == 0) {
        // set header stats, must be done before rebalancing
        toku_ftnode_update_disk_stats(node, ft, for_checkpoint);
        // rebalance the leaf node
        toku_ftnode_leaf_rebalance(node, ft->h->basementnodesize);
    }

    cloned_node->oldest_referenced_xid_known =
        node->oldest_referenced_xid_known;
    cloned_node->max_msn_applied_to_node_on_disk =
        node->max_msn_applied_to_node_on_disk;
    cloned_node->flags = node->flags;
    cloned_node->blocknum = node->blocknum;
    cloned_node->layout_version = node->layout_version;
    cloned_node->layout_version_original = node->layout_version_original;
    cloned_node->layout_version_read_from_disk =
        node->layout_version_read_from_disk;
    cloned_node->build_id = node->build_id;
    cloned_node->height = node->height;
    cloned_node->dirty = node->dirty;
    cloned_node->fullhash = node->fullhash;
    cloned_node->n_children = node->n_children;

    XMALLOC_N(node->n_children, cloned_node->bp);
    // clone pivots
    cloned_node->pivotkeys.create_from_pivot_keys(node->pivotkeys);
    if (node->height > 0) {
        // need to move messages here so that we don't serialize stale
        // messages to the fresh tree - ft verify code complains otherwise.
        toku_move_ftnode_messages_to_stale(ft, node);
    }
    // clone partition
    toku_ftnode_clone_partitions(node, cloned_node);

    // clear dirty bit
    node->dirty = 0;
    cloned_node->dirty = 0;
    node->layout_version_read_from_disk = FT_LAYOUT_VERSION;
    // set new pair attr if necessary
    if (node->height == 0) {
        *new_attr = make_ftnode_pair_attr(node);
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node, i) == PT_AVAIL) {
                BLB_LRD(node, i) = 0;
                BLB_LRD(cloned_node, i) = 0;
            }
        }
    } else {
        new_attr->is_valid = false;
    }
    *clone_size = ftnode_memory_size(cloned_node);
    *cloned_value_data = cloned_node;
}

void toku_ftnode_flush_callback(CACHEFILE UU(cachefile),
                                int fd,
                                BLOCKNUM blocknum,
                                void *ftnode_v,
                                void **disk_data,
                                void *extraargs,
                                PAIR_ATTR size __attribute__((unused)),
                                PAIR_ATTR *new_size,
                                bool write_me,
                                bool keep_me,
                                bool for_checkpoint,
                                bool is_clone) {
    FT ft = (FT)extraargs;
    FTNODE ftnode = (FTNODE)ftnode_v;
    FTNODE_DISK_DATA *ndd = (FTNODE_DISK_DATA *)disk_data;
    assert(ftnode->blocknum.b == blocknum.b);
    int height = ftnode->height;
    if (write_me) {
        toku_ftnode_assert_fully_in_memory(ftnode);
        if (height > 0 && !is_clone) {
            // cloned nodes already had their stale messages moved, see
            // toku_ftnode_clone_callback()
            toku_move_ftnode_messages_to_stale(ft, ftnode);
        } else if (height == 0) {
            toku_ftnode_leaf_run_gc(ft, ftnode);
            if (!is_clone) {
                toku_ftnode_update_disk_stats(ftnode, ft, for_checkpoint);
            }
        }
        int r = toku_serialize_ftnode_to(
            fd, ftnode->blocknum, ftnode, ndd, !is_clone, ft, for_checkpoint);
        assert_zero(r);
        ftnode->layout_version_read_from_disk = FT_LAYOUT_VERSION;
    }
    if (!keep_me) {
        if (!is_clone) {
            long node_size = ftnode_memory_size(ftnode);
            if (ftnode->height == 0) {
                FT_STATUS_INC(FT_FULL_EVICTIONS_LEAF, 1);
                FT_STATUS_INC(FT_FULL_EVICTIONS_LEAF_BYTES, node_size);

                // A leaf node (height == 0) is being evicted (!keep_me) and is
                // not a checkpoint clone (!is_clone). This leaf node may have
                // had messages applied to satisfy a query, but was never
                // actually dirtied (!ftnode->dirty && !write_me). (Note that
                // if write_me were true, the node would be persisted and the
                // dirty flag cleared.) This message application may have
                // updated the tree's logical row count. Since these message
                // applications are not persisted, we need to undo the logical
                // row count adjustments, as they may occur again in the future
                // if/when the node is re-read from disk for another query or
                // change.
                if (!ftnode->dirty && !write_me) {
                    int64_t lrc_delta = 0;
                    for (int i = 0; i < ftnode->n_children; i++) {
                        if (BP_STATE(ftnode, i) == PT_AVAIL) {
                            lrc_delta -= BLB_LRD(ftnode, i);
                            BLB_LRD(ftnode, i) = 0;
                        }
                    }
                    toku_ft_adjust_logical_row_count(ft, lrc_delta);
                }
            } else {
                FT_STATUS_INC(FT_FULL_EVICTIONS_NONLEAF, 1);
                FT_STATUS_INC(FT_FULL_EVICTIONS_NONLEAF_BYTES, node_size);
            }
            toku_free(*disk_data);
        } else {
            if (ftnode->height == 0) {
                // No need to adjust logical row counts when flushing a clone,
                // as they should have been zeroed out anyway when cloned.
                // Clones are 'copies' of work already done, so doing it again
                // (adjusting row counts) would be redundant and would lead to
                // inaccurate counts.
                for (int i = 0; i < ftnode->n_children; i++) {
                    if (BP_STATE(ftnode, i) == PT_AVAIL) {
                        BASEMENTNODE bn = BLB(ftnode, i);
                        toku_ft_decrease_stats(&ft->in_memory_stats,
                                               bn->stat64_delta);
                    }
                }
            }
        }
        toku_ftnode_free(&ftnode);
    } else {
        *new_size = make_ftnode_pair_attr(ftnode);
    }
}

void
toku_ft_status_update_pivot_fetch_reason(ftnode_fetch_extra *bfe)
{
    if (bfe->type == ftnode_fetch_prefetch) {
        FT_STATUS_INC(FT_NUM_PIVOTS_FETCHED_PREFETCH, 1);
        FT_STATUS_INC(FT_BYTES_PIVOTS_FETCHED_PREFETCH, bfe->bytes_read);
        FT_STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_PREFETCH, bfe->io_time);
    } else if (bfe->type == ftnode_fetch_all) {
        FT_STATUS_INC(FT_NUM_PIVOTS_FETCHED_WRITE, 1);
        FT_STATUS_INC(FT_BYTES_PIVOTS_FETCHED_WRITE, bfe->bytes_read);
        FT_STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_WRITE, bfe->io_time);
    } else if (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_keymatch) {
        FT_STATUS_INC(FT_NUM_PIVOTS_FETCHED_QUERY, 1);
        FT_STATUS_INC(FT_BYTES_PIVOTS_FETCHED_QUERY, bfe->bytes_read);
        FT_STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_QUERY, bfe->io_time);
    }
}

int toku_ftnode_fetch_callback(CACHEFILE UU(cachefile),
                               PAIR p,
                               int fd,
                               BLOCKNUM blocknum,
                               uint32_t fullhash,
                               void **ftnode_pv,
                               void **disk_data,
                               PAIR_ATTR *sizep,
                               int *dirtyp,
                               void *extraargs) {
    assert(extraargs);
    assert(*ftnode_pv == nullptr);
    FTNODE_DISK_DATA *ndd = (FTNODE_DISK_DATA *)disk_data;
    ftnode_fetch_extra *bfe = (ftnode_fetch_extra *)extraargs;
    FTNODE *node = (FTNODE *)ftnode_pv;
    // deserialize the node, must pass the bfe in because we cannot
    // evaluate what piece of the node is necessary until we get it at
    // least partially into memory
    int r =
        toku_deserialize_ftnode_from(fd, blocknum, fullhash, node, ndd, bfe);
    if (r != 0) {
        if (r == TOKUDB_BAD_CHECKSUM) {
            fprintf(
                stderr,
                "%s:%d:toku_ftnode_fetch_callback - "
                "file[%s], blocknum[%ld], toku_deserialize_ftnode_from "
                "failed with a checksum error.\n",
                __FILE__,
                __LINE__,
                toku_cachefile_fname_in_env(cachefile),
                blocknum.b);
        } else {
            fprintf(
                stderr,
                "%s:%d:toku_ftnode_fetch_callback - "
                "file[%s], blocknum[%ld], toku_deserialize_ftnode_from "
                "failed with %d.\n",
                __FILE__,
                __LINE__,
                toku_cachefile_fname_in_env(cachefile),
                blocknum.b,
                r);
        }
        // make absolutely sure we crash before doing anything else.
        abort();
    }

    if (r == 0) {
        *sizep = make_ftnode_pair_attr(*node);
        (*node)->ct_pair = p;
        *dirtyp = (*node)->dirty;  // deserialize could mark the node as dirty
                                   // (presumably for upgrade)
    }
    return r;
}

static bool ft_compress_buffers_before_eviction = true;

void toku_ft_set_compress_buffers_before_eviction(bool compress_buffers) {
    ft_compress_buffers_before_eviction = compress_buffers;
}

void toku_ftnode_pe_est_callback(
    void* ftnode_pv,
    void* disk_data,
    long* bytes_freed_estimate,
    enum partial_eviction_cost *cost,
    void* UU(write_extraargs)
    )
{
    paranoid_invariant(ftnode_pv != NULL);
    long bytes_to_free = 0;
    FTNODE node = static_cast<FTNODE>(ftnode_pv);
    if (node->dirty || node->height == 0 ||
        node->layout_version_read_from_disk < FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
        *bytes_freed_estimate = 0;
        *cost = PE_CHEAP;
        goto exit;
    }

    //
    // we are dealing with a clean internal node
    //
    *cost = PE_EXPENSIVE;
    // now let's get an estimate for how much data we can free up
    // we estimate the compressed size of data to be how large
    // the compressed data is on disk
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node, i) == PT_AVAIL && BP_SHOULD_EVICT(node, i)) {
            // calculate how much data would be freed if
            // we compress this node and add it to
            // bytes_to_free

            if (ft_compress_buffers_before_eviction) {
                // first get an estimate for how much space will be taken
                // after compression, it is simply the size of compressed
                // data on disk plus the size of the struct that holds it
                FTNODE_DISK_DATA ndd = (FTNODE_DISK_DATA) disk_data;
                uint32_t compressed_data_size = BP_SIZE(ndd, i);
                compressed_data_size += sizeof(struct sub_block);

                // now get the space taken now
                uint32_t decompressed_data_size = get_avail_internal_node_partition_size(node, i);
                bytes_to_free += (decompressed_data_size - compressed_data_size);
            } else {
                bytes_to_free += get_avail_internal_node_partition_size(node, i);
            }
        }
    }

    *bytes_freed_estimate = bytes_to_free;
exit:
    return;
}

// replace the child buffer with a compressed version of itself.
static void compress_internal_node_partition(FTNODE node, int i, enum toku_compression_method compression_method) {
    // if we should evict, compress the
    // message buffer into a sub_block
    assert(BP_STATE(node, i) == PT_AVAIL);
    assert(node->height > 0);
    SUB_BLOCK XMALLOC(sb);
    sub_block_init(sb);
    toku_create_compressed_partition_from_available(node, i, compression_method, sb);

    // now set the state to compressed
    set_BSB(node, i, sb);
    BP_STATE(node, i) = PT_COMPRESSED;
}

// callback for partially evicting a node
int toku_ftnode_pe_callback(void *ftnode_pv,
                            PAIR_ATTR old_attr,
                            void *write_extraargs,
                            void (*finalize)(PAIR_ATTR new_attr, void *extra),
                            void *finalize_extra) {
    FTNODE node = (FTNODE)ftnode_pv;
    FT ft = (FT)write_extraargs;
    int num_partial_evictions = 0;

    // Hold things we intend to destroy here.
    // They will be taken care of after finalize().
    int num_basements_to_destroy = 0;
    int num_buffers_to_destroy = 0;
    int num_pointers_to_free = 0;
    BASEMENTNODE basements_to_destroy[node->n_children];
    NONLEAF_CHILDINFO buffers_to_destroy[node->n_children];
    void *pointers_to_free[node->n_children * 2];

    // Don't partially evict dirty nodes
    if (node->dirty) {
        goto exit;
    }
    // Don't partially evict nodes whose partitions can't be read back
    // from disk individually
    if (node->layout_version_read_from_disk <
        FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
        goto exit;
    }
    //
    // partial eviction for nonleaf nodes
    //
    if (node->height > 0) {
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node, i) == PT_AVAIL) {
                if (BP_SHOULD_EVICT(node, i)) {
                    NONLEAF_CHILDINFO bnc = BNC(node, i);
                    if (ft_compress_buffers_before_eviction &&
                        // We may not serialize and compress a partition in
                        // memory if its in-memory layout version is different
                        // from what's on disk (and therefore requires upgrade).
                        //
                        // Auto-upgrade code assumes that if a node's layout
                        // version read from disk is not current, it MUST
                        // require upgrade.
                        // Breaking this rule would cause upgrade code to
                        // upgrade this partition again after we serialize it as
                        // the current version, which is bad.
                        node->layout_version ==
                            node->layout_version_read_from_disk) {
                        toku_ft_bnc_move_messages_to_stale(ft, bnc);
                        compress_internal_node_partition(
                            node,
                            i,
                            // Always compress with quicklz
                            TOKU_QUICKLZ_METHOD);
                    } else {
                        // We're not compressing buffers before eviction. Simply
                        // detach the buffer and set the child's state to
                        // on-disk.
                        set_BNULL(node, i);
                        BP_STATE(node, i) = PT_ON_DISK;
                    }
                    buffers_to_destroy[num_buffers_to_destroy++] = bnc;
                    num_partial_evictions++;
                } else {
                    BP_SWEEP_CLOCK(node, i);
                }
            } else {
                continue;
            }
        }
    } else {
        //
        // partial eviction strategy for basement nodes:
        // if the bn is compressed, evict it
        // else: check if it requires eviction, if it does, evict it, if not,
        // sweep the clock count
        //
        for (int i = 0; i < node->n_children; i++) {
            // Get rid of compressed stuff no matter what.
            if (BP_STATE(node, i) == PT_COMPRESSED) {
                SUB_BLOCK sb = BSB(node, i);
                pointers_to_free[num_pointers_to_free++] = sb->compressed_ptr;
                pointers_to_free[num_pointers_to_free++] = sb;
                set_BNULL(node, i);
                BP_STATE(node, i) = PT_ON_DISK;
                num_partial_evictions++;
            } else if (BP_STATE(node, i) == PT_AVAIL) {
                if (BP_SHOULD_EVICT(node, i)) {
                    BASEMENTNODE bn = BLB(node, i);
                    basements_to_destroy[num_basements_to_destroy++] = bn;
                    toku_ft_decrease_stats(&ft->in_memory_stats,
                                           bn->stat64_delta);
                    // A basement node is being partially evicted.
                    // This basement node may have had messages applied to it
                    // to satisfy a query, but was never actually dirtied.
                    // This message application may have updated the tree's
                    // logical row count. Since these message applications are
                    // not being persisted, we need to undo the logical row
                    // count adjustments, as they may occur again in the future
                    // if/when the node is re-read from disk for another query
                    // or change.
                    toku_ft_adjust_logical_row_count(ft,
                                                     -bn->logical_rows_delta);
                    set_BNULL(node, i);
                    BP_STATE(node, i) = PT_ON_DISK;
                    num_partial_evictions++;
                } else {
                    BP_SWEEP_CLOCK(node, i);
                }
            } else if (BP_STATE(node, i) == PT_ON_DISK) {
                continue;
            } else {
                abort();
            }
        }
    }

exit:
    // call the finalize callback with a new pair attr
    int height = node->height;
    PAIR_ATTR new_attr = make_ftnode_pair_attr(node);
    finalize(new_attr, finalize_extra);

    // destroy everything now that we've called finalize(),
    // and, by contract, it's safe to do expensive work.
    for (int i = 0; i < num_basements_to_destroy; i++) {
        destroy_basement_node(basements_to_destroy[i]);
    }
    for (int i = 0; i < num_buffers_to_destroy; i++) {
        destroy_nonleaf_childinfo(buffers_to_destroy[i]);
    }
    for (int i = 0; i < num_pointers_to_free; i++) {
        toku_free(pointers_to_free[i]);
    }
    // stats
    if (num_partial_evictions > 0) {
        if (height == 0) {
            long delta = old_attr.leaf_size - new_attr.leaf_size;
            FT_STATUS_INC(FT_PARTIAL_EVICTIONS_LEAF, num_partial_evictions);
            FT_STATUS_INC(FT_PARTIAL_EVICTIONS_LEAF_BYTES, delta);
        } else {
            long delta = old_attr.nonleaf_size - new_attr.nonleaf_size;
            FT_STATUS_INC(FT_PARTIAL_EVICTIONS_NONLEAF, num_partial_evictions);
            FT_STATUS_INC(FT_PARTIAL_EVICTIONS_NONLEAF_BYTES, delta);
        }
    }
    return 0;
}

// We touch the clock while holding a read lock.
// DRD reports a race but we want to ignore it.
// Using a valgrind suppressions file is better than the DRD_IGNORE_VAR macro
// because it's more targeted.
// We need a function to have something a drd suppression can reference
// see src/tests/drd.suppressions (unsafe_touch_clock)
static void unsafe_touch_clock(FTNODE node, int i) {
    toku_unsafe_set(&node->bp[i].clock_count, static_cast<unsigned char>(1));
}

// Callback that states if a partial fetch of the node is necessary
// Currently, this function is responsible for the following things:
//  - reporting to the cachetable whether a partial fetch is required (as required by the contract of the callback)
//  - A couple of things that are NOT required by the callback, but we do for efficiency and simplicity reasons:
//    - for queries, setting the value of bfe->child_to_read so that the query that called this can proceed with the query
//      as opposed to having to evaluate toku_ft_search_which_child again. This is done to make the in-memory query faster
//    - touching the necessary partition's clock. The reason we do it here is so that there is one central place it is done, and not done
//      by all the various callers
//
bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs) {
    // placeholder for now
    bool retval = false;
    FTNODE node = (FTNODE) ftnode_pv;
    ftnode_fetch_extra *bfe = (ftnode_fetch_extra *) read_extraargs;
    //
    // The three types of fetches that the ft layer may request are:
    //  - ftnode_fetch_none: no partitions are necessary (example use: stat64)
    //  - ftnode_fetch_subset: some subset is necessary (example use: toku_ft_search)
    //  - ftnode_fetch_all: entire node is necessary (example use: flush, split, merge)
    // The code below checks if the necessary partitions are already in memory;
    // if they are, it returns false, and if not, it returns true
    //
    if (bfe->type == ftnode_fetch_none) {
        retval = false;
    }
    else if (bfe->type == ftnode_fetch_all) {
        retval = false;
        for (int i = 0; i < node->n_children; i++) {
            unsafe_touch_clock(node, i);
            // if we find a partition that is not available,
            // then a partial fetch is required because
            // the entire node must be made available
            if (BP_STATE(node, i) != PT_AVAIL) {
                retval = true;
            }
        }
    }
    else if (bfe->type == ftnode_fetch_subset) {
        // we do not take into account prefetching yet
        // as of now, if we need a subset, the only thing
        // we can possibly require is a single basement node
        // we find out what basement node the query cares about
        // and check if it is available
        paranoid_invariant(bfe->search);
        bfe->child_to_read = toku_ft_search_which_child(
            bfe->ft->cmp,
            node,
            bfe->search
            );
        unsafe_touch_clock(node, bfe->child_to_read);
        // child we want to read is not available, must set retval to true
        retval = (BP_STATE(node, bfe->child_to_read) != PT_AVAIL);
    }
    else if (bfe->type == ftnode_fetch_prefetch) {
        // makes no sense to have prefetching disabled
        // and still call this function
        paranoid_invariant(!bfe->disable_prefetching);
        int lc = bfe->leftmost_child_wanted(node);
        int rc = bfe->rightmost_child_wanted(node);
        for (int i = lc; i <= rc; ++i) {
            if (BP_STATE(node, i) != PT_AVAIL) {
                retval = true;
            }
        }
    } else if (bfe->type == ftnode_fetch_keymatch) {
        // we do not take into account prefetching yet
        // as of now, if we need a subset, the only thing
        // we can possibly require is a single basement node
        // we find out what basement node the query cares about
        // and check if it is available
        if (node->height == 0) {
            int left_child = bfe->leftmost_child_wanted(node);
            int right_child = bfe->rightmost_child_wanted(node);
            if (left_child == right_child) {
                bfe->child_to_read = left_child;
                unsafe_touch_clock(node, bfe->child_to_read);
                // child we want to read is not available, must set retval to true
                retval = (BP_STATE(node, bfe->child_to_read) != PT_AVAIL);
            }
        }
    } else {
        // we have a bug. The type should be known
        abort();
    }
    return retval;
}

static void
ft_status_update_partial_fetch_reason(
    ftnode_fetch_extra *bfe,
    int childnum,
    enum pt_state state,
    bool is_leaf
    )
{
    invariant(state == PT_COMPRESSED || state == PT_ON_DISK);
    if (is_leaf) {
        if (bfe->type == ftnode_fetch_prefetch) {
            if (state == PT_COMPRESSED) {
                FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH, 1);
            } else {
                FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_PREFETCH, 1);
                FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_PREFETCH, bfe->bytes_read);
                FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_PREFETCH, bfe->io_time);
            }
        } else if (bfe->type == ftnode_fetch_all) {
            if (state == PT_COMPRESSED) {
                FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_WRITE, 1);
            } else {
                FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_WRITE, 1);
                FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_WRITE, bfe->bytes_read);
                FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_WRITE, bfe->io_time);
            }
        } else if (childnum == bfe->child_to_read) {
            if (state == PT_COMPRESSED) {
                FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL, 1);
            } else {
                FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_NORMAL, 1);
                FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_NORMAL, bfe->bytes_read);
                FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_NORMAL, bfe->io_time);
            }
        } else {
            if (state == PT_COMPRESSED) {
                FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE, 1);
            } else {
                FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, 1);
                FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_AGGRESSIVE, bfe->bytes_read);
                FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_AGGRESSIVE, bfe->io_time);
            }
        }
    }
    else {
        if (bfe->type == ftnode_fetch_prefetch) {
            if (state == PT_COMPRESSED) {
                FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH, 1);
            } else {
                FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, 1);
                FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_PREFETCH, bfe->bytes_read);
                FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, bfe->io_time);
            }
        } else if (bfe->type == ftnode_fetch_all) {
            if (state == PT_COMPRESSED) {
                FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE, 1);
            } else {
                FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_WRITE, 1);
                FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_WRITE, bfe->bytes_read);
                FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, bfe->io_time);
            }
        } else if (childnum == bfe->child_to_read) {
            if (state == PT_COMPRESSED) {
                FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL, 1);
            } else {
                FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_NORMAL, 1);
                FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_NORMAL, bfe->bytes_read);
                FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_NORMAL, bfe->io_time);
            }
        } else {
            if (state == PT_COMPRESSED) {
                FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE, 1);
            } else {
                FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, 1);
                FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->bytes_read);
                FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->io_time);
            }
        }
    }
}

void toku_ft_status_update_serialize_times(FTNODE node, tokutime_t serialize_time, tokutime_t compress_time) {
    if (node->height == 0) {
        FT_STATUS_INC(FT_LEAF_SERIALIZE_TOKUTIME, serialize_time);
        FT_STATUS_INC(FT_LEAF_COMPRESS_TOKUTIME, compress_time);
    } else {
        FT_STATUS_INC(FT_NONLEAF_SERIALIZE_TOKUTIME, serialize_time);
        FT_STATUS_INC(FT_NONLEAF_COMPRESS_TOKUTIME, compress_time);
    }
}

void toku_ft_status_update_deserialize_times(FTNODE node, tokutime_t deserialize_time, tokutime_t decompress_time) {
    if (node->height == 0) {
        FT_STATUS_INC(FT_LEAF_DESERIALIZE_TOKUTIME, deserialize_time);
        FT_STATUS_INC(FT_LEAF_DECOMPRESS_TOKUTIME, decompress_time);
    } else {
        FT_STATUS_INC(FT_NONLEAF_DESERIALIZE_TOKUTIME, deserialize_time);
        FT_STATUS_INC(FT_NONLEAF_DECOMPRESS_TOKUTIME, decompress_time);
    }
}

void toku_ft_status_note_msn_discard(void) {
    FT_STATUS_INC(FT_MSN_DISCARDS, 1);
}

void toku_ft_status_note_update(bool broadcast) {
    if (broadcast) {
        FT_STATUS_INC(FT_UPDATES_BROADCAST, 1);
    } else {
        FT_STATUS_INC(FT_UPDATES, 1);
    }
}

void toku_ft_status_note_msg_bytes_out(size_t buffsize) {
    FT_STATUS_INC(FT_MSG_BYTES_OUT, buffsize);
    FT_STATUS_INC(FT_MSG_BYTES_CURR, -buffsize);
}

void toku_ft_status_note_ftnode(int height, bool created) {
    if (created) {
        if (height == 0) {
            FT_STATUS_INC(FT_CREATE_LEAF, 1);
        } else {
            FT_STATUS_INC(FT_CREATE_NONLEAF, 1);
        }
    } else {
        // created = false means destroyed
    }
}

// callback for partially reading a node
// could have just used toku_ftnode_fetch_callback, but wanted to separate the two cases to separate functions
int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraargs, int fd, PAIR_ATTR* sizep) {
    int r = 0;
    FTNODE node = (FTNODE) ftnode_pv;
    FTNODE_DISK_DATA ndd = (FTNODE_DISK_DATA) disk_data;
    ftnode_fetch_extra *bfe = (ftnode_fetch_extra *) read_extraargs;
    // there must be a reason this is being called. If we get a garbage type or the type is ftnode_fetch_none,
    // then something went wrong
    assert((bfe->type == ftnode_fetch_subset) || (bfe->type == ftnode_fetch_all) || (bfe->type == ftnode_fetch_prefetch) || (bfe->type == ftnode_fetch_keymatch));
    // determine the range to prefetch
    int lc, rc;
    if (!bfe->disable_prefetching &&
        (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch)
        )
    {
        lc = bfe->leftmost_child_wanted(node);
        rc = bfe->rightmost_child_wanted(node);
    } else {
        lc = -1;
        rc = -1;
    }
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node, i) == PT_AVAIL) {
            continue;
        }
        if ((lc <= i && i <= rc) || bfe->wants_child_available(i)) {
            enum pt_state state = BP_STATE(node, i);
            if (state == PT_COMPRESSED) {
                r = toku_deserialize_bp_from_compressed(node, i, bfe);
            } else {
                invariant(state == PT_ON_DISK);
                r = toku_deserialize_bp_from_disk(node, ndd, i, fd, bfe);
            }
            ft_status_update_partial_fetch_reason(bfe, i, state, (node->height == 0));
        }

        if (r != 0) {
            if (r == TOKUDB_BAD_CHECKSUM) {
                fprintf(stderr,
                        "Checksum failure while reading node partition in file %s.\n",
                        toku_cachefile_fname_in_env(bfe->ft->cf));
            } else {
                fprintf(stderr,
                        "Error while reading node partition %d\n",
                        get_maybe_error_errno());
            }
            abort();
        }
    }

    *sizep = make_ftnode_pair_attr(node);

    return 0;
}

int toku_msg_leafval_heaviside(DBT const &kdbt, const struct toku_msg_leafval_heaviside_extra &be) {
    return be.cmp(&kdbt, be.key);
}

static void
ft_init_new_root(FT ft, FTNODE oldroot, FTNODE *newrootp)
// Effect: Create a new root node whose two children are the split of oldroot.
//  oldroot is unpinned in the process.
//  Leave the new root pinned.
{
    FTNODE newroot;

    BLOCKNUM old_blocknum = oldroot->blocknum;
    uint32_t old_fullhash = oldroot->fullhash;

    int new_height = oldroot->height + 1;
    uint32_t new_fullhash;
    BLOCKNUM new_blocknum;

    cachetable_put_empty_node_with_dep_nodes(
        ft,
        1,
        &oldroot,
        &new_blocknum,
        &new_fullhash,
        &newroot
        );

    assert(newroot);
    assert(new_height > 0);
    toku_initialize_empty_ftnode(
        newroot,
        new_blocknum,
        new_height,
        1,
        ft->h->layout_version,
        ft->h->flags
        );
    newroot->fullhash = new_fullhash;
    MSN msna = oldroot->max_msn_applied_to_node_on_disk;
    newroot->max_msn_applied_to_node_on_disk = msna;
    BP_STATE(newroot, 0) = PT_AVAIL;
    newroot->dirty = 1;

    // Set the first child to have the new blocknum,
    // and then swap newroot with oldroot. The new root
    // will inherit the hash/blocknum/pair from oldroot,
    // keeping the root blocknum constant.
    BP_BLOCKNUM(newroot, 0) = new_blocknum;
    toku_ftnode_swap_pair_values(newroot, oldroot);

    toku_ft_split_child(
        ft,
        newroot,
        0,  // childnum to split
        oldroot,
        SPLIT_EVENLY
        );

    // ft_split_child released locks on newroot
    // and oldroot, so now we repin and
    // return to caller
    ftnode_fetch_extra bfe;
    bfe.create_for_full_read(ft);
    toku_pin_ftnode(
        ft,
        old_blocknum,
        old_fullhash,
        &bfe,
        PL_WRITE_EXPENSIVE,  // may_modify_node
        newrootp,
        true
        );
}

static void inject_message_in_locked_node(
    FT ft,
    FTNODE node,
    int childnum,
    const ft_msg &msg,
    size_t flow_deltas[],
    txn_gc_info *gc_info
    )
{
    // No guarantee that we're the writer, but oh well.
    // TODO(leif): Implement "do I have the lock or is it someone else?"
    // check in frwlock. Should be possible with TOKU_PTHREAD_DEBUG, nop
    // otherwise.
    invariant(toku_ctpair_is_write_locked(node->ct_pair));
    toku_ftnode_assert_fully_in_memory(node);

    // Take the newer of the two oldest referenced xid values from the node and gc_info.
    // The gc_info usually has a newer value, because we got it at the top of this call
    // stack from the txn manager. But sometimes the node has a newer value, if some
    // other thread sees a newer value and writes to this node before we got the lock.
    if (gc_info->oldest_referenced_xid_for_implicit_promotion > node->oldest_referenced_xid_known) {
        node->oldest_referenced_xid_known = gc_info->oldest_referenced_xid_for_implicit_promotion;
    } else if (gc_info->oldest_referenced_xid_for_implicit_promotion < node->oldest_referenced_xid_known) {
        gc_info->oldest_referenced_xid_for_implicit_promotion = node->oldest_referenced_xid_known;
    }

    // Get the MSN from the header. Now that we have a write lock on the
    // node we're injecting into, we know no other thread will get an MSN
    // after us and get that message into our subtree before us.
    MSN msg_msn = { .msn = toku_sync_add_and_fetch(&ft->h->max_msn_in_ft.msn, 1) };
    ft_msg msg_with_msn(msg.kdbt(), msg.vdbt(), msg.type(), msg_msn, msg.xids());
    paranoid_invariant(msg_with_msn.msn().msn > node->max_msn_applied_to_node_on_disk.msn);

    STAT64INFO_S stats_delta = { 0, 0 };
    int64_t logical_rows_delta = 0;
    toku_ftnode_put_msg(
        ft->cmp,
        ft->update_fun,
        node,
        childnum,
        msg_with_msn,
        true,
        gc_info,
        flow_deltas,
        &stats_delta,
        &logical_rows_delta);
    if (stats_delta.numbytes || stats_delta.numrows) {
        toku_ft_update_stats(&ft->in_memory_stats, stats_delta);
    }
    toku_ft_adjust_logical_row_count(ft, logical_rows_delta);
    //
    // assumption is that toku_ftnode_put_msg will
    // mark the node as dirty.
    // enforcing invariant here.
    //
    paranoid_invariant(node->dirty != 0);

    // update some status variables
    if (node->height != 0) {
        size_t msgsize = msg.total_size();
        FT_STATUS_INC(FT_MSG_BYTES_IN, msgsize);
        FT_STATUS_INC(FT_MSG_BYTES_CURR, msgsize);
        FT_STATUS_INC(FT_MSG_NUM, 1);
        if (ft_msg_type_applies_all(msg.type())) {
            FT_STATUS_INC(FT_MSG_NUM_BROADCAST, 1);
        }
    }

    // verify that msn of latest message was captured in root node
    paranoid_invariant(msg_with_msn.msn().msn == node->max_msn_applied_to_node_on_disk.msn);

    if (node->blocknum.b == ft->rightmost_blocknum.b) {
        if (toku_unsafe_fetch(&ft->seqinsert_score) < FT_SEQINSERT_SCORE_THRESHOLD) {
            // we promoted to the rightmost leaf node and the seqinsert score has not yet saturated.
            toku_sync_fetch_and_add(&ft->seqinsert_score, 1);
        }
    } else if (toku_unsafe_fetch(&ft->seqinsert_score) != 0) {
        // we promoted to something other than the rightmost leaf node and the score should reset
        toku_unsafe_set(&ft->seqinsert_score, static_cast<uint32_t>(0));
    }

    // if we call toku_ft_flush_some_child, then that function unpins the root
    // otherwise, we unpin ourselves
    if (node->height > 0 && toku_ftnode_nonleaf_is_gorged(node, ft->h->nodesize)) {
        toku_ft_flush_node_on_background_thread(ft, node);
    }
    else {
        toku_unpin_ftnode(ft, node);
    }
}

// seqinsert_loc is a bitmask.
// The root counts as being both on the "left extreme" and on the "right extreme".
// Therefore, at the root, you're at LEFT_EXTREME | RIGHT_EXTREME.
typedef char seqinsert_loc;
static const seqinsert_loc NEITHER_EXTREME = 0;
static const seqinsert_loc LEFT_EXTREME = 1;
static const seqinsert_loc RIGHT_EXTREME = 2;
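
// For example: a descent that always takes childnum 0 keeps LEFT_EXTREME set,
// one that always takes the last child keeps RIGHT_EXTREME set, and descending
// into any interior child drops the location to NEITHER_EXTREME.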
1534
1535static bool process_maybe_reactive_child(FT ft, FTNODE parent, FTNODE child, int childnum, seqinsert_loc loc)
1536// Effect:
1537// If child needs to be split or merged, do that.
1538// parent and child will be unlocked if this happens
1539// Requires: parent and child are read locked
1540// Returns:
1541// true if relocking is needed
1542// false otherwise
1543{
1544 enum reactivity re = toku_ftnode_get_reactivity(ft, child);
1545 enum reactivity newre;
1546 BLOCKNUM child_blocknum;
1547 uint32_t child_fullhash;
1548 switch (re) {
1549 case RE_STABLE:
1550 return false;
1551 case RE_FISSIBLE:
1552 {
1553 // We only have a read lock on the parent. We need to drop both locks, and get write locks.
1554 BLOCKNUM parent_blocknum = parent->blocknum;
1555 uint32_t parent_fullhash = toku_cachetable_hash(ft->cf, parent_blocknum);
1556 int parent_height = parent->height;
1557 int parent_n_children = parent->n_children;
1558 toku_unpin_ftnode_read_only(ft, child);
1559 toku_unpin_ftnode_read_only(ft, parent);
1560 ftnode_fetch_extra bfe;
1561 bfe.create_for_full_read(ft);
1562 FTNODE newparent, newchild;
1563 toku_pin_ftnode(ft, parent_blocknum, parent_fullhash, &bfe, PL_WRITE_CHEAP, &newparent, true);
1564 if (newparent->height != parent_height || newparent->n_children != parent_n_children ||
1565 childnum >= newparent->n_children || toku_bnc_n_entries(BNC(newparent, childnum))) {
1566 // If the height changed or childnum is now off the end, something clearly got split or merged out from under us.
1567 // If something got injected in this node, then it got split or merged and we shouldn't be splitting it.
1568 // But we already unpinned the child so we need to have the caller re-try the pins.
1569 toku_unpin_ftnode_read_only(ft, newparent);
1570 return true;
1571 }
        // It's ok to reuse the same childnum: if the node there now is
        // something else that also needs a split, well, that's crazy, but
        // let's go ahead and split it.
1575 child_blocknum = BP_BLOCKNUM(newparent, childnum);
1576 child_fullhash = compute_child_fullhash(ft->cf, newparent, childnum);
1577 toku_pin_ftnode_with_dep_nodes(ft, child_blocknum, child_fullhash, &bfe, PL_WRITE_CHEAP, 1, &newparent, &newchild, true);
1578 newre = toku_ftnode_get_reactivity(ft, newchild);
1579 if (newre == RE_FISSIBLE) {
1580 enum split_mode split_mode;
1581 if (newparent->height == 1 && (loc & LEFT_EXTREME) && childnum == 0) {
1582 split_mode = SPLIT_RIGHT_HEAVY;
1583 } else if (newparent->height == 1 && (loc & RIGHT_EXTREME) && childnum == newparent->n_children - 1) {
1584 split_mode = SPLIT_LEFT_HEAVY;
1585 } else {
1586 split_mode = SPLIT_EVENLY;
1587 }
1588 toku_ft_split_child(ft, newparent, childnum, newchild, split_mode);
1589 } else {
1590 // some other thread already got it, just unpin and tell the
1591 // caller to retry
1592 toku_unpin_ftnode_read_only(ft, newchild);
1593 toku_unpin_ftnode_read_only(ft, newparent);
1594 }
1595 return true;
1596 }
1597 case RE_FUSIBLE:
1598 {
1599 if (parent->height == 1) {
1600 // prevent re-merging of recently unevenly-split nodes
1601 if (((loc & LEFT_EXTREME) && childnum <= 1) ||
1602 ((loc & RIGHT_EXTREME) && childnum >= parent->n_children - 2)) {
1603 return false;
1604 }
1605 }
1606
1607 int parent_height = parent->height;
1608 BLOCKNUM parent_blocknum = parent->blocknum;
1609 uint32_t parent_fullhash = toku_cachetable_hash(ft->cf, parent_blocknum);
1610 toku_unpin_ftnode_read_only(ft, child);
1611 toku_unpin_ftnode_read_only(ft, parent);
1612 ftnode_fetch_extra bfe;
1613 bfe.create_for_full_read(ft);
1614 FTNODE newparent, newchild;
1615 toku_pin_ftnode(ft, parent_blocknum, parent_fullhash, &bfe, PL_WRITE_CHEAP, &newparent, true);
1616 if (newparent->height != parent_height || childnum >= newparent->n_children) {
            // looks like the parent changed out from under us (e.g., the root got merged), let's just start over (like in the split case above)
1618 toku_unpin_ftnode_read_only(ft, newparent);
1619 return true;
1620 }
1621 child_blocknum = BP_BLOCKNUM(newparent, childnum);
1622 child_fullhash = compute_child_fullhash(ft->cf, newparent, childnum);
1623 toku_pin_ftnode_with_dep_nodes(ft, child_blocknum, child_fullhash, &bfe, PL_READ, 1, &newparent, &newchild, true);
1624 newre = toku_ftnode_get_reactivity(ft, newchild);
1625 if (newre == RE_FUSIBLE && newparent->n_children >= 2) {
1626 toku_unpin_ftnode_read_only(ft, newchild);
1627 toku_ft_merge_child(ft, newparent, childnum);
1628 } else {
1629 // Could be a weird case where newparent has only one
1630 // child. In this case, we want to inject here but we've
1631 // already unpinned the caller's copy of parent so we have
1632 // to ask them to re-pin, or they could (very rarely)
            // dereference memory in a freed node. TODO: we could
1634 // give them back the copy of the parent we pinned.
1635 //
1636 // Otherwise, some other thread already got it, just unpin
1637 // and tell the caller to retry
1638 toku_unpin_ftnode_read_only(ft, newchild);
1639 toku_unpin_ftnode_read_only(ft, newparent);
1640 }
1641 return true;
1642 }
1643 }
1644 abort();
1645}
1646
1647static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t fullhash, const ft_msg &msg, size_t flow_deltas[], txn_gc_info *gc_info)
1648// Effect:
1649// Inject message into the node at this blocknum (cachekey).
1650// Gets a write lock on the node for you.
1651{
1652 toku::context inject_ctx(CTX_MESSAGE_INJECTION);
1653 FTNODE node;
1654 ftnode_fetch_extra bfe;
1655 bfe.create_for_full_read(ft);
1656 toku_pin_ftnode(ft, cachekey, fullhash, &bfe, PL_WRITE_CHEAP, &node, true);
1657 toku_ftnode_assert_fully_in_memory(node);
1658 paranoid_invariant(node->fullhash==fullhash);
1659 ft_verify_flags(ft, node);
1660 inject_message_in_locked_node(ft, node, -1, msg, flow_deltas, gc_info);
1661}
1662
1663__attribute__((const))
1664static inline bool should_inject_in_node(seqinsert_loc loc, int height, int depth)
1665// We should inject directly in a node if:
1666// - it's a leaf, or
1667// - it's a height 1 node not at either extreme, or
1668// - it's a depth 2 node not at either extreme
1669{
1670 return (height == 0 || (loc == NEITHER_EXTREME && (height <= 1 || depth >= 2)));
1671}
1672
1673static void ft_verify_or_set_rightmost_blocknum(FT ft, BLOCKNUM b)
1674// Given: 'b', the _definitive_ and constant rightmost blocknum of 'ft'
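// Note: this is the classic double-checked locking pattern; the unlocked
// read is safe because the field only ever transitions from
// RESERVED_BLOCKNUM_NULL to its final value, and the ft lock serializes
// the transition itself.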
1675{
1676 if (toku_unsafe_fetch(&ft->rightmost_blocknum.b) == RESERVED_BLOCKNUM_NULL) {
1677 toku_ft_lock(ft);
1678 if (ft->rightmost_blocknum.b == RESERVED_BLOCKNUM_NULL) {
1679 toku_unsafe_set(&ft->rightmost_blocknum, b);
1680 }
1681 toku_ft_unlock(ft);
1682 }
1683 // The rightmost blocknum only transitions from RESERVED_BLOCKNUM_NULL to non-null.
1684 // If it's already set, verify that the stored value is consistent with 'b'
1685 invariant(toku_unsafe_fetch(&ft->rightmost_blocknum.b) == b.b);
1686}
1687
1688bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) {
1689 static const double factor = 0.125;
1690 const uint64_t flow_threshold = ft->h->nodesize * factor;
1691 return bnc->flow[0] >= flow_threshold || bnc->flow[1] >= flow_threshold;
1692}
1693
1694static void push_something_in_subtree(
1695 FT ft,
1696 FTNODE subtree_root,
1697 int target_childnum,
1698 const ft_msg &msg,
1699 size_t flow_deltas[],
1700 txn_gc_info *gc_info,
1701 int depth,
1702 seqinsert_loc loc,
1703 bool just_did_split_or_merge
1704 )
1705// Effects:
1706// Assign message an MSN from ft->h.
1707// Put message in the subtree rooted at node. Due to promotion the message may not be injected directly in this node.
1708// Unlock node or schedule it to be unlocked (after a background flush).
1709// Either way, the caller is not responsible for unlocking node.
1710// Requires:
1711// subtree_root is read locked and fully in memory.
1712// Notes:
1713// In Ming, the basic rules of promotion are as follows:
1714// Don't promote broadcast messages.
1715// Don't promote past non-empty buffers.
1716// Otherwise, promote at most to height 1 or depth 2 (whichever is highest), as far as the birdie asks you to promote.
1717// We don't promote to leaves because injecting into leaves is expensive, mostly because of #5605 and some of #5552.
1718// We don't promote past depth 2 because we found that gives us enough parallelism without costing us too much pinning work.
1719//
1720// This is true with the following caveats:
1721// We always promote all the way to the leaves on the rightmost and leftmost edges of the tree, for sequential insertions.
1722// (That means we can promote past depth 2 near the edges of the tree.)
1723//
1724// When the birdie is still saying we should promote, we use get_and_pin so that we wait to get the node.
1725// If the birdie doesn't say to promote, we try maybe_get_and_pin. If we get the node cheaply, and it's dirty, we promote anyway.
1726{
1727 toku_ftnode_assert_fully_in_memory(subtree_root);
1728 if (should_inject_in_node(loc, subtree_root->height, depth)) {
1729 switch (depth) {
1730 case 0:
1731 FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_0, 1); break;
1732 case 1:
1733 FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_1, 1); break;
1734 case 2:
1735 FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_2, 1); break;
1736 case 3:
1737 FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_3, 1); break;
1738 default:
1739 FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break;
1740 }
1741 // If the target node is a non-root leaf node on the right extreme,
1742 // set the rightmost blocknum. We know there are no messages above us
        // because promotion would not choose to inject directly into this leaf
1744 // otherwise. We explicitly skip the root node because then we don't have
1745 // to worry about changing the rightmost blocknum when the root splits.
1746 if (subtree_root->height == 0 && loc == RIGHT_EXTREME && subtree_root->blocknum.b != ft->h->root_blocknum.b) {
1747 ft_verify_or_set_rightmost_blocknum(ft, subtree_root->blocknum);
1748 }
1749 inject_message_in_locked_node(ft, subtree_root, target_childnum, msg, flow_deltas, gc_info);
1750 } else {
1751 int r;
1752 int childnum;
1753 NONLEAF_CHILDINFO bnc;
1754
1755 // toku_ft_root_put_msg should not have called us otherwise.
1756 paranoid_invariant(ft_msg_type_applies_once(msg.type()));
1757
1758 childnum = (target_childnum >= 0 ? target_childnum
1759 : toku_ftnode_which_child(subtree_root, msg.kdbt(), ft->cmp));
1760 bnc = BNC(subtree_root, childnum);
1761
1762 if (toku_bnc_n_entries(bnc) > 0) {
1763 // The buffer is non-empty, give up on promoting.
1764 FT_STATUS_INC(FT_PRO_NUM_STOP_NONEMPTY_BUF, 1);
1765 goto relock_and_push_here;
1766 }
1767
1768 seqinsert_loc next_loc;
1769 if ((loc & LEFT_EXTREME) && childnum == 0) {
1770 next_loc = LEFT_EXTREME;
1771 } else if ((loc & RIGHT_EXTREME) && childnum == subtree_root->n_children - 1) {
1772 next_loc = RIGHT_EXTREME;
1773 } else {
1774 next_loc = NEITHER_EXTREME;
1775 }
1776
1777 if (next_loc == NEITHER_EXTREME && subtree_root->height <= 1) {
1778 // Never promote to leaf nodes except on the edges
1779 FT_STATUS_INC(FT_PRO_NUM_STOP_H1, 1);
1780 goto relock_and_push_here;
1781 }
1782
1783 {
1784 const BLOCKNUM child_blocknum = BP_BLOCKNUM(subtree_root, childnum);
1785 ft->blocktable.verify_blocknum_allocated(child_blocknum);
1786 const uint32_t child_fullhash = toku_cachetable_hash(ft->cf, child_blocknum);
1787
1788 FTNODE child;
1789 {
1790 const int child_height = subtree_root->height - 1;
1791 const int child_depth = depth + 1;
1792 // If we're locking a leaf, or a height 1 node or depth 2
1793 // node in the middle, we know we won't promote further
1794 // than that, so just get a write lock now.
1795 const pair_lock_type lock_type = (should_inject_in_node(next_loc, child_height, child_depth)
1796 ? PL_WRITE_CHEAP
1797 : PL_READ);
1798 if (next_loc != NEITHER_EXTREME || (toku_bnc_should_promote(ft, bnc) && depth <= 1)) {
1799 // If we're on either extreme, or the birdie wants to
1800 // promote and we're in the top two levels of the
1801 // tree, don't stop just because someone else has the
1802 // node locked.
1803 ftnode_fetch_extra bfe;
1804 bfe.create_for_full_read(ft);
1805 if (lock_type == PL_WRITE_CHEAP) {
1806 // We intend to take the write lock for message injection
1807 toku::context inject_ctx(CTX_MESSAGE_INJECTION);
1808 toku_pin_ftnode(ft, child_blocknum, child_fullhash, &bfe, lock_type, &child, true);
1809 } else {
1810 // We're going to keep promoting
1811 toku::context promo_ctx(CTX_PROMO);
1812 toku_pin_ftnode(ft, child_blocknum, child_fullhash, &bfe, lock_type, &child, true);
1813 }
1814 } else {
1815 r = toku_maybe_pin_ftnode_clean(ft, child_blocknum, child_fullhash, lock_type, &child);
1816 if (r != 0) {
1817 // We couldn't get the child cheaply, so give up on promoting.
1818 FT_STATUS_INC(FT_PRO_NUM_STOP_LOCK_CHILD, 1);
1819 goto relock_and_push_here;
1820 }
1821 if (toku_ftnode_fully_in_memory(child)) {
1822 // toku_pin_ftnode... touches the clock but toku_maybe_pin_ftnode... doesn't.
1823 // This prevents partial eviction.
1824 for (int i = 0; i < child->n_children; ++i) {
1825 BP_TOUCH_CLOCK(child, i);
1826 }
1827 } else {
1828 // We got the child, but it's not fully in memory. Give up on promoting.
1829 FT_STATUS_INC(FT_PRO_NUM_STOP_CHILD_INMEM, 1);
1830 goto unlock_child_and_push_here;
1831 }
1832 }
1833 }
1834 paranoid_invariant_notnull(child);
1835
1836 if (!just_did_split_or_merge) {
1837 BLOCKNUM subtree_root_blocknum = subtree_root->blocknum;
1838 uint32_t subtree_root_fullhash = toku_cachetable_hash(ft->cf, subtree_root_blocknum);
1839 const bool did_split_or_merge = process_maybe_reactive_child(ft, subtree_root, child, childnum, loc);
1840 if (did_split_or_merge) {
1841 // Need to re-pin this node and try at this level again.
1842 FTNODE newparent;
1843 ftnode_fetch_extra bfe;
1844 bfe.create_for_full_read(ft); // should be fully in memory, we just split it
1845 toku_pin_ftnode(ft, subtree_root_blocknum, subtree_root_fullhash, &bfe, PL_READ, &newparent, true);
1846 push_something_in_subtree(ft, newparent, -1, msg, flow_deltas, gc_info, depth, loc, true);
1847 return;
1848 }
1849 }
1850
1851 if (next_loc != NEITHER_EXTREME || child->dirty || toku_bnc_should_promote(ft, bnc)) {
1852 push_something_in_subtree(ft, child, -1, msg, flow_deltas, gc_info, depth + 1, next_loc, false);
1853 toku_sync_fetch_and_add(&bnc->flow[0], flow_deltas[0]);
1854 // The recursive call unpinned the child, but
1855 // we're responsible for unpinning subtree_root.
1856 toku_unpin_ftnode_read_only(ft, subtree_root);
1857 return;
1858 }
1859
1860 FT_STATUS_INC(FT_PRO_NUM_DIDNT_WANT_PROMOTE, 1);
1861 unlock_child_and_push_here:
1862 // We locked the child, but we decided not to promote.
1863 // Unlock the child, and fall through to the next case.
1864 toku_unpin_ftnode_read_only(ft, child);
1865 }
1866 relock_and_push_here:
1867 // Give up on promoting.
1868 // We have subtree_root read-locked and we don't have a child locked.
1869 // Drop the read lock, grab a write lock, and inject here.
1870 {
1871 // Right now we have a read lock on subtree_root, but we want
1872 // to inject into it so we get a write lock instead.
1873 BLOCKNUM subtree_root_blocknum = subtree_root->blocknum;
1874 uint32_t subtree_root_fullhash = toku_cachetable_hash(ft->cf, subtree_root_blocknum);
1875 toku_unpin_ftnode_read_only(ft, subtree_root);
1876 switch (depth) {
1877 case 0:
1878 FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_0, 1); break;
1879 case 1:
1880 FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_1, 1); break;
1881 case 2:
1882 FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_2, 1); break;
1883 case 3:
1884 FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_3, 1); break;
1885 default:
1886 FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break;
1887 }
1888 inject_message_at_this_blocknum(ft, subtree_root_blocknum, subtree_root_fullhash, msg, flow_deltas, gc_info);
1889 }
1890 }
1891}
1892
1893void toku_ft_root_put_msg(
1894 FT ft,
1895 const ft_msg &msg,
1896 txn_gc_info *gc_info
1897 )
1898// Effect:
1899// - assign msn to message and update msn in the header
1900// - push the message into the ft
1901
1902// As of Clayface, the root blocknum is a constant, so preventing a
1903// race between message injection and the split of a root is the job
1904// of the cachetable's locking rules.
1905//
1906// We also hold the MO lock for a number of reasons, but an important
1907// one is to make sure that a begin_checkpoint may not start while
1908// this code is executing. A begin_checkpoint does (at least) two things
1909// that can interfere with the operations here:
1910// - Copies the header to a checkpoint header. Because we may change
1911// the max_msn_in_ft below, we don't want the header to be copied in
1912// the middle of these operations.
1913// - Takes note of the log's LSN. Because this put operation has
1914// already been logged, this message injection must be included
1915// in any checkpoint that contains this put's logentry.
// Holding the MO lock throughout this function ensures that fact.
1917{
1918 toku::context promo_ctx(CTX_PROMO);
1919
1920 // blackhole fractal trees drop all messages, so do nothing.
1921 if (ft->blackhole) {
1922 return;
1923 }
1924
1925 FTNODE node;
1926
1927 uint32_t fullhash;
1928 CACHEKEY root_key;
1929 toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
1930 ftnode_fetch_extra bfe;
1931 bfe.create_for_full_read(ft);
1932
1933 size_t flow_deltas[] = { message_buffer::msg_memsize_in_buffer(msg), 0 };
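    // flow_deltas[0] carries this message's in-buffer memory footprint;
    // flow_deltas[1] starts at zero. Each buffer the message is promoted
    // past has its flow counter bumped by these amounts (see the
    // toku_sync_fetch_and_add on bnc->flow[0] in push_something_in_subtree).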
1934
1935 pair_lock_type lock_type;
1936 lock_type = PL_READ; // try first for a read lock
1937 // If we need to split the root, we'll have to change from a read lock
1938 // to a write lock and check again. We change the variable lock_type
1939 // and jump back to here.
1940 change_lock_type:
1941 // get the root node
1942 toku_pin_ftnode(ft, root_key, fullhash, &bfe, lock_type, &node, true);
1943 toku_ftnode_assert_fully_in_memory(node);
1944 paranoid_invariant(node->fullhash==fullhash);
1945 ft_verify_flags(ft, node);
1946
1947 // First handle a reactive root.
    // This relock-to-split scheme causes every message injection thread
    // to change lock type back and forth, when only one of them needs to
    // in order to handle the split. That's not great, but root splits
    // are incredibly rare.
1952 enum reactivity re = toku_ftnode_get_reactivity(ft, node);
1953 switch (re) {
1954 case RE_STABLE:
1955 case RE_FUSIBLE: // cannot merge anything at the root
1956 if (lock_type != PL_READ) {
1957 // We thought we needed to split, but someone else got to
1958 // it before us. Downgrade to a read lock.
1959 toku_unpin_ftnode_read_only(ft, node);
1960 lock_type = PL_READ;
1961 goto change_lock_type;
1962 }
1963 break;
1964 case RE_FISSIBLE:
1965 if (lock_type == PL_READ) {
1966 // Here, we only have a read lock on the root. In order
1967 // to split it, we need a write lock, but in the course of
1968 // gaining the write lock, someone else may have gotten in
1969 // before us and split it. So we upgrade to a write lock
1970 // and check again.
1971 toku_unpin_ftnode_read_only(ft, node);
1972 lock_type = PL_WRITE_CHEAP;
1973 goto change_lock_type;
1974 } else {
1975 // We have a write lock, now we can split.
1976 ft_init_new_root(ft, node, &node);
1977 // Then downgrade back to a read lock, and we can finally
1978 // do the injection.
1979 toku_unpin_ftnode(ft, node);
1980 lock_type = PL_READ;
1981 FT_STATUS_INC(FT_PRO_NUM_ROOT_SPLIT, 1);
1982 goto change_lock_type;
1983 }
1984 break;
1985 }
1986 // If we get to here, we have a read lock and the root doesn't
1987 // need to be split. It's safe to inject the message.
1988 paranoid_invariant(lock_type == PL_READ);
1989 // We cannot assert that we have the read lock because frwlock asserts
1990 // that its mutex is locked when we check if there are any readers.
1991 // That wouldn't give us a strong guarantee that we have the read lock
1992 // anyway.
1993
1994 // Now, either inject here or promote. We decide based on a heuristic:
1995 if (node->height == 0 || !ft_msg_type_applies_once(msg.type())) {
1996 // If the root's a leaf or we're injecting a broadcast, drop the read lock and inject here.
1997 toku_unpin_ftnode_read_only(ft, node);
1998 FT_STATUS_INC(FT_PRO_NUM_ROOT_H0_INJECT, 1);
1999 inject_message_at_this_blocknum(ft, root_key, fullhash, msg, flow_deltas, gc_info);
2000 } else if (node->height > 1) {
2001 // If the root's above height 1, we are definitely eligible for promotion.
2002 push_something_in_subtree(ft, node, -1, msg, flow_deltas, gc_info, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
2003 } else {
2004 // The root's height 1. We may be eligible for promotion here.
        // On the extremes we want to promote; in the middle, we don't.
2006 int childnum = toku_ftnode_which_child(node, msg.kdbt(), ft->cmp);
2007 if (childnum == 0 || childnum == node->n_children - 1) {
2008 // On the extremes, promote. We know which childnum we're going to, so pass that down too.
2009 push_something_in_subtree(ft, node, childnum, msg, flow_deltas, gc_info, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
2010 } else {
2011 // At height 1 in the middle, don't promote, drop the read lock and inject here.
2012 toku_unpin_ftnode_read_only(ft, node);
2013 FT_STATUS_INC(FT_PRO_NUM_ROOT_H1_INJECT, 1);
2014 inject_message_at_this_blocknum(ft, root_key, fullhash, msg, flow_deltas, gc_info);
2015 }
2016 }
2017}
2018
2019// TODO: Remove me, I'm boring.
2020static int ft_compare_keys(FT ft, const DBT *a, const DBT *b)
2021// Effect: Compare two keys using the given fractal tree's comparator/descriptor
2022{
2023 return ft->cmp(a, b);
2024}
2025
2026static LEAFENTRY bn_get_le_and_key(BASEMENTNODE bn, int idx, DBT *key)
// Effect: Gets the idx'th leafentry from the given basement node and
//         fills its key into *key
// Requires: The idx'th leafentry exists.
2030{
2031 LEAFENTRY le;
2032 uint32_t le_len;
2033 void *le_key;
2034 int r = bn->data_buffer.fetch_klpair(idx, &le, &le_len, &le_key);
2035 invariant_zero(r);
2036 toku_fill_dbt(key, le_key, le_len);
2037 return le;
2038}
2039
2040static LEAFENTRY ft_leaf_leftmost_le_and_key(FTNODE leaf, DBT *leftmost_key)
2041// Effect: If a leftmost key exists in the given leaf, toku_fill_dbt()
2042// the key into *leftmost_key
2043// Requires: Leaf is fully in memory and pinned for read or write.
2044// Return: leafentry if it exists, nullptr otherwise
2045{
2046 for (int i = 0; i < leaf->n_children; i++) {
2047 BASEMENTNODE bn = BLB(leaf, i);
2048 if (bn->data_buffer.num_klpairs() > 0) {
2049 // Get the first (leftmost) leafentry and its key
2050 return bn_get_le_and_key(bn, 0, leftmost_key);
2051 }
2052 }
2053 return nullptr;
2054}
2055
2056static LEAFENTRY ft_leaf_rightmost_le_and_key(FTNODE leaf, DBT *rightmost_key)
2057// Effect: If a rightmost key exists in the given leaf, toku_fill_dbt()
2058// the key into *rightmost_key
2059// Requires: Leaf is fully in memory and pinned for read or write.
2060// Return: leafentry if it exists, nullptr otherwise
2061{
2062 for (int i = leaf->n_children - 1; i >= 0; i--) {
2063 BASEMENTNODE bn = BLB(leaf, i);
2064 size_t num_les = bn->data_buffer.num_klpairs();
2065 if (num_les > 0) {
2066 // Get the last (rightmost) leafentry and its key
2067 return bn_get_le_and_key(bn, num_les - 1, rightmost_key);
2068 }
2069 }
2070 return nullptr;
2071}
2072
2073static int ft_leaf_get_relative_key_pos(FT ft, FTNODE leaf, const DBT *key, bool *nondeleted_key_found, int *target_childnum)
2074// Effect: Determines what the relative position of the given key is with
2075// respect to a leaf node, and if it exists.
2076// Requires: Leaf is fully in memory and pinned for read or write.
2077// Requires: target_childnum is non-null
2078// Return: < 0 if key is less than the leftmost key in the leaf OR the relative position is unknown, for any reason.
//         0 if key is in the bounds [leftmost_key, rightmost_key] for this leaf
2080// > 0 if key is greater than the rightmost key in the leaf
2081// *nondeleted_key_found is set (if non-null) if the target key was found and is not deleted, unmodified otherwise
2082// *target_childnum is set to the child that (does or would) contain the key, if calculated, unmodified otherwise
2083{
2084 DBT rightmost_key;
2085 LEAFENTRY rightmost_le = ft_leaf_rightmost_le_and_key(leaf, &rightmost_key);
2086 if (rightmost_le == nullptr) {
2087 // If we can't get a rightmost key then the leaf is empty.
2088 // In such a case, we don't have any information about what keys would be in this leaf.
2089 // We have to assume the leaf node that would contain this key is to the left.
2090 return -1;
2091 }
2092 // We have a rightmost leafentry, so it must exist in some child node
2093 invariant(leaf->n_children > 0);
2094
2095 int relative_pos = 0;
2096 int c = ft_compare_keys(ft, key, &rightmost_key);
2097 if (c > 0) {
2098 relative_pos = 1;
2099 *target_childnum = leaf->n_children - 1;
2100 } else if (c == 0) {
2101 if (nondeleted_key_found != nullptr && !le_latest_is_del(rightmost_le)) {
2102 *nondeleted_key_found = true;
2103 }
2104 relative_pos = 0;
2105 *target_childnum = leaf->n_children - 1;
2106 } else {
2107 // The key is less than the rightmost. It may still be in bounds if it's >= the leftmost.
2108 DBT leftmost_key;
2109 LEAFENTRY leftmost_le = ft_leaf_leftmost_le_and_key(leaf, &leftmost_key);
2110 invariant_notnull(leftmost_le); // Must exist because a rightmost exists
2111 c = ft_compare_keys(ft, key, &leftmost_key);
2112 if (c > 0) {
2113 if (nondeleted_key_found != nullptr) {
2114 // The caller wants to know if a nondeleted key can be found.
2115 LEAFENTRY target_le;
2116 int childnum = toku_ftnode_which_child(leaf, key, ft->cmp);
2117 BASEMENTNODE bn = BLB(leaf, childnum);
2118 struct toku_msg_leafval_heaviside_extra extra(ft->cmp, key);
2119 int r = bn->data_buffer.find_zero<decltype(extra), toku_msg_leafval_heaviside>(
2120 extra,
2121 &target_le,
2122 nullptr, nullptr, nullptr
2123 );
2124 *target_childnum = childnum;
2125 if (r == 0 && !le_latest_is_del(target_le)) {
2126 *nondeleted_key_found = true;
2127 }
2128 }
2129 relative_pos = 0;
2130 } else if (c == 0) {
2131 if (nondeleted_key_found != nullptr && !le_latest_is_del(leftmost_le)) {
2132 *nondeleted_key_found = true;
2133 }
2134 relative_pos = 0;
2135 *target_childnum = 0;
2136 } else {
2137 relative_pos = -1;
2138 }
2139 }
2140
2141 return relative_pos;
2142}
2143
2144static void ft_insert_directly_into_leaf(FT ft, FTNODE leaf, int target_childnum, DBT *key, DBT *val,
2145 XIDS message_xids, enum ft_msg_type type, txn_gc_info *gc_info);
2146static int getf_nothing(uint32_t, const void *, uint32_t, const void *, void *, bool);
2147
2148static int ft_maybe_insert_into_rightmost_leaf(FT ft, DBT *key, DBT *val, XIDS message_xids, enum ft_msg_type type,
2149 txn_gc_info *gc_info, bool unique)
2150// Effect: Pins the rightmost leaf node and attempts to do an insert.
2151// There are three reasons why we may not succeed.
2152// - The rightmost leaf is too full and needs a split.
2153// - The key to insert is not within the provable bounds of this leaf node.
2154// - The key is within bounds, but it already exists.
2155// Return: 0 if this function did insert, DB_KEYEXIST if a unique key constraint exists and
2156// some nondeleted leafentry with the same key exists
2157// < 0 if this function did not insert, for a reason other than DB_KEYEXIST.
2158// Note: Treat this function as a possible, but not necessary, optimization for insert.
2159// Rationale: We want O(1) insertions down the rightmost path of the tree.
2160{
2161 int r = -1;
2162
2163 uint32_t rightmost_fullhash;
2164 BLOCKNUM rightmost_blocknum;
2165 FTNODE rightmost_leaf = nullptr;
2166
    // Don't do the optimization if our heuristic suggests that
    // the insertion pattern is not sequential.
2169 if (toku_unsafe_fetch(&ft->seqinsert_score) < FT_SEQINSERT_SCORE_THRESHOLD) {
2170 goto cleanup;
2171 }
2172
2173 // We know the seqinsert score is high enough that we should
2174 // attempt to directly insert into the rightmost leaf. Because
2175 // the score is non-zero, the rightmost blocknum must have been
2176 // set. See inject_message_in_locked_node(), which only increases
2177 // the score if the target node blocknum == rightmost_blocknum
2178 rightmost_blocknum = ft->rightmost_blocknum;
2179 invariant(rightmost_blocknum.b != RESERVED_BLOCKNUM_NULL);
2180
2181 // Pin the rightmost leaf with a write lock.
2182 rightmost_fullhash = toku_cachetable_hash(ft->cf, rightmost_blocknum);
2183 ftnode_fetch_extra bfe;
2184 bfe.create_for_full_read(ft);
2185 toku_pin_ftnode(ft, rightmost_blocknum, rightmost_fullhash, &bfe, PL_WRITE_CHEAP, &rightmost_leaf, true);
2186
    // The rightmost blocknum never changes once it is initialized to something
2188 // other than null. Verify that the pinned node has the correct blocknum.
2189 invariant(rightmost_leaf->blocknum.b == rightmost_blocknum.b);
2190
    // If the rightmost leaf is reactive, bail out and let the normal promotion pass
2192 // take care of it. This also ensures that if any of our ancestors are reactive,
2193 // they'll be taken care of too.
2194 if (toku_ftnode_get_leaf_reactivity(rightmost_leaf, ft->h->nodesize) != RE_STABLE) {
2195 FT_STATUS_INC(FT_PRO_RIGHTMOST_LEAF_SHORTCUT_FAIL_REACTIVE, 1);
2196 goto cleanup;
2197 }
2198
2199 // The groundwork has been laid for an insertion directly into the rightmost
2200 // leaf node. We know that it is pinned for write, fully in memory, has
2201 // no messages above it, and is not reactive.
2202 //
2203 // Now, two more things must be true for this insertion to actually happen:
2204 // 1. The key to insert is within the bounds of this leafnode, or to the right.
2205 // 2. If there is a uniqueness constraint, it passes.
2206 bool nondeleted_key_found;
2207 int relative_pos;
2208 int target_childnum;
2209
2210 nondeleted_key_found = false;
2211 target_childnum = -1;
2212 relative_pos = ft_leaf_get_relative_key_pos(ft, rightmost_leaf, key,
2213 unique ? &nondeleted_key_found : nullptr,
2214 &target_childnum);
2215 if (relative_pos >= 0) {
2216 FT_STATUS_INC(FT_PRO_RIGHTMOST_LEAF_SHORTCUT_SUCCESS, 1);
2217 if (unique && nondeleted_key_found) {
2218 r = DB_KEYEXIST;
2219 } else {
2220 ft_insert_directly_into_leaf(ft, rightmost_leaf, target_childnum,
2221 key, val, message_xids, type, gc_info);
2222 r = 0;
2223 }
2224 } else {
2225 FT_STATUS_INC(FT_PRO_RIGHTMOST_LEAF_SHORTCUT_FAIL_POS, 1);
2226 r = -1;
2227 }
2228
2229cleanup:
2230 // If we did the insert, the rightmost leaf was unpinned for us.
2231 if (r != 0 && rightmost_leaf != nullptr) {
2232 toku_unpin_ftnode(ft, rightmost_leaf);
2233 }
2234
2235 return r;
2236}
2237
2238static void ft_txn_log_insert(FT ft, DBT *key, DBT *val, TOKUTXN txn, bool do_logging, enum ft_msg_type type);
2239
2240int toku_ft_insert_unique(FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool do_logging) {
2241// Effect: Insert a unique key-val pair into the fractal tree.
// Return: 0 on success, DB_KEYEXIST if the uniqueness constraint failed
2243 XIDS message_xids = txn != nullptr ? toku_txn_get_xids(txn) : toku_xids_get_root_xids();
2244
2245 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
2246 txn_manager_state txn_state_for_gc(txn_manager);
2247
2248 TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
2249 txn_gc_info gc_info(&txn_state_for_gc,
2250 oldest_referenced_xid_estimate,
2251 // no messages above us, we can implicitly promote uxrs based on this xid
2252 oldest_referenced_xid_estimate,
2253 true);
2254 int r = ft_maybe_insert_into_rightmost_leaf(ft_h->ft, key, val, message_xids, FT_INSERT, &gc_info, true);
2255 if (r != 0 && r != DB_KEYEXIST) {
2256 // Default to a regular unique check + insert algorithm if we couldn't
2257 // do it based on the rightmost leaf alone.
2258 int lookup_r = toku_ft_lookup(ft_h, key, getf_nothing, nullptr);
2259 if (lookup_r == DB_NOTFOUND) {
2260 toku_ft_send_insert(ft_h, key, val, message_xids, FT_INSERT, &gc_info);
2261 r = 0;
2262 } else {
2263 r = DB_KEYEXIST;
2264 }
2265 }
2266
2267 if (r == 0) {
2268 ft_txn_log_insert(ft_h->ft, key, val, txn, do_logging, FT_INSERT);
2269 toku_ft_adjust_logical_row_count(ft_h->ft, 1);
2270 }
2271 return r;
2272}
2273
2274// Effect: Insert the key-val pair into an ft.
2275void toku_ft_insert (FT_HANDLE ft_handle, DBT *key, DBT *val, TOKUTXN txn) {
2276 toku_ft_maybe_insert(ft_handle, key, val, txn, false, ZERO_LSN, true, FT_INSERT);
2277}
2278
2279void toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) {
2280 paranoid_invariant(txn);
2281 toku_txn_force_fsync_on_commit(txn); //If the txn commits, the commit MUST be in the log
2282 //before the (old) file is actually unlinked
2283 TOKULOGGER logger = toku_txn_logger(txn);
2284
2285 BYTESTRING new_iname_bs = {.len=(uint32_t) strlen(new_iname), .data=(char*)new_iname};
2286 toku_logger_save_rollback_load(txn, old_filenum, &new_iname_bs);
2287 if (do_log && logger) {
2288 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2289 toku_log_load(logger, load_lsn, do_fsync, txn, xid, old_filenum, new_iname_bs);
2290 }
2291}
2292
2293// 2954
2294// this function handles the tasks needed to be recoverable
2295// - write to rollback log
2296// - write to recovery log
2297void toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn)
2298{
2299 paranoid_invariant(txn);
2300 TOKULOGGER logger = toku_txn_logger(txn);
2301
2302 // write to the rollback log
2303 toku_logger_save_rollback_hot_index(txn, &filenums);
2304 if (do_log && logger) {
2305 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2306 // write to the recovery log
2307 toku_log_hot_index(logger, hot_index_lsn, do_fsync, txn, xid, filenums);
2308 }
2309}
2310
2311// Effect: Optimize the ft.
2312void toku_ft_optimize (FT_HANDLE ft_h) {
2313 TOKULOGGER logger = toku_cachefile_logger(ft_h->ft->cf);
2314 if (logger) {
2315 TXNID oldest = toku_txn_manager_get_oldest_living_xid(logger->txn_manager);
2316
2317 XIDS root_xids = toku_xids_get_root_xids();
2318 XIDS message_xids;
2319 if (oldest == TXNID_NONE_LIVING) {
2320 message_xids = root_xids;
2321 }
2322 else {
2323 int r = toku_xids_create_child(root_xids, &message_xids, oldest);
2324 invariant(r == 0);
2325 }
2326
2327 DBT key;
2328 DBT val;
2329 toku_init_dbt(&key);
2330 toku_init_dbt(&val);
2331 ft_msg msg(&key, &val, FT_OPTIMIZE, ZERO_MSN, message_xids);
2332
2333 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
2334 txn_manager_state txn_state_for_gc(txn_manager);
2335
2336 TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
2337 txn_gc_info gc_info(&txn_state_for_gc,
2338 oldest_referenced_xid_estimate,
2339 // no messages above us, we can implicitly promote uxrs based on this xid
2340 oldest_referenced_xid_estimate,
2341 true);
2342 toku_ft_root_put_msg(ft_h->ft, msg, &gc_info);
2343 toku_xids_destroy(&message_xids);
2344 }
2345}
2346
2347void toku_ft_load(FT_HANDLE ft_handle, TOKUTXN txn, char const * new_iname, int do_fsync, LSN *load_lsn) {
2348 FILENUM old_filenum = toku_cachefile_filenum(ft_handle->ft->cf);
2349 int do_log = 1;
2350 toku_ft_load_recovery(txn, old_filenum, new_iname, do_fsync, do_log, load_lsn);
2351}
2352
2353// ft actions for logging hot index filenums
2354void toku_ft_hot_index(FT_HANDLE ft_handle __attribute__ ((unused)), TOKUTXN txn, FILENUMS filenums, int do_fsync, LSN *lsn) {
2355 int do_log = 1;
2356 toku_ft_hot_index_recovery(txn, filenums, do_fsync, do_log, lsn);
2357}
2358
2359void
2360toku_ft_log_put (TOKUTXN txn, FT_HANDLE ft_handle, const DBT *key, const DBT *val) {
2361 TOKULOGGER logger = toku_txn_logger(txn);
2362 if (logger) {
2363 BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2364 BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
2365 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2366 toku_log_enq_insert(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_handle->ft->cf), xid, keybs, valbs);
2367 }
2368}
2369
2370void
2371toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *fts, uint32_t num_fts, const DBT *key, const DBT *val) {
2372 assert(txn);
2373 assert(num_fts > 0);
2374 TOKULOGGER logger = toku_txn_logger(txn);
2375 if (logger) {
2376 FILENUM fnums[num_fts];
2377 uint32_t i;
2378 for (i = 0; i < num_fts; i++) {
2379 fnums[i] = toku_cachefile_filenum(fts[i]->ft->cf);
2380 }
2381 FILENUMS filenums = {.num = num_fts, .filenums = fnums};
2382 BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2383 BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
2384 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2385 FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE;
2386 toku_log_enq_insert_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs);
2387 }
2388}
2389
2390TXN_MANAGER toku_ft_get_txn_manager(FT_HANDLE ft_h) {
2391 TOKULOGGER logger = toku_cachefile_logger(ft_h->ft->cf);
2392 return logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr;
2393}
2394
2395TXNID toku_ft_get_oldest_referenced_xid_estimate(FT_HANDLE ft_h) {
2396 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
2397 return txn_manager != nullptr ? toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager) : TXNID_NONE;
2398}
2399
2400static void ft_txn_log_insert(FT ft, DBT *key, DBT *val, TOKUTXN txn, bool do_logging, enum ft_msg_type type) {
2401 paranoid_invariant(type == FT_INSERT || type == FT_INSERT_NO_OVERWRITE);
2402
2403 //By default use committed messages
2404 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2405 if (txn) {
2406 BYTESTRING keybs = {key->size, (char *) key->data};
2407 toku_logger_save_rollback_cmdinsert(txn, toku_cachefile_filenum(ft->cf), &keybs);
2408 toku_txn_maybe_note_ft(txn, ft);
2409 }
2410 TOKULOGGER logger = toku_txn_logger(txn);
2411 if (do_logging && logger) {
2412 BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2413 BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
2414 if (type == FT_INSERT) {
2415 toku_log_enq_insert(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft->cf), xid, keybs, valbs);
2416 }
2417 else {
2418 toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft->cf), xid, keybs, valbs);
2419 }
2420 }
2421}
2422
2423void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging, enum ft_msg_type type) {
2424 ft_txn_log_insert(ft_h->ft, key, val, txn, do_logging, type);
2425
2426 LSN treelsn;
2427 if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
2428 // do nothing
2429 } else {
2430 XIDS message_xids = txn ? toku_txn_get_xids(txn) : toku_xids_get_root_xids();
2431
2432 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
2433 txn_manager_state txn_state_for_gc(txn_manager);
2434
2435 TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
2436 txn_gc_info gc_info(&txn_state_for_gc,
2437 oldest_referenced_xid_estimate,
2438 // no messages above us, we can implicitly promote uxrs based on this xid
2439 oldest_referenced_xid_estimate,
2440 txn != nullptr ? !txn->for_recovery : false);
2441 int r = ft_maybe_insert_into_rightmost_leaf(ft_h->ft, key, val, message_xids, FT_INSERT, &gc_info, false);
2442 if (r != 0) {
2443 toku_ft_send_insert(ft_h, key, val, message_xids, type, &gc_info);
2444 }
2445 toku_ft_adjust_logical_row_count(ft_h->ft, 1);
2446 }
2447}
2448
2449static void ft_insert_directly_into_leaf(FT ft, FTNODE leaf, int target_childnum, DBT *key, DBT *val,
2450 XIDS message_xids, enum ft_msg_type type, txn_gc_info *gc_info)
// Effect: Insert directly into a leaf node of a fractal tree. Does not do any logging.
2452// Requires: Leaf is fully in memory and pinned for write.
2453// Requires: If this insertion were to happen through the root node, the promotion
2454// algorithm would have selected the given leaf node as the point of injection.
2455// That means this function relies on the current implementation of promotion.
2456{
2457 ft_msg msg(key, val, type, ZERO_MSN, message_xids);
2458 size_t flow_deltas[] = { 0, 0 };
2459 inject_message_in_locked_node(ft, leaf, target_childnum, msg, flow_deltas, gc_info);
2460}
2461
2462static void
2463ft_send_update_msg(FT_HANDLE ft_h, const ft_msg &msg, TOKUTXN txn) {
2464 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
2465 txn_manager_state txn_state_for_gc(txn_manager);
2466
2467 TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
2468 txn_gc_info gc_info(&txn_state_for_gc,
2469 oldest_referenced_xid_estimate,
2470 // no messages above us, we can implicitly promote uxrs based on this xid
2471 oldest_referenced_xid_estimate,
2472 txn != nullptr ? !txn->for_recovery : false);
2473 toku_ft_root_put_msg(ft_h->ft, msg, &gc_info);
2474}
2475
2476void toku_ft_maybe_update(FT_HANDLE ft_h,
2477 const DBT *key,
2478 const DBT *update_function_extra,
2479 TOKUTXN txn,
2480 bool oplsn_valid,
2481 LSN oplsn,
2482 bool do_logging) {
2483 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2484 if (txn) {
2485 BYTESTRING keybs = {key->size, (char *)key->data};
2486 toku_logger_save_rollback_cmdupdate(
2487 txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
2488 toku_txn_maybe_note_ft(txn, ft_h->ft);
2489 }
2490
2491 TOKULOGGER logger;
2492 logger = toku_txn_logger(txn);
2493 if (do_logging && logger) {
2494 BYTESTRING keybs = {.len = key->size, .data = (char *)key->data};
2495 BYTESTRING extrabs = {.len = update_function_extra->size,
2496 .data = (char *)update_function_extra->data};
2497 toku_log_enq_update(logger,
2498 NULL,
2499 0,
2500 txn,
2501 toku_cachefile_filenum(ft_h->ft->cf),
2502 xid,
2503 keybs,
2504 extrabs);
2505 }
2506
2507 LSN treelsn;
2508 if (oplsn_valid &&
2509 oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
2510 // do nothing
2511 } else {
2512 XIDS message_xids =
2513 txn ? toku_txn_get_xids(txn) : toku_xids_get_root_xids();
2514 ft_msg msg(
2515 key, update_function_extra, FT_UPDATE, ZERO_MSN, message_xids);
2516 ft_send_update_msg(ft_h, msg, txn);
2517 }
2518 // updates get converted to insert messages, which should do a -1 on the
2519 // logical row count when the messages are permanently applied
2520 toku_ft_adjust_logical_row_count(ft_h->ft, 1);
2521}
2522
2523void toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_extra,
2524 TOKUTXN txn, bool oplsn_valid, LSN oplsn,
2525 bool do_logging, bool is_resetting_op) {
2526 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2527 uint8_t resetting = is_resetting_op ? 1 : 0;
2528 if (txn) {
2529 toku_logger_save_rollback_cmdupdatebroadcast(txn, toku_cachefile_filenum(ft_h->ft->cf), resetting);
2530 toku_txn_maybe_note_ft(txn, ft_h->ft);
2531 }
2532
2533 TOKULOGGER logger;
2534 logger = toku_txn_logger(txn);
2535 if (do_logging && logger) {
2536 BYTESTRING extrabs = {.len=update_function_extra->size,
2537 .data = (char *) update_function_extra->data};
2538 toku_log_enq_updatebroadcast(logger, NULL, 0, txn,
2539 toku_cachefile_filenum(ft_h->ft->cf),
2540 xid, extrabs, resetting);
2541 }
2542
2543 //TODO(yoni): remove treelsn here and similar calls (no longer being used)
2544 LSN treelsn;
2545 if (oplsn_valid &&
2546 oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
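        // do nothing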
2547
2548 } else {
2549 DBT empty_dbt;
2550 XIDS message_xids = txn ? toku_txn_get_xids(txn) : toku_xids_get_root_xids();
2551 ft_msg msg(toku_init_dbt(&empty_dbt), update_function_extra, FT_UPDATE_BROADCAST_ALL, ZERO_MSN, message_xids);
2552 ft_send_update_msg(ft_h, msg, txn);
2553 }
2554}
2555
2556void toku_ft_send_insert(FT_HANDLE ft_handle, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type, txn_gc_info *gc_info) {
2557 ft_msg msg(key, val, type, ZERO_MSN, xids);
2558 toku_ft_root_put_msg(ft_handle->ft, msg, gc_info);
2559}
2560
2561void toku_ft_send_commit_any(FT_HANDLE ft_handle, DBT *key, XIDS xids, txn_gc_info *gc_info) {
2562 DBT val;
2563 ft_msg msg(key, toku_init_dbt(&val), FT_COMMIT_ANY, ZERO_MSN, xids);
2564 toku_ft_root_put_msg(ft_handle->ft, msg, gc_info);
2565}
2566
2567void toku_ft_delete(FT_HANDLE ft_handle, DBT *key, TOKUTXN txn) {
2568 toku_ft_maybe_delete(ft_handle, key, txn, false, ZERO_LSN, true);
2569}
2570
2571void
2572toku_ft_log_del(TOKUTXN txn, FT_HANDLE ft_handle, const DBT *key) {
2573 TOKULOGGER logger = toku_txn_logger(txn);
2574 if (logger) {
2575 BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2576 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2577 toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_handle->ft->cf), xid, keybs);
2578 }
2579}
2580
2581void
2582toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *fts, uint32_t num_fts, const DBT *key, const DBT *val) {
2583 assert(txn);
2584 assert(num_fts > 0);
2585 TOKULOGGER logger = toku_txn_logger(txn);
2586 if (logger) {
2587 FILENUM fnums[num_fts];
2588 uint32_t i;
2589 for (i = 0; i < num_fts; i++) {
2590 fnums[i] = toku_cachefile_filenum(fts[i]->ft->cf);
2591 }
2592 FILENUMS filenums = {.num = num_fts, .filenums = fnums};
2593 BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2594 BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
2595 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2596 FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE;
2597 toku_log_enq_delete_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs);
2598 }
2599}
2600
2601void toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging) {
2602 XIDS message_xids = toku_xids_get_root_xids(); //By default use committed messages
2603 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2604 if (txn) {
2605 BYTESTRING keybs = {key->size, (char *) key->data};
2606 toku_logger_save_rollback_cmddelete(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
2607 toku_txn_maybe_note_ft(txn, ft_h->ft);
2608 message_xids = toku_txn_get_xids(txn);
2609 }
2610 TOKULOGGER logger = toku_txn_logger(txn);
2611 if (do_logging && logger) {
2612 BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2613 toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs);
2614 }
2615
2616 LSN treelsn;
2617 if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
2618 // do nothing
2619 } else {
2620 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
2621 txn_manager_state txn_state_for_gc(txn_manager);
2622
2623 TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
2624 txn_gc_info gc_info(&txn_state_for_gc,
2625 oldest_referenced_xid_estimate,
2626 // no messages above us, we can implicitly promote uxrs based on this xid
2627 oldest_referenced_xid_estimate,
2628 txn != nullptr ? !txn->for_recovery : false);
2629 toku_ft_send_delete(ft_h, key, message_xids, &gc_info);
2630 toku_ft_adjust_logical_row_count(ft_h->ft, -1);
2631 }
2632}
2633
2634void toku_ft_send_delete(FT_HANDLE ft_handle, DBT *key, XIDS xids, txn_gc_info *gc_info) {
2635 DBT val; toku_init_dbt(&val);
2636 ft_msg msg(key, toku_init_dbt(&val), FT_DELETE_ANY, ZERO_MSN, xids);
2637 toku_ft_root_put_msg(ft_handle->ft, msg, gc_info);
2638}
2639
2640/* ******************** open,close and create ********************** */
2641
// Test-only function (not used in the running system). This one has no env.
2643int toku_open_ft_handle (const char *fname, int is_create, FT_HANDLE *ft_handle_p, int nodesize,
2644 int basementnodesize,
2645 enum toku_compression_method compression_method,
2646 CACHETABLE cachetable, TOKUTXN txn,
2647 int (*compare_fun)(DB *, const DBT*,const DBT*)) {
2648 FT_HANDLE ft_handle;
2649 const int only_create = 0;
2650
2651 toku_ft_handle_create(&ft_handle);
2652 toku_ft_handle_set_nodesize(ft_handle, nodesize);
2653 toku_ft_handle_set_basementnodesize(ft_handle, basementnodesize);
2654 toku_ft_handle_set_compression_method(ft_handle, compression_method);
2655 toku_ft_handle_set_fanout(ft_handle, 16);
2656 toku_ft_set_bt_compare(ft_handle, compare_fun);
2657
2658 int r = toku_ft_handle_open(ft_handle, fname, is_create, only_create, cachetable, txn);
2659 if (r != 0) {
2660 return r;
2661 }
2662
2663 *ft_handle_p = ft_handle;
2664 return r;
2665}
2666
2667static bool use_direct_io = true;
2668
2669void toku_ft_set_direct_io (bool direct_io_on) {
2670 use_direct_io = direct_io_on;
2671}
2672
2673static inline int ft_open_maybe_direct(const char *filename,
2674 int oflag,
2675 int mode) {
2676 if (use_direct_io) {
2677 return toku_os_open_direct(
2678 filename, oflag, mode, *tokudb_file_data_key);
2679 } else {
2680 return toku_os_open(filename, oflag, mode, *tokudb_file_data_key);
2681 }
2682}
2683
2684static const mode_t file_mode = S_IRUSR+S_IWUSR+S_IRGRP+S_IWGRP+S_IROTH+S_IWOTH;
2685
2686inline bool toku_file_is_root(const char *path, const char *last_slash) {
2687 return last_slash == path;
2688}
2689
2690static std::unique_ptr<char[], decltype(&toku_free)> toku_file_get_parent_dir(
2691 const char *path) {
2692 std::unique_ptr<char[], decltype(&toku_free)> result(nullptr, &toku_free);
2693
2694 bool has_trailing_slash = false;
2695
2696 /* Find the offset of the last slash */
2697 const char *last_slash = strrchr(path, OS_PATH_SEPARATOR);
2698
2699 if (!last_slash) {
2700 /* No slash in the path, return NULL */
2701 return result;
2702 }
2703
2704 /* Ok, there is a slash. Is there anything after it? */
2705 if (static_cast<size_t>(last_slash - path + 1) == strlen(path)) {
2706 has_trailing_slash = true;
2707 }
2708
    /* Reduce repetitive slashes. */
2710 while (last_slash > path && last_slash[-1] == OS_PATH_SEPARATOR) {
2711 last_slash--;
2712 }
2713
2714 /* Check for the root of a drive. */
2715 if (toku_file_is_root(path, last_slash)) {
2716 return result;
2717 }
2718
2719 /* If a trailing slash prevented the first strrchr() from trimming
2720 the last component of the path, trim that component now. */
2721 if (has_trailing_slash) {
2722 /* Back up to the previous slash. */
2723 last_slash--;
2724 while (last_slash > path && last_slash[0] != OS_PATH_SEPARATOR) {
2725 last_slash--;
2726 }
2727
        /* Reduce repetitive slashes. */
2729 while (last_slash > path && last_slash[-1] == OS_PATH_SEPARATOR) {
2730 last_slash--;
2731 }
2732 }
2733
2734 /* Check for the root of a drive. */
2735 if (toku_file_is_root(path, last_slash)) {
2736 return result;
2737 }
2738
2739 result.reset(toku_strndup(path, last_slash - path));
2740 return result;
2741}
2742
2743bool toku_create_subdirs_if_needed(const char *path) {
2744 static const mode_t dir_mode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP |
2745 S_IWGRP | S_IXGRP | S_IROTH | S_IXOTH;
2746
2747 toku_struct_stat stat;
2748 bool subdir_exists = true;
2749 auto subdir = toku_file_get_parent_dir(path);
2750
2751 if (!subdir.get())
2752 return true;
2753
2754 if (toku_stat(subdir.get(), &stat, toku_uninstrumented) == -1) {
2755 if (ENOENT == get_error_errno())
2756 subdir_exists = false;
2757 else
2758 return false;
2759 }
2760
2761 if (subdir_exists) {
2762 if (!S_ISDIR(stat.st_mode))
2763 return false;
2764 return true;
2765 }
2766
2767 if (!toku_create_subdirs_if_needed(subdir.get()))
2768 return false;
2769
2770 if (toku_os_mkdir(subdir.get(), dir_mode))
2771 return false;
2772
2773 return true;
2774}
2775
2776// open a file for use by the ft
2777// Requires: File does not exist.
2778static int ft_create_file(FT_HANDLE UU(ft_handle), const char *fname, int *fdp) {
2779 int r;
2780 int fd;
2781 int er;
2782 if (!toku_create_subdirs_if_needed(fname))
2783 return get_error_errno();
2784 fd = ft_open_maybe_direct(fname, O_RDWR | O_BINARY, file_mode);
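    // This open intentionally omits O_CREAT: the file must not exist yet,
    // so the open is expected to fail with ENOENT.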
2785 assert(fd==-1);
2786 if ((er = get_maybe_error_errno()) != ENOENT) {
2787 return er;
2788 }
2789 fd = ft_open_maybe_direct(fname, O_RDWR | O_CREAT | O_BINARY, file_mode);
2790 if (fd==-1) {
2791 r = get_error_errno();
2792 return r;
2793 }
2794
2795 r = toku_fsync_directory(fname);
2796 if (r == 0) {
2797 *fdp = fd;
2798 } else {
2799 int rr = close(fd);
2800 assert_zero(rr);
2801 }
2802 return r;
2803}
2804
2805// open a file for use by the ft. if the file does not exist, error
2806static int ft_open_file(const char *fname, int *fdp) {
2807 int fd;
2808 fd = ft_open_maybe_direct(fname, O_RDWR | O_BINARY, file_mode);
2809 if (fd==-1) {
2810 return get_error_errno();
2811 }
2812 *fdp = fd;
2813 return 0;
2814}
2815
2816void
2817toku_ft_handle_set_compression_method(FT_HANDLE t, enum toku_compression_method method)
2818{
2819 if (t->ft) {
2820 toku_ft_set_compression_method(t->ft, method);
2821 }
2822 else {
2823 t->options.compression_method = method;
2824 }
2825}
2826
2827void
2828toku_ft_handle_get_compression_method(FT_HANDLE t, enum toku_compression_method *methodp)
2829{
2830 if (t->ft) {
2831 toku_ft_get_compression_method(t->ft, methodp);
2832 }
2833 else {
2834 *methodp = t->options.compression_method;
2835 }
2836}
2837
2838void
2839toku_ft_handle_set_fanout(FT_HANDLE ft_handle, unsigned int fanout)
2840{
2841 if (ft_handle->ft) {
2842 toku_ft_set_fanout(ft_handle->ft, fanout);
2843 }
2844 else {
2845 ft_handle->options.fanout = fanout;
2846 }
2847}
2848
2849void
2850toku_ft_handle_get_fanout(FT_HANDLE ft_handle, unsigned int *fanout)
2851{
2852 if (ft_handle->ft) {
2853 toku_ft_get_fanout(ft_handle->ft, fanout);
2854 }
2855 else {
2856 *fanout = ft_handle->options.fanout;
2857 }
2858}
2859
2860// The memcmp magic byte may be set on a per fractal tree basis to communicate
2861// that if two keys begin with this byte, they may be compared with the builtin
2862// key comparison function. This greatly optimizes certain in-memory workloads,
2863// such as lookups by OID primary key in TokuMX.
2864int toku_ft_handle_set_memcmp_magic(FT_HANDLE ft_handle, uint8_t magic) {
2865 if (magic == comparator::MEMCMP_MAGIC_NONE) {
2866 return EINVAL;
2867 }
2868 if (ft_handle->ft != nullptr) {
2869 // if the handle is already open, then we cannot set the memcmp magic
2870 // (because it may or may not have been set by someone else already)
2871 return EINVAL;
2872 }
2873 ft_handle->options.memcmp_magic = magic;
2874 return 0;
2875}
2876
2877static int
2878verify_builtin_comparisons_consistent(FT_HANDLE t, uint32_t flags) {
2879 if ((flags & TOKU_DB_KEYCMP_BUILTIN) && (t->options.compare_fun != toku_builtin_compare_fun)) {
2880 return EINVAL;
2881 }
2882 return 0;
2883}
2884
2885//
2886// See comments in toku_db_change_descriptor to understand invariants
2887// in the system when this function is called
2888//
2889void toku_ft_change_descriptor(
2890 FT_HANDLE ft_h,
2891 const DBT* old_descriptor,
2892 const DBT* new_descriptor,
2893 bool do_log,
2894 TOKUTXN txn,
2895 bool update_cmp_descriptor
2896 )
2897{
2898 DESCRIPTOR_S new_d;
2899
2900 // if running with txns, save to rollback + write to recovery log
2901 if (txn) {
2902 // put information into rollback file
2903 BYTESTRING old_desc_bs = { old_descriptor->size, (char *) old_descriptor->data };
2904 BYTESTRING new_desc_bs = { new_descriptor->size, (char *) new_descriptor->data };
2905 toku_logger_save_rollback_change_fdescriptor(
2906 txn,
2907 toku_cachefile_filenum(ft_h->ft->cf),
2908 &old_desc_bs
2909 );
2910 toku_txn_maybe_note_ft(txn, ft_h->ft);
2911
2912 if (do_log) {
2913 TOKULOGGER logger = toku_txn_logger(txn);
2914 TXNID_PAIR xid = toku_txn_get_txnid(txn);
2915 toku_log_change_fdescriptor(
2916 logger, NULL, 0,
2917 txn,
2918 toku_cachefile_filenum(ft_h->ft->cf),
2919 xid,
2920 old_desc_bs,
2921 new_desc_bs,
2922 update_cmp_descriptor
2923 );
2924 }
2925 }
2926
2927 // write new_descriptor to header
2928 new_d.dbt = *new_descriptor;
2929 toku_ft_update_descriptor(ft_h->ft, &new_d);
2930 // very infrequent operation, worth precise threadsafe count
2931 FT_STATUS_INC(FT_DESCRIPTOR_SET, 1);
2932
2933 if (update_cmp_descriptor) {
2934 toku_ft_update_cmp_descriptor(ft_h->ft);
2935 }
2936}
2937
2938static void
2939toku_ft_handle_inherit_options(FT_HANDLE t, FT ft) {
2940 struct ft_options options = {
2941 .nodesize = ft->h->nodesize,
2942 .basementnodesize = ft->h->basementnodesize,
2943 .compression_method = ft->h->compression_method,
2944 .fanout = ft->h->fanout,
2945 .flags = ft->h->flags,
2946 .memcmp_magic = ft->cmp.get_memcmp_magic(),
2947 .compare_fun = ft->cmp.get_compare_func(),
2948 .update_fun = ft->update_fun
2949 };
2950 t->options = options;
2951 t->did_set_flags = true;
2952}
2953
2954// This is the actual open, used for various purposes, such as normal use, recovery, and redirect.
2955// fname_in_env is the iname, relative to the env_dir (data_dir is already in iname as prefix).
// The checkpointed version (checkpoint_lsn) of the dictionary must be no later than max_acceptable_lsn.
// Requires: The multi-operation client lock must be held to prevent a checkpoint from occurring.
static int
ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, DICTIONARY_ID use_dictionary_id, LSN max_acceptable_lsn) {
    int r;
    bool txn_created = false;
    char *fname_in_cwd = NULL;
    CACHEFILE cf = NULL;
    FT ft = NULL;
    bool did_create = false;
    bool was_already_open = false;

    toku_ft_open_close_lock();

    if (ft_h->did_set_flags) {
        r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags);
        if (r!=0) { goto exit; }
    }

    assert(is_create || !only_create);
    FILENUM reserved_filenum;
    reserved_filenum = use_filenum;
    fname_in_cwd = toku_cachetable_get_fname_in_cwd(cachetable, fname_in_env);
    {
        int fd = -1;
        r = ft_open_file(fname_in_cwd, &fd);
        if (reserved_filenum.fileid == FILENUM_NONE.fileid) {
            reserved_filenum = toku_cachetable_reserve_filenum(cachetable);
        }
        if (r==ENOENT && is_create) {
            did_create = true;
            if (txn) {
                BYTESTRING bs = { .len=(uint32_t) strlen(fname_in_env), .data = (char*)fname_in_env };
                toku_logger_save_rollback_fcreate(txn, reserved_filenum, &bs); // bs is a copy of the fname relative to the environment
            }
            txn_created = (bool)(txn!=NULL);
            toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, file_mode, ft_h->options.flags, ft_h->options.nodesize, ft_h->options.basementnodesize, ft_h->options.compression_method);
            r = ft_create_file(ft_h, fname_in_cwd, &fd);
            if (r) { goto exit; }
        }
        if (r) { goto exit; }
        r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum, &was_already_open);
        if (r) { goto exit; }
    }
    assert(ft_h->options.nodesize>0);
    if (is_create) {
        r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft);
        if (r==TOKUDB_DICTIONARY_NO_HEADER) {
            toku_ft_create(&ft, &ft_h->options, cf, txn);
        }
        else if (r!=0) {
            goto exit;
        }
        else if (only_create) {
            assert_zero(r);
            r = EEXIST;
            goto exit;
        }
        // if we get here, then is_create was true but only_create was false,
        // so it is ok that toku_read_ft_and_store_in_cachefile found and
        // read an existing header
    } else {
        r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft);
        if (r) { goto exit; }
    }
    if (!ft_h->did_set_flags) {
        r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags);
        if (r) { goto exit; }
    } else if (ft_h->options.flags != ft->h->flags) { /* if flags have been set then flags must match */
        r = EINVAL;
        goto exit;
    }

    // Ensure that the memcmp magic bits are consistent, if set.
    if (ft->cmp.get_memcmp_magic() != toku::comparator::MEMCMP_MAGIC_NONE &&
        ft_h->options.memcmp_magic != toku::comparator::MEMCMP_MAGIC_NONE &&
        ft_h->options.memcmp_magic != ft->cmp.get_memcmp_magic()) {
        r = EINVAL;
        goto exit;
    }
    toku_ft_handle_inherit_options(ft_h, ft);

    if (!was_already_open) {
        if (!did_create) { //Only log the fopen that OPENs the file.  If it was already open, don't log.
            toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(cf), ft_h->options.flags);
        }
    }
    int use_reserved_dict_id;
    use_reserved_dict_id = use_dictionary_id.dictid != DICTIONARY_ID_NONE.dictid;
    if (!was_already_open) {
        DICTIONARY_ID dict_id;
        if (use_reserved_dict_id) {
            dict_id = use_dictionary_id;
        }
        else {
            dict_id = next_dict_id();
        }
        ft->dict_id = dict_id;
    }
    else {
        // dict_id is already in header
        if (use_reserved_dict_id) {
            assert(ft->dict_id.dictid == use_dictionary_id.dictid);
        }
    }
    assert(ft);
    assert(ft->dict_id.dictid != DICTIONARY_ID_NONE.dictid);
    assert(ft->dict_id.dictid < dict_id_serial);

    // Important note: after this point, where we associate the header
    // with the ft_handle, the function is not allowed to fail.
    // The failure-handling code (located below "exit") depends on this.
    toku_ft_note_ft_handle_open(ft, ft_h);
    if (txn_created) {
        assert(txn);
        toku_txn_maybe_note_ft(txn, ft);
    }

    // Opening an ft may restore the file to a previous checkpoint.
    // Truncate if necessary.
    {
        int fd = toku_cachefile_get_fd (ft->cf);
        ft->blocktable.maybe_truncate_file_on_open(fd);
    }

    r = 0;
exit:
    if (fname_in_cwd) {
        toku_free(fname_in_cwd);
    }
    if (r != 0 && cf) {
        if (ft) {
            // We only call toku_ft_note_ft_handle_open
            // when the function succeeds, so if we are here,
            // then we have a reference to the header
            // but we have not linked it to this ft_handle. So
            // we can simply try to remove the header;
            // there is no need to unlink this ft_handle from the header.
            toku_ft_grab_reflock(ft);
            bool needed = toku_ft_needed_unlocked(ft);
            toku_ft_release_reflock(ft);
            if (!needed) {
                // close immediately.
                toku_ft_evict_from_memory(ft, false, ZERO_LSN);
            }
        }
        else {
            toku_cachefile_close(&cf, false, ZERO_LSN);
        }
    }
    toku_ft_open_close_unlock();
    return r;
}

// Open an ft for the purpose of recovery, which requires that the ft be open to a pre-determined FILENUM
// and may require a specific checkpointed version of the file.
// (dict_id is assigned by the ft_handle_open() function.)
int
toku_ft_handle_open_recovery(FT_HANDLE t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, LSN max_acceptable_lsn) {
    int r;
    assert(use_filenum.fileid != FILENUM_NONE.fileid);
    r = ft_handle_open(t, fname_in_env, is_create, only_create, cachetable,
                       txn, use_filenum, DICTIONARY_ID_NONE, max_acceptable_lsn);
    return r;
}

// Open an ft in normal use. The FILENUM and dict_id are assigned by the ft_handle_open() function.
// Requires: The multi-operation client lock must be held to prevent a checkpoint from occurring.
int
toku_ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn) {
    int r;
    r = ft_handle_open(t, fname_in_env, is_create, only_create, cachetable, txn, FILENUM_NONE, DICTIONARY_ID_NONE, MAX_LSN);
    return r;
}

// clone an ft handle. the cloned handle has a new dict_id but refers to the same fractal tree
int
toku_ft_handle_clone(FT_HANDLE *cloned_ft_handle, FT_HANDLE ft_handle, TOKUTXN txn) {
    FT_HANDLE result_ft_handle;
    toku_ft_handle_create(&result_ft_handle);

    // we're cloning, so the handle better have an open ft and open cf
    invariant(ft_handle->ft);
    invariant(ft_handle->ft->cf);

    // inherit the options of the ft whose handle is being cloned.
    toku_ft_handle_inherit_options(result_ft_handle, ft_handle->ft);

    // we can clone the handle by creating a new handle with the same fname
    CACHEFILE cf = ft_handle->ft->cf;
    CACHETABLE ct = toku_cachefile_get_cachetable(cf);
    const char *fname_in_env = toku_cachefile_fname_in_env(cf);
    int r = toku_ft_handle_open(result_ft_handle, fname_in_env, false, false, ct, txn);
    if (r != 0) {
        toku_ft_handle_close(result_ft_handle);
        result_ft_handle = NULL;
    }
    *cloned_ft_handle = result_ft_handle;
    return r;
}

// Open an ft in normal use. The FILENUM and dict_id are assigned by the ft_handle_open() function.
int
toku_ft_handle_open_with_dict_id(
    FT_HANDLE t,
    const char *fname_in_env,
    int is_create,
    int only_create,
    CACHETABLE cachetable,
    TOKUTXN txn,
    DICTIONARY_ID use_dictionary_id
    )
{
    int r;
    r = ft_handle_open(
        t,
        fname_in_env,
        is_create,
        only_create,
        cachetable,
        txn,
        FILENUM_NONE,
        use_dictionary_id,
        MAX_LSN
        );
    return r;
}

DICTIONARY_ID
toku_ft_get_dictionary_id(FT_HANDLE ft_handle) {
    FT ft = ft_handle->ft;
    return ft->dict_id;
}

void toku_ft_set_flags(FT_HANDLE ft_handle, unsigned int flags) {
    ft_handle->did_set_flags = true;
    ft_handle->options.flags = flags;
}

void toku_ft_get_flags(FT_HANDLE ft_handle, unsigned int *flags) {
    *flags = ft_handle->options.flags;
}

void toku_ft_get_maximum_advised_key_value_lengths (unsigned int *max_key_len, unsigned int *max_val_len)
// return the maximum advisable key value lengths. The ft doesn't enforce these.
{
    *max_key_len = 32*1024;
    *max_val_len = 32*1024*1024;
}


void toku_ft_handle_set_nodesize(FT_HANDLE ft_handle, unsigned int nodesize) {
    if (ft_handle->ft) {
        toku_ft_set_nodesize(ft_handle->ft, nodesize);
    }
    else {
        ft_handle->options.nodesize = nodesize;
    }
}

void toku_ft_handle_get_nodesize(FT_HANDLE ft_handle, unsigned int *nodesize) {
    if (ft_handle->ft) {
        toku_ft_get_nodesize(ft_handle->ft, nodesize);
    }
    else {
        *nodesize = ft_handle->options.nodesize;
    }
}

void toku_ft_handle_set_basementnodesize(FT_HANDLE ft_handle, unsigned int basementnodesize) {
    if (ft_handle->ft) {
        toku_ft_set_basementnodesize(ft_handle->ft, basementnodesize);
    }
    else {
        ft_handle->options.basementnodesize = basementnodesize;
    }
}

void toku_ft_handle_get_basementnodesize(FT_HANDLE ft_handle, unsigned int *basementnodesize) {
    if (ft_handle->ft) {
        toku_ft_get_basementnodesize(ft_handle->ft, basementnodesize);
    }
    else {
        *basementnodesize = ft_handle->options.basementnodesize;
    }
}

void toku_ft_set_bt_compare(FT_HANDLE ft_handle, int (*bt_compare)(DB*, const DBT*, const DBT*)) {
    ft_handle->options.compare_fun = bt_compare;
}

void toku_ft_set_redirect_callback(FT_HANDLE ft_handle, on_redirect_callback redir_cb, void* extra) {
    ft_handle->redirect_callback = redir_cb;
    ft_handle->redirect_callback_extra = extra;
}

void toku_ft_set_update(FT_HANDLE ft_handle, ft_update_func update_fun) {
    ft_handle->options.update_fun = update_fun;
}

const toku::comparator &toku_ft_get_comparator(FT_HANDLE ft_handle) {
    invariant_notnull(ft_handle->ft);
    return ft_handle->ft->cmp;
}

static void
ft_remove_handle_ref_callback(FT UU(ft), void *extra) {
    FT_HANDLE CAST_FROM_VOIDP(handle, extra);
    toku_list_remove(&handle->live_ft_handle_link);
}

static void ft_handle_close(FT_HANDLE ft_handle, bool oplsn_valid, LSN oplsn) {
    FT ft = ft_handle->ft;
    // There are error paths in ft_handle_open that end with ft_handle->ft == nullptr.
    if (ft != nullptr) {
        toku_ft_remove_reference(ft, oplsn_valid, oplsn, ft_remove_handle_ref_callback, ft_handle);
    }
    toku_free(ft_handle);
}

// close an ft handle during normal operation. the underlying ft may or may not close,
// depending on whether there are still references. an lsn for this close will come from the logger.
void toku_ft_handle_close(FT_HANDLE ft_handle) {
    ft_handle_close(ft_handle, false, ZERO_LSN);
}

// close an ft handle during recovery. the underlying ft must close, and will use the given lsn.
void toku_ft_handle_close_recovery(FT_HANDLE ft_handle, LSN oplsn) {
    // the ft must exist if closing during recovery. error paths during
    // open for recovery should close handles using toku_ft_handle_close()
    invariant_notnull(ft_handle->ft);
    ft_handle_close(ft_handle, true, oplsn);
}

// TODO: remove this, callers should instead just use toku_ft_handle_close()
int toku_close_ft_handle_nolsn(FT_HANDLE ft_handle, char **UU(error_string)) {
    toku_ft_handle_close(ft_handle);
    return 0;
}

void toku_ft_handle_create(FT_HANDLE *ft_handle_ptr) {
    FT_HANDLE XMALLOC(ft_handle);
    memset(ft_handle, 0, sizeof *ft_handle);
    toku_list_init(&ft_handle->live_ft_handle_link);
    ft_handle->options.flags = 0;
    ft_handle->did_set_flags = false;
    ft_handle->options.nodesize = FT_DEFAULT_NODE_SIZE;
    ft_handle->options.basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
    ft_handle->options.compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
    ft_handle->options.fanout = FT_DEFAULT_FANOUT;
    ft_handle->options.compare_fun = toku_builtin_compare_fun;
    ft_handle->options.update_fun = NULL;
    *ft_handle_ptr = ft_handle;
}
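
// Illustrative sketch (not part of the build): the intended handle lifecycle
// is create -> set options -> open -> use -> close. Assuming a cachetable
// `ct`, a transaction `txn`, and a hypothetical file name:
//
//     FT_HANDLE h;
//     toku_ft_handle_create(&h);
//     toku_ft_handle_set_nodesize(h, 4 << 20);    // options must precede open
//     toku_ft_set_bt_compare(h, toku_builtin_compare_fun);
//     int r = toku_ft_handle_open(h, "example.ft", 1 /*is_create*/,
//                                 0 /*only_create*/, ct, txn);
//     if (r != 0) {
//         toku_ft_handle_close(h);    // safe: ft may be nullptr on error paths
//     }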

/******************************* search ***************************************/

// Return true if this key is within the search bound. If there is no search bound then the tree search continues.
static bool search_continue(ft_search *search, void *key, uint32_t key_len) {
    bool result = true;
    if (search->direction == FT_SEARCH_LEFT && search->k_bound) {
        FT_HANDLE CAST_FROM_VOIDP(ft_handle, search->context);
        DBT this_key = { .data = key, .size = key_len };
        // search continues if this key <= key bound
        result = (ft_handle->ft->cmp(&this_key, search->k_bound) <= 0);
    }
    return result;
}

static int heaviside_from_search_t(const DBT &kdbt, ft_search &search) {
    int cmp = search.compare(search,
                             search.k ? &kdbt : 0);
    // The search->compare function returns only 0 or 1
    switch (search.direction) {
    case FT_SEARCH_LEFT:   return cmp==0 ? -1 : +1;
    case FT_SEARCH_RIGHT:  return cmp==0 ? +1 : -1; // Because the comparison runs backwards for right searches.
    }
    abort(); return 0;
}
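
// Worked example (illustrative): for FT_SEARCH_LEFT with target t, the
// search's compare function says 0 for keys before t and 1 for keys at or
// after t, so heaviside_from_search_t maps them to -1 and +1 respectively;
// find() with direction +1 then lands on the leftmost key mapped to +1:
//
//     // keys in basement:   10   20   30   40     (target t = 25)
//     // search.compare:      0    0    1    1
//     // heaviside value:    -1   -1   +1   +1
//     // find(direction=+1) returns the entry for 30, the first key >= 25.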

// This is the bottom layer of the search functions.
static int
ft_search_basement_node(
    BASEMENTNODE bn,
    ft_search *search,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    bool *doprefetch,
    FT_CURSOR ftcursor,
    bool can_bulk_fetch
    )
{
    // Now we have to convert from ft_search to the heaviside function with a direction. What a pain...

    int direction;
    switch (search->direction) {
    case FT_SEARCH_LEFT:   direction = +1; goto ok;
    case FT_SEARCH_RIGHT:  direction = -1; goto ok;
    }
    return EINVAL;  // This return and the goto are a hack to get both compile-time and run-time checking on enum
ok: ;
    uint32_t idx = 0;
    LEAFENTRY le;
    uint32_t keylen;
    void *key;
    int r = bn->data_buffer.find<decltype(*search), heaviside_from_search_t>(
        *search,
        direction,
        &le,
        &key,
        &keylen,
        &idx
        );
    if (r!=0) return r;

    if (toku_ft_cursor_is_leaf_mode(ftcursor))
        goto got_a_good_value;    // leaf mode cursors see all leaf entries
    if (le_val_is_del(le, ftcursor->read_type, ftcursor->ttxn)) {
        // Provisionally deleted stuff is gone.
        // So we need to scan in the direction to see if we can find something.
        // Every 64 deleted leaf entries, check whether the leaf's key is still within the search bounds.
        for (uint64_t n_deleted = 1; ; n_deleted++) {
            switch (search->direction) {
            case FT_SEARCH_LEFT:
                idx++;
                if (idx >= bn->data_buffer.num_klpairs() || ((n_deleted % 64) == 0 && !search_continue(search, key, keylen))) {
                    FT_STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted);
                    if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted)) {
                        return TOKUDB_INTERRUPTED;
                    }
                    return DB_NOTFOUND;
                }
                break;
            case FT_SEARCH_RIGHT:
                if (idx == 0) {
                    FT_STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted);
                    if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted)) {
                        return TOKUDB_INTERRUPTED;
                    }
                    return DB_NOTFOUND;
                }
                idx--;
                break;
            default:
                abort();
            }
            r = bn->data_buffer.fetch_klpair(idx, &le, &keylen, &key);
            assert_zero(r); // we just validated the index
            if (!le_val_is_del(le, ftcursor->read_type, ftcursor->ttxn)) {
                FT_STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted);
                if (ftcursor->interrupt_cb)
                    ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted);
                goto got_a_good_value;
            }
        }
    }
got_a_good_value:
    {
        uint32_t vallen;
        void *val;

        le_extract_val(le, toku_ft_cursor_is_leaf_mode(ftcursor),
                       ftcursor->read_type, ftcursor->ttxn,
                       &vallen, &val);
        r = toku_ft_cursor_check_restricted_range(ftcursor, key, keylen);
        if (r == 0) {
            r = getf(keylen, key, vallen, val, getf_v, false);
        }
        if (r == 0 || r == TOKUDB_CURSOR_CONTINUE) {
            //
            // IMPORTANT: bulk fetch CANNOT go past the current basement node,
            // because there is no guarantee that messages have been applied
            // to other basement nodes, as part of #5770
            //
            if (r == TOKUDB_CURSOR_CONTINUE && can_bulk_fetch) {
                r = toku_ft_cursor_shortcut(ftcursor, direction, idx, &bn->data_buffer,
                                            getf, getf_v, &keylen, &key, &vallen, &val);
            }

            toku_destroy_dbt(&ftcursor->key);
            toku_destroy_dbt(&ftcursor->val);
            if (!ftcursor->is_temporary) {
                toku_memdup_dbt(&ftcursor->key, key, keylen);
                toku_memdup_dbt(&ftcursor->val, val, vallen);
            }
            // The search was successful.  Prefetching can continue.
            *doprefetch = true;
        }
    }
    if (r == TOKUDB_CURSOR_CONTINUE) r = 0;
    return r;
}

static int
ft_search_node (
    FT_HANDLE ft_handle,
    FTNODE node,
    ft_search *search,
    int child_to_search,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    bool *doprefetch,
    FT_CURSOR ftcursor,
    UNLOCKERS unlockers,
    ANCESTORS,
    const pivot_bounds &bounds,
    bool can_bulk_fetch
    );

static int
ftnode_fetch_callback_and_free_bfe(CACHEFILE cf, PAIR p, int fd, BLOCKNUM blocknum, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int *dirtyp, void *extraargs)
{
    int r = toku_ftnode_fetch_callback(cf, p, fd, blocknum, fullhash, ftnode_pv, disk_data, sizep, dirtyp, extraargs);
    ftnode_fetch_extra *CAST_FROM_VOIDP(bfe, extraargs);
    bfe->destroy();
    toku_free(bfe);
    return r;
}

static int
ftnode_pf_callback_and_free_bfe(void *ftnode_pv, void* disk_data, void *read_extraargs, int fd, PAIR_ATTR *sizep)
{
    int r = toku_ftnode_pf_callback(ftnode_pv, disk_data, read_extraargs, fd, sizep);
    ftnode_fetch_extra *CAST_FROM_VOIDP(bfe, read_extraargs);
    bfe->destroy();
    toku_free(bfe);
    return r;
}

CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT ft) {
    CACHETABLE_WRITE_CALLBACK wc;
    wc.flush_callback = toku_ftnode_flush_callback;
    wc.pe_est_callback = toku_ftnode_pe_est_callback;
    wc.pe_callback = toku_ftnode_pe_callback;
    wc.cleaner_callback = toku_ftnode_cleaner_callback;
    wc.clone_callback = toku_ftnode_clone_callback;
    wc.checkpoint_complete_callback = toku_ftnode_checkpoint_complete_callback;
    wc.write_extraargs = ft;
    return wc;
}

static void
ft_node_maybe_prefetch(FT_HANDLE ft_handle, FTNODE node, int childnum, FT_CURSOR ftcursor, bool *doprefetch) {
    // the number of nodes to prefetch
    const int num_nodes_to_prefetch = 1;

    // if we want to prefetch in the tree
    // then prefetch the next children if there are any
    if (*doprefetch && toku_ft_cursor_prefetching(ftcursor) && !ftcursor->disable_prefetching) {
        int rc = ft_cursor_rightmost_child_wanted(ftcursor, ft_handle, node);
        for (int i = childnum + 1; (i <= childnum + num_nodes_to_prefetch) && (i <= rc); i++) {
            BLOCKNUM nextchildblocknum = BP_BLOCKNUM(node, i);
            uint32_t nextfullhash = compute_child_fullhash(ft_handle->ft->cf, node, i);
            ftnode_fetch_extra *XCALLOC(bfe);
            bfe->create_for_prefetch(ft_handle->ft, ftcursor);
            bool doing_prefetch = false;
            toku_cachefile_prefetch(
                ft_handle->ft->cf,
                nextchildblocknum,
                nextfullhash,
                get_write_callbacks_for_node(ft_handle->ft),
                ftnode_fetch_callback_and_free_bfe,
                toku_ftnode_pf_req_callback,
                ftnode_pf_callback_and_free_bfe,
                bfe,
                &doing_prefetch
                );
            if (!doing_prefetch) {
                bfe->destroy();
                toku_free(bfe);
            }
            *doprefetch = false;
        }
    }
}

struct unlock_ftnode_extra {
    FT_HANDLE ft_handle;
    FTNODE node;
    bool msgs_applied;
};

// When this is called, the cachetable lock is held
static void
unlock_ftnode_fun (void *v) {
    struct unlock_ftnode_extra *x = NULL;
    CAST_FROM_VOIDP(x, v);
    FT_HANDLE ft_handle = x->ft_handle;
    FTNODE node = x->node;
    // CT lock is held
    int r = toku_cachetable_unpin_ct_prelocked_no_flush(
        ft_handle->ft->cf,
        node->ct_pair,
        (enum cachetable_dirty) node->dirty,
        x->msgs_applied ? make_ftnode_pair_attr(node) : make_invalid_pair_attr()
        );
    assert_zero(r);
}

/* search in a node's child */
static int
ft_search_child(FT_HANDLE ft_handle, FTNODE node, int childnum, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, bool *doprefetch, FT_CURSOR ftcursor, UNLOCKERS unlockers,
                ANCESTORS ancestors, const pivot_bounds &bounds, bool can_bulk_fetch)
// Effect: Search in a node's child.  Searches are read-only now (at least as far as the hardcopy is concerned).
{
    struct ancestors next_ancestors = {node, childnum, ancestors};

    BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum);
    uint32_t fullhash = compute_child_fullhash(ft_handle->ft->cf, node, childnum);
    FTNODE childnode = nullptr;

    // If the current node's height is greater than 1, then its child is an internal node.
    // Therefore, to warm the cache better (#5798), we want to read all the partitions off disk in one shot.
    bool read_all_partitions = node->height > 1;
    ftnode_fetch_extra bfe;
    bfe.create_for_subset_read(
        ft_handle->ft,
        search,
        &ftcursor->range_lock_left_key,
        &ftcursor->range_lock_right_key,
        ftcursor->left_is_neg_infty,
        ftcursor->right_is_pos_infty,
        ftcursor->disable_prefetching,
        read_all_partitions
        );
    bool msgs_applied = false;
    {
        int rr = toku_pin_ftnode_for_query(ft_handle, childblocknum, fullhash,
                                           unlockers,
                                           &next_ancestors, bounds,
                                           &bfe,
                                           true,
                                           &childnode,
                                           &msgs_applied);
        if (rr==TOKUDB_TRY_AGAIN) {
            return rr;
        }
        invariant_zero(rr);
    }

    struct unlock_ftnode_extra unlock_extra = { ft_handle, childnode, msgs_applied };
    struct unlockers next_unlockers = { true, unlock_ftnode_fun, (void *) &unlock_extra, unlockers };
    int r = ft_search_node(ft_handle, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, ftcursor, &next_unlockers, &next_ancestors, bounds, can_bulk_fetch);
    if (r!=TOKUDB_TRY_AGAIN) {
        // maybe prefetch the next child
        if (r == 0 && node->height == 1) {
            ft_node_maybe_prefetch(ft_handle, node, childnum, ftcursor, doprefetch);
        }

        assert(next_unlockers.locked);
        if (msgs_applied) {
            toku_unpin_ftnode(ft_handle->ft, childnode);
        }
        else {
            toku_unpin_ftnode_read_only(ft_handle->ft, childnode);
        }
    } else {
        // try again.

        // there are two cases where we get TOKUDB_TRY_AGAIN
        //  case 1 is when some later call to toku_pin_ftnode returned
        //  that value and unpinned all the nodes anyway. case 2
        //  is when ft_search_node had to stop its search because
        //  some piece of a node that it needed was not in memory. In this case,
        //  the node was not unpinned, so we unpin it here
        if (next_unlockers.locked) {
            if (msgs_applied) {
                toku_unpin_ftnode(ft_handle->ft, childnode);
            }
            else {
                toku_unpin_ftnode_read_only(ft_handle->ft, childnode);
            }
        }
    }

    return r;
}

static inline int
search_which_child_cmp_with_bound(const toku::comparator &cmp, FTNODE node, int childnum,
                                  ft_search *search, DBT *dbt) {
    return cmp(toku_copyref_dbt(dbt, node->pivotkeys.get_pivot(childnum)), &search->pivot_bound);
}

int
toku_ft_search_which_child(const toku::comparator &cmp, FTNODE node, ft_search *search) {
    if (node->n_children <= 1) return 0;

    DBT pivotkey;
    toku_init_dbt(&pivotkey);
    int lo = 0;
    int hi = node->n_children - 1;
    int mi;
    while (lo < hi) {
        mi = (lo + hi) / 2;
        node->pivotkeys.fill_pivot(mi, &pivotkey);
        // search->compare is really strange, and only works well with a
        // linear search; it makes binary search a pita.
        //
        // if you are searching left to right, it returns
        //   "0" for pivots that are < the target, and
        //   "1" for pivots that are >= the target
        // if you are searching right to left, it's the opposite.
        //
        // so if we're searching from the left and search->compare says
        // "1", we want to go left from here, if it says "0" we want to go
        // right.  searching from the right does the opposite.
        bool c = search->compare(*search, &pivotkey);
        if (((search->direction == FT_SEARCH_LEFT) && c) ||
            ((search->direction == FT_SEARCH_RIGHT) && !c)) {
            hi = mi;
        } else {
            assert(((search->direction == FT_SEARCH_LEFT) && !c) ||
                   ((search->direction == FT_SEARCH_RIGHT) && c));
            lo = mi + 1;
        }
    }
    // ready to return something, if the pivot is bounded, we have to move
    // over a bit to get away from what we've already searched
    if (search->pivot_bound.data != nullptr) {
        if (search->direction == FT_SEARCH_LEFT) {
            while (lo < node->n_children - 1 &&
                   search_which_child_cmp_with_bound(cmp, node, lo, search, &pivotkey) <= 0) {
                // searching left to right, if the comparison says the
                // current pivot (lo) is left of or equal to our bound,
                // don't search that child again
                lo++;
            }
        } else {
            while (lo > 0 &&
                   search_which_child_cmp_with_bound(cmp, node, lo - 1, search, &pivotkey) >= 0) {
                // searching right to left, same argument as just above
                // (but we had to pass lo - 1 because the pivot between lo
                // and the thing just less than it is at that position in
                // the pivot keys array)
                lo--;
            }
        }
    }
    return lo;
}
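
// Worked example (illustrative): a node with pivots [10, 20, 30] has four
// children. For an FT_SEARCH_LEFT search targeting 25, search->compare
// yields 0 for pivots 10 and 20 (before the target) and 1 for pivot 30
// (at or after it), so the binary search settles on child 2, which covers
// the interval (20, 30]. If pivot_bound had been set to 20 by an earlier
// retry, the post-loop fixup would advance lo past any child whose pivot
// compares <= 20, so the same subtree is not searched twice.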

static void
maybe_search_save_bound(
    FTNODE node,
    int child_searched,
    ft_search *search)
{
    int p = (search->direction == FT_SEARCH_LEFT) ? child_searched : child_searched - 1;
    if (p >= 0 && p < node->n_children-1) {
        toku_destroy_dbt(&search->pivot_bound);
        toku_clone_dbt(&search->pivot_bound, node->pivotkeys.get_pivot(p));
    }
}

// Returns true if there are still children left to search in this node within the search bound (if any).
static bool search_try_again(FTNODE node, int child_to_search, ft_search *search) {
    bool try_again = false;
    if (search->direction == FT_SEARCH_LEFT) {
        if (child_to_search < node->n_children-1) {
            try_again = true;
            // if there is a search bound, only continue if the bound lies beyond the pivot bound
            if (search->k_bound) {
                FT_HANDLE CAST_FROM_VOIDP(ft_handle, search->context);
                try_again = (ft_handle->ft->cmp(search->k_bound, &search->pivot_bound) > 0);
            }
        }
    } else if (search->direction == FT_SEARCH_RIGHT) {
        if (child_to_search > 0)
            try_again = true;
    }
    return try_again;
}

static int
ft_search_node(
    FT_HANDLE ft_handle,
    FTNODE node,
    ft_search *search,
    int child_to_search,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    bool *doprefetch,
    FT_CURSOR ftcursor,
    UNLOCKERS unlockers,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    bool can_bulk_fetch
    )
{
    int r = 0;
    // assert that we got a valid child_to_search
    invariant(child_to_search >= 0);
    invariant(child_to_search < node->n_children);
    //
    // At this point, we must have the necessary partition available to continue the search
    //
    assert(BP_STATE(node,child_to_search) == PT_AVAIL);
    const pivot_bounds next_bounds = bounds.next_bounds(node, child_to_search);
    if (node->height > 0) {
        r = ft_search_child(
            ft_handle,
            node,
            child_to_search,
            search,
            getf,
            getf_v,
            doprefetch,
            ftcursor,
            unlockers,
            ancestors,
            next_bounds,
            can_bulk_fetch
            );
    }
    else {
        r = ft_search_basement_node(
            BLB(node, child_to_search),
            search,
            getf,
            getf_v,
            doprefetch,
            ftcursor,
            can_bulk_fetch
            );
    }
    if (r == 0) {
        return r; //Success
    }

    if (r != DB_NOTFOUND) {
        return r; //Error (or message to quit early, such as TOKUDB_FOUND_BUT_REJECTED or TOKUDB_TRY_AGAIN)
    }
    // not really necessary, just put this here so that reading the
    // code becomes simpler. The point is at this point in the code,
    // we know that we got DB_NOTFOUND and we have to continue
    assert(r == DB_NOTFOUND);
    // we have a new pivotkey
    if (node->height == 0) {
        // when we run off the end of a basement, try to lock the range up to the pivot. solves #3529
        const DBT *pivot = search->direction == FT_SEARCH_LEFT ? next_bounds.ubi() : // left -> right
                                                                 next_bounds.lbe(); // right -> left
        if (pivot != nullptr) {
            int rr = getf(pivot->size, pivot->data, 0, nullptr, getf_v, true);
            if (rr != 0) {
                return rr; // lock was not granted
            }
        }
    }

    // If we got a DB_NOTFOUND then we have to search the next record.  Possibly everything present is not visible.
    // This way of doing DB_NOTFOUND is a kludge, and ought to be simplified.  Something like this is needed for DB_NEXT, but
    // for point queries, it's overkill.  If we got a DB_NOTFOUND on a point query then we should just stop looking.
    // When releasing locks on I/O we must not search the same subtree again, or we won't be guaranteed to make forward progress.
    // If we got a DB_NOTFOUND, then the pivot is too small if searching from left to right (too large if searching from right to left).
    // So save the pivot key in the search object.
    maybe_search_save_bound(node, child_to_search, search);

    // as part of #5770, if we can continue searching,
    // we MUST return TOKUDB_TRY_AGAIN,
    // because there is no guarantee that messages have been applied
    // on any other path.
    if (search_try_again(node, child_to_search, search)) {
        r = TOKUDB_TRY_AGAIN;
    }

    return r;
}

int toku_ft_search(FT_HANDLE ft_handle, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, FT_CURSOR ftcursor, bool can_bulk_fetch)
// Effect: Perform a search.  Associate cursor with a leaf if possible.
// All searches are performed through this function.
{
    int r;
    uint trycount = 0;        // How many tries did it take to get the result?
    FT ft = ft_handle->ft;

    toku::context search_ctx(CTX_SEARCH);

try_again:

    trycount++;

    //
    // Here is how searches work
    // At a high level, we descend down the tree, using the search parameter
    // to guide us towards where to look. But the search parameter is not
    // used here to determine which child of a node to read (regardless
    // of whether that child is another node or a basement node)
    // The search parameter is used while we are pinning the node into
    // memory, because that is when the system needs to ensure that
    // the appropriate partition of the child we are using is in memory.
    // So, here are the steps for a search (and this applies to this function
    //  as well as ft_search_child):
    //  - Take the search parameter, and create a ftnode_fetch_extra, that will be used by toku_pin_ftnode
    //  - Call toku_pin_ftnode with the bfe as the extra for the fetch callback (in case the node is not at all in memory)
    //    and the partial fetch callback (in case the node is perhaps partially in memory) to fetch the node
    //  - This eventually calls either toku_ftnode_fetch_callback or toku_ftnode_pf_req_callback depending on whether the node is in
    //    memory at all or not.
    //  - Within these functions, the "ft_search search" parameter is used to evaluate which child the search is interested in.
    //    If the node is not in memory at all, toku_ftnode_fetch_callback will read the node and decompress only the partition for the
    //    relevant child, be it a message buffer or basement node. If the node is in memory, then toku_ftnode_pf_req_callback
    //    will tell the cachetable that a partial fetch is required if and only if the relevant child is not in memory. If the relevant child
    //    is not in memory, then toku_ftnode_pf_callback is called to fetch the partition.
    //  - These functions set bfe->child_to_read so that the search code does not need to reevaluate it.
    //  - Just to reiterate, all of the last item happens within toku_ftnode_pin(_holding_lock)
    //  - At this point, toku_ftnode_pin_holding_lock has returned, with bfe.child_to_read set,
    //  - ft_search_node is called, assuming that the node and its relevant partition are in memory.
    //
    ftnode_fetch_extra bfe;
    bfe.create_for_subset_read(
        ft,
        search,
        &ftcursor->range_lock_left_key,
        &ftcursor->range_lock_right_key,
        ftcursor->left_is_neg_infty,
        ftcursor->right_is_pos_infty,
        ftcursor->disable_prefetching,
        true // We may as well always read the whole root into memory, if it's a leaf node it's a tiny tree anyway.
        );
    FTNODE node = NULL;
    {
        uint32_t fullhash;
        CACHEKEY root_key;
        toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
        toku_pin_ftnode(
            ft,
            root_key,
            fullhash,
            &bfe,
            PL_READ, // may_modify_node set to false, because root cannot change during search
            &node,
            true
            );
    }

    uint tree_height = node->height + 1;  // How high is the tree?  This is the height of the root node plus one (leaf is at height 0).


    struct unlock_ftnode_extra unlock_extra = {ft_handle,node,false};
    struct unlockers unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, (UNLOCKERS)NULL};

    {
        bool doprefetch = false;
        //static int counter = 0;        counter++;
        r = ft_search_node(ft_handle, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, ftcursor, &unlockers, (ANCESTORS)NULL, pivot_bounds::infinite_bounds(), can_bulk_fetch);
        if (r==TOKUDB_TRY_AGAIN) {
            // there are two cases where we get TOKUDB_TRY_AGAIN
            //  case 1 is when some later call to toku_pin_ftnode returned
            //  that value and unpinned all the nodes anyway. case 2
            //  is when ft_search_node had to stop its search because
            //  some piece of a node that it needed was not in memory.
            //  In this case, the node was not unpinned, so we unpin it here
            if (unlockers.locked) {
                toku_unpin_ftnode_read_only(ft_handle->ft, node);
            }
            goto try_again;
        } else {
            assert(unlockers.locked);
        }
    }

    assert(unlockers.locked);
    toku_unpin_ftnode_read_only(ft_handle->ft, node);


    //Heaviside function (+direction) queries define only a lower or upper
    //bound.  Some queries require both an upper and lower bound.
    //They do this by wrapping the FT_GET_CALLBACK_FUNCTION with another
    //test that checks for the other bound.  If the other bound fails,
    //it returns TOKUDB_FOUND_BUT_REJECTED which means not found, but
    //stop searching immediately, as opposed to DB_NOTFOUND
    //which can mean not found, but keep looking in another leaf.
    if (r==TOKUDB_FOUND_BUT_REJECTED) r = DB_NOTFOUND;
    else if (r==DB_NOTFOUND) {
        //We truly did not find an answer to the query.
        //Therefore, the FT_GET_CALLBACK_FUNCTION has NOT been called.
        //The contract specifies that the callback function must be called
        //for 'r= (0|DB_NOTFOUND|TOKUDB_FOUND_BUT_REJECTED)'
        //TODO: #1378 This is not the ultimate location of this call to the
        //callback.  It is surely wrong for node-level locking, and probably
        //wrong for the STRADDLE callback for heaviside function(two sets of key/vals)
        int r2 = getf(0,NULL, 0,NULL, getf_v, false);
        if (r2!=0) r = r2;
    }
    {   // accounting (to detect and measure thrashing)
        uint retrycount = trycount - 1;         // how many retries were needed?
        if (retrycount) {
            FT_STATUS_INC(FT_TOTAL_RETRIES, retrycount);
        }
        if (retrycount > tree_height) {         // if at least one node was read from disk more than once
            FT_STATUS_INC(FT_SEARCH_TRIES_GT_HEIGHT, 1);
            if (retrycount > (tree_height+3))
                FT_STATUS_INC(FT_SEARCH_TRIES_GT_HEIGHTPLUS3, 1);
        }
    }
    return r;
}
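
// Illustrative sketch (not part of the build): a probe is expressed as an
// ft_search plus a get callback, mirroring the usage in
// toku_ft_get_key_after_bytes below. `example_getf` and `result` are
// hypothetical, `cursor` is an open FT_CURSOR whose range-lock fields the
// search machinery consults, and teardown of the search is elided:
//
//     static int example_getf(uint32_t keylen, const void *key,
//                             uint32_t vallen, const void *val,
//                             void *extra, bool lock_only) {
//         if (!lock_only && key != nullptr) {
//             toku_memdup_dbt((DBT *) extra, val, vallen);  // copy the value out
//         }
//         return 0;
//     }
//
//     ft_search search;
//     ft_search_init(&search, toku_ft_cursor_compare_set_range,
//                    FT_SEARCH_LEFT, &key, nullptr, ft_handle);
//     int r = toku_ft_search(ft_handle, &search, example_getf, &result,
//                            cursor, false /*can_bulk_fetch*/);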

/* ********************************* delete **************************************/
static int
getf_nothing (uint32_t UU(keylen), const void *UU(key), uint32_t UU(vallen), const void *UU(val), void *UU(pair_v), bool UU(lock_only)) {
    return 0;
}

int toku_ft_cursor_delete(FT_CURSOR cursor, int flags, TOKUTXN txn) {
    int r;

    int unchecked_flags = flags;
    bool error_if_missing = (bool) !(flags&DB_DELETE_ANY);
    unchecked_flags &= ~DB_DELETE_ANY;
    if (unchecked_flags!=0) r = EINVAL;
    else if (toku_ft_cursor_not_set(cursor)) r = EINVAL;
    else {
        r = 0;
        if (error_if_missing) {
            r = toku_ft_cursor_current(cursor, DB_CURRENT, getf_nothing, NULL);
        }
        if (r == 0) {
            toku_ft_delete(cursor->ft_handle, &cursor->key, txn);
        }
    }
    return r;
}
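
// Illustrative sketch (not part of the build): deleting the pair under an
// already-positioned cursor, erroring out if the current key is no longer
// visible (i.e. without DB_DELETE_ANY):
//
//     int r = toku_ft_cursor_delete(cursor, 0 /*flags*/, txn);
//     if (r != 0) {
//         // e.g. the row under the cursor may have disappeared first
//     }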

/* ********************* keyrange ************************ */

struct keyrange_compare_s {
    FT ft;
    const DBT *key;
};

// TODO: Remove me, I'm boring
static int keyrange_compare(DBT const &kdbt,
                            const struct keyrange_compare_s &s) {
    return s.ft->cmp(&kdbt, s.key);
}

static void keysrange_in_leaf_partition(FT_HANDLE ft_handle,
                                        FTNODE node,
                                        DBT *key_left,
                                        DBT *key_right,
                                        int left_child_number,
                                        int right_child_number,
                                        uint64_t estimated_num_rows,
                                        uint64_t *less,
                                        uint64_t *equal_left,
                                        uint64_t *middle,
                                        uint64_t *equal_right,
                                        uint64_t *greater,
                                        bool *single_basement_node)
// If the partition is in main memory then the counts are exact; otherwise they are estimated
// Treat key_left == NULL as negative infinity
// Treat key_right == NULL as positive infinity
{
    paranoid_invariant(node->height == 0);  // we are in a leaf
    paranoid_invariant(!(key_left == NULL && key_right != NULL));
    paranoid_invariant(left_child_number <= right_child_number);
    bool single_basement = left_child_number == right_child_number;
    paranoid_invariant(!single_basement ||
                       (BP_STATE(node, left_child_number) == PT_AVAIL));
    if (BP_STATE(node, left_child_number) == PT_AVAIL) {
        int r;
        // The partition is in main memory, so get an exact count.
        struct keyrange_compare_s s_left = {ft_handle->ft, key_left};
        BASEMENTNODE bn = BLB(node, left_child_number);
        uint32_t idx_left = 0;
        // if key_left is NULL then set r==-1 and idx==0.
        r = key_left
                ? bn->data_buffer.find_zero<decltype(s_left), keyrange_compare>(
                      s_left, nullptr, nullptr, nullptr, &idx_left)
                : -1;
        *less = idx_left;
        *equal_left = (r == 0) ? 1 : 0;

        uint32_t size = bn->data_buffer.num_klpairs();
        uint32_t idx_right = size;
        r = -1;
        if (single_basement && key_right) {
            struct keyrange_compare_s s_right = {ft_handle->ft, key_right};
            r = bn->data_buffer.find_zero<decltype(s_right), keyrange_compare>(
                    s_right, nullptr, nullptr, nullptr, &idx_right);
        }
        *middle = idx_right - idx_left - *equal_left;
        *equal_right = (r == 0) ? 1 : 0;
        *greater = size - idx_right - *equal_right;
    } else {
        paranoid_invariant(!single_basement);
        uint32_t idx_left = estimated_num_rows / 2;
        if (!key_left) {
            // Both nullptr, assume key_left belongs before leftmost entry,
            // key_right belongs after rightmost entry
            idx_left = 0;
            paranoid_invariant(!key_right);
        }
        // Assume idx_left and idx_right point to where key_left and key_right
        // belong (but are not there).
        *less = idx_left;
        *equal_left = 0;
        *middle = estimated_num_rows - idx_left;
        *equal_right = 0;
        *greater = 0;
    }
    *single_basement_node = single_basement;
}
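
// Worked example (illustrative): suppose an in-memory basement holds keys
// 10 20 30 40 50 and we ask about key_left = 20, key_right = 40, with both
// keys in this basement. find_zero(20) gives idx_left = 1 with an exact hit,
// so less = 1 ({10}) and equal_left = 1. find_zero(40) gives idx_right = 3,
// also exact, so equal_right = 1, middle = 3 - 1 - 1 = 1 ({30}), and
// greater = 5 - 3 - 1 = 1 ({50}).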

static int toku_ft_keysrange_internal(
    FT_HANDLE ft_handle,
    FTNODE node,
    DBT *key_left,
    DBT *key_right,
    bool may_find_right,
    uint64_t *less,
    uint64_t *equal_left,
    uint64_t *middle,
    uint64_t *equal_right,
    uint64_t *greater,
    bool *single_basement_node,
    uint64_t estimated_num_rows,
    ftnode_fetch_extra *min_bfe,  // set up to read a minimal read.
    ftnode_fetch_extra
        *match_bfe,  // set up to read a basement node iff both keys in it
    struct unlockers *unlockers,
    ANCESTORS ancestors,
    const pivot_bounds &bounds)
// Implementation note: Assign values to less, equal, and greater, and then on
// the way out (returning up the stack) we add more values in.
{
    int r = 0;
    // if KEY is NULL then use the leftmost key.
    int left_child_number =
        key_left ? toku_ftnode_which_child(node, key_left, ft_handle->ft->cmp)
                 : 0;
    int right_child_number =
        node->n_children;  // Sentinel that does not equal left_child_number.
    if (may_find_right) {
        right_child_number =
            key_right
                ? toku_ftnode_which_child(node, key_right, ft_handle->ft->cmp)
                : node->n_children - 1;
    }

    uint64_t rows_per_child = estimated_num_rows / node->n_children;
    if (node->height == 0) {
        keysrange_in_leaf_partition(ft_handle,
                                    node,
                                    key_left,
                                    key_right,
                                    left_child_number,
                                    right_child_number,
                                    rows_per_child,
                                    less,
                                    equal_left,
                                    middle,
                                    equal_right,
                                    greater,
                                    single_basement_node);

        *less += rows_per_child * left_child_number;
        if (*single_basement_node) {
            *greater +=
                rows_per_child * (node->n_children - left_child_number - 1);
        } else {
            *middle +=
                rows_per_child * (node->n_children - left_child_number - 1);
        }
    } else {
        // do the child.
        struct ancestors next_ancestors = {node, left_child_number, ancestors};
        BLOCKNUM childblocknum = BP_BLOCKNUM(node, left_child_number);
        uint32_t fullhash =
            compute_child_fullhash(ft_handle->ft->cf, node, left_child_number);
        FTNODE childnode;
        bool msgs_applied = false;
        bool child_may_find_right =
            may_find_right && left_child_number == right_child_number;
        r = toku_pin_ftnode_for_query(
            ft_handle,
            childblocknum,
            fullhash,
            unlockers,
            &next_ancestors,
            bounds,
            child_may_find_right ? match_bfe : min_bfe,
            false,
            &childnode,
            &msgs_applied);
        paranoid_invariant(!msgs_applied);
        if (r != TOKUDB_TRY_AGAIN) {
            assert_zero(r);

            struct unlock_ftnode_extra unlock_extra = {
                ft_handle, childnode, false};
            struct unlockers next_unlockers = {
                true, unlock_ftnode_fun, (void *)&unlock_extra, unlockers};
            const pivot_bounds next_bounds =
                bounds.next_bounds(node, left_child_number);

            r = toku_ft_keysrange_internal(ft_handle,
                                           childnode,
                                           key_left,
                                           key_right,
                                           child_may_find_right,
                                           less,
                                           equal_left,
                                           middle,
                                           equal_right,
                                           greater,
                                           single_basement_node,
                                           rows_per_child,
                                           min_bfe,
                                           match_bfe,
                                           &next_unlockers,
                                           &next_ancestors,
                                           next_bounds);
            if (r != TOKUDB_TRY_AGAIN) {
                assert_zero(r);

                *less += rows_per_child * left_child_number;
                if (*single_basement_node) {
                    *greater += rows_per_child *
                                (node->n_children - left_child_number - 1);
                } else {
                    *middle += rows_per_child *
                               (node->n_children - left_child_number - 1);
                }

                assert(unlockers->locked);
                toku_unpin_ftnode_read_only(ft_handle->ft, childnode);
            }
        }
    }
    return r;
}

void toku_ft_keysrange(FT_HANDLE ft_handle,
                       DBT *key_left,
                       DBT *key_right,
                       uint64_t *less_p,
                       uint64_t *equal_left_p,
                       uint64_t *middle_p,
                       uint64_t *equal_right_p,
                       uint64_t *greater_p,
                       bool *middle_3_exact_p)
// Effect: Return an estimate of the number of keys to the left, the number
// equal (to left key), number between keys, number equal to right key, and the
// number to the right of both keys.
// The values are an estimate.
// If you perform a keyrange on two keys that are in the same basement,
// equal_left, middle, and equal_right will be exact.
// 4184: What to do with a NULL key?
// key_left==NULL is treated as -infinity
// key_right==NULL is treated as +infinity
// If KEY is NULL then the system picks an arbitrary key and returns it.
// key_right can be non-null only if key_left is non-null.
{
    if (!key_left && key_right) {
        // Simplify internals by only supporting key_right != null when key_left
        // != null
        // If key_right != null and key_left == null, then swap them and fix up
        // numbers.
        uint64_t less = 0, equal_left = 0, middle = 0, equal_right = 0,
                 greater = 0;
        toku_ft_keysrange(ft_handle,
                          key_right,
                          nullptr,
                          &less,
                          &equal_left,
                          &middle,
                          &equal_right,
                          &greater,
                          middle_3_exact_p);
        *less_p = 0;
        *equal_left_p = 0;
        *middle_p = less;
        *equal_right_p = equal_left;
        *greater_p = middle;
        invariant_zero(equal_right);
        invariant_zero(greater);
        return;
    }
    paranoid_invariant(!(!key_left && key_right));
    ftnode_fetch_extra min_bfe;
    ftnode_fetch_extra match_bfe;
    min_bfe.create_for_min_read(
        ft_handle->ft);  // read pivot keys but not message buffers
    match_bfe.create_for_keymatch(
        ft_handle->ft,
        key_left,
        key_right,
        false,
        false);  // read basement node only if both keys in it.
try_again: {
    uint64_t less = 0, equal_left = 0, middle = 0, equal_right = 0, greater = 0;
    bool single_basement_node = false;
    FTNODE node = NULL;
    {
        uint32_t fullhash;
        CACHEKEY root_key;
        toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &fullhash);
        toku_pin_ftnode(
            ft_handle->ft,
            root_key,
            fullhash,
            &match_bfe,
            PL_READ,  // may_modify_node, cannot change root during keyrange
            &node,
            true);
    }

    struct unlock_ftnode_extra unlock_extra = {ft_handle, node, false};
    struct unlockers unlockers = {
        true, unlock_ftnode_fun, (void *)&unlock_extra, (UNLOCKERS)NULL};

    {
        int r;
        int64_t numrows = ft_handle->ft->in_memory_logical_rows;
        if (numrows < 0)
            numrows = 0;  // prevent appearance of a negative number
        r = toku_ft_keysrange_internal(ft_handle,
                                       node,
                                       key_left,
                                       key_right,
                                       true,
                                       &less,
                                       &equal_left,
                                       &middle,
                                       &equal_right,
                                       &greater,
                                       &single_basement_node,
                                       numrows,
                                       &min_bfe,
                                       &match_bfe,
                                       &unlockers,
                                       (ANCESTORS)NULL,
                                       pivot_bounds::infinite_bounds());
        assert(r == 0 || r == TOKUDB_TRY_AGAIN);
        if (r == TOKUDB_TRY_AGAIN) {
            assert(!unlockers.locked);
            goto try_again;
        }
        // May need to do a second query.
        if (!single_basement_node && key_right != nullptr) {
            // "greater" is stored in "middle"
            invariant_zero(equal_right);
            invariant_zero(greater);
            uint64_t less2 = 0, equal_left2 = 0, middle2 = 0, equal_right2 = 0,
                     greater2 = 0;
            bool ignore;
            r = toku_ft_keysrange_internal(ft_handle,
                                           node,
                                           key_right,
                                           nullptr,
                                           false,
                                           &less2,
                                           &equal_left2,
                                           &middle2,
                                           &equal_right2,
                                           &greater2,
                                           &ignore,
                                           numrows,
                                           &min_bfe,
                                           &match_bfe,
                                           &unlockers,
                                           (ANCESTORS) nullptr,
                                           pivot_bounds::infinite_bounds());
            assert(r == 0 || r == TOKUDB_TRY_AGAIN);
            if (r == TOKUDB_TRY_AGAIN) {
                assert(!unlockers.locked);
                goto try_again;
            }
            invariant_zero(equal_right2);
            invariant_zero(greater2);
            // Update numbers.
            // less is already correct.
            // equal_left is already correct.

            // "middle" currently holds everything greater than left_key in
            // first query
            // 'middle2' currently holds everything greater than right_key in
            // second query
            // 'equal_left2' is how many match right_key

            // Prevent underflow.
            if (middle >= equal_left2 + middle2) {
                middle -= equal_left2 + middle2;
            } else {
                middle = 0;
            }
            equal_right = equal_left2;
            greater = middle2;
        }
    }
    assert(unlockers.locked);
    toku_unpin_ftnode_read_only(ft_handle->ft, node);
    if (!key_right) {
        paranoid_invariant_zero(equal_right);
        paranoid_invariant_zero(greater);
    }
    if (!key_left) {
        paranoid_invariant_zero(less);
        paranoid_invariant_zero(equal_left);
    }
    *less_p = less;
    *equal_left_p = equal_left;
    *middle_p = middle;
    *equal_right_p = equal_right;
    *greater_p = greater;
    *middle_3_exact_p = single_basement_node;
}
}
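
// Worked example (illustrative): when key_left and key_right land in
// different basements, the first pass lumps everything right of key_left
// into `middle`, and the second pass (on key_right alone) reports that same
// tail as equal_left2 + middle2. The results then compose as:
//
//     middle      = middle - (equal_left2 + middle2)   // clamped at 0
//     equal_right = equal_left2
//     greater     = middle2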

struct get_key_after_bytes_iterate_extra {
    uint64_t skip_len;
    uint64_t *skipped;
    void (*callback)(const DBT *, uint64_t, void *);
    void *cb_extra;
};

static int get_key_after_bytes_iterate(const void* key, const uint32_t keylen, const LEAFENTRY & le, const uint32_t UU(idx), struct get_key_after_bytes_iterate_extra * const e) {
    // only checking the latest val, mvcc will make this inaccurate
    uint64_t pairlen = keylen + le_latest_vallen(le);
    if (*e->skipped + pairlen > e->skip_len) {
        // found our key!
        DBT end_key;
        toku_fill_dbt(&end_key, key, keylen);
        e->callback(&end_key, *e->skipped, e->cb_extra);
        return 1;
    } else {
        *e->skipped += pairlen;
        return 0;
    }
}
4391
4392static int get_key_after_bytes_in_basementnode(FT ft, BASEMENTNODE bn, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
4393 int r;
4394 uint32_t idx_left = 0;
4395 if (start_key != nullptr) {
4396 struct keyrange_compare_s cmp = {ft, start_key};
4397 r = bn->data_buffer.find_zero<decltype(cmp), keyrange_compare>(cmp, nullptr, nullptr, nullptr, &idx_left);
4398 assert(r == 0 || r == DB_NOTFOUND);
4399 }
4400 struct get_key_after_bytes_iterate_extra iter_extra = {skip_len, skipped, callback, cb_extra};
4401 r = bn->data_buffer.iterate_on_range<get_key_after_bytes_iterate_extra, get_key_after_bytes_iterate>(idx_left, bn->data_buffer.num_klpairs(), &iter_extra);
4402
4403 // Invert the sense of r == 0 (meaning the iterate finished, which means we didn't find what we wanted)
4404 if (r == 1) {
4405 r = 0;
4406 } else {
4407 r = DB_NOTFOUND;
4408 }
4409 return r;
4410}
4411
4412static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, ftnode_fetch_extra *bfe, ft_search *search, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped);
4413
4414static int get_key_after_bytes_in_child(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, ftnode_fetch_extra *bfe, ft_search *search, int childnum, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
4415 int r;
4416 struct ancestors next_ancestors = {node, childnum, ancestors};
4417 BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum);
4418 uint32_t fullhash = compute_child_fullhash(ft->cf, node, childnum);
4419 FTNODE child;
4420 bool msgs_applied = false;
4421 r = toku_pin_ftnode_for_query(ft_h, childblocknum, fullhash, unlockers, &next_ancestors, bounds, bfe, false, &child, &msgs_applied);
4422 paranoid_invariant(!msgs_applied);
4423 if (r == TOKUDB_TRY_AGAIN) {
4424 return r;
4425 }
4426 assert_zero(r);
4427 struct unlock_ftnode_extra unlock_extra = {ft_h, child, false};
4428 struct unlockers next_unlockers = {true, unlock_ftnode_fun, (void *) &unlock_extra, unlockers};
4429 const pivot_bounds next_bounds = bounds.next_bounds(node, childnum);
4430 return get_key_after_bytes_in_subtree(ft_h, ft, child, &next_unlockers, &next_ancestors, next_bounds, bfe, search, subtree_bytes, start_key, skip_len, callback, cb_extra, skipped);
4431}
4432
4433static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, ftnode_fetch_extra *bfe, ft_search *search, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
4434 int r;
4435 int childnum = toku_ft_search_which_child(ft->cmp, node, search);
4436 const uint64_t child_subtree_bytes = subtree_bytes / node->n_children;
4437 if (node->height == 0) {
4438 r = DB_NOTFOUND;
4439 for (int i = childnum; r == DB_NOTFOUND && i < node->n_children; ++i) {
4440 // The theory here is that a leaf node could only be very
4441 // unbalanced if it's dirty, which means all its basements are
4442 // available. So if a basement node is available, we should
4443 // check it as carefully as possible, but if it's compressed
4444 // or on disk, then it should be fairly well balanced so we
4445 // can trust the fanout calculation.
4446 if (BP_STATE(node, i) == PT_AVAIL) {
4447 r = get_key_after_bytes_in_basementnode(ft, BLB(node, i), (i == childnum) ? start_key : nullptr, skip_len, callback, cb_extra, skipped);
4448 } else {
4449 *skipped += child_subtree_bytes;
4450 if (*skipped >= skip_len && i < node->n_children - 1) {
4451 DBT pivot;
4452 callback(node->pivotkeys.fill_pivot(i, &pivot), *skipped, cb_extra);
4453 r = 0;
4454 }
4455 // Otherwise, r is still DB_NOTFOUND. If this is the last
4456 // basement node, we'll return DB_NOTFOUND and that's ok.
4457 // Some ancestor in the call stack will check the next
4458 // node over and that will call the callback, or if no
4459 // such node exists, we're at the max key and we should
4460 // return DB_NOTFOUND up to the top.
4461 }
4462 }
4463 } else {
4464 r = get_key_after_bytes_in_child(ft_h, ft, node, unlockers, ancestors, bounds, bfe, search, childnum, child_subtree_bytes, start_key, skip_len, callback, cb_extra, skipped);
4465 for (int i = childnum + 1; r == DB_NOTFOUND && i < node->n_children; ++i) {
4466 if (*skipped + child_subtree_bytes < skip_len) {
4467 *skipped += child_subtree_bytes;
4468 } else {
4469 r = get_key_after_bytes_in_child(ft_h, ft, node, unlockers, ancestors, bounds, bfe, search, i, child_subtree_bytes, nullptr, skip_len, callback, cb_extra, skipped);
4470 }
4471 }
4472 }
4473
4474 if (r != TOKUDB_TRY_AGAIN) {
4475 assert(unlockers->locked);
4476 toku_unpin_ftnode_read_only(ft, node);
4477 unlockers->locked = false;
4478 }
4479 return r;
4480}

int toku_ft_get_key_after_bytes(FT_HANDLE ft_h, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *end_key, uint64_t actually_skipped, void *extra), void *cb_extra)
// Effect:
// Call callback with end_key set to the largest key such that the sum of the sizes of the key/val pairs in the range [start_key, end_key) is <= skip_len.
// Call callback with actually_skipped set to the sum of the sizes of the key/val pairs in the range [start_key, end_key).
// Notes:
// start_key == nullptr is interpreted as negative infinity.
// end_key == nullptr is interpreted as positive infinity.
// Only the latest val is counted toward the size, in the case of MVCC data.
// Implementation:
// This is an estimated calculation: we assume each of a node's subtrees has equal size. If the tree is a single basement node, the result is exact, but otherwise it can be quite far off.
// Returns:
// 0 on success
// an error code otherwise
{
    FT ft = ft_h->ft;
    ftnode_fetch_extra bfe;
    bfe.create_for_min_read(ft);
    while (true) {
        FTNODE root;
        {
            uint32_t fullhash;
            CACHEKEY root_key;
            toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
            toku_pin_ftnode(ft, root_key, fullhash, &bfe, PL_READ, &root, true);
        }
        struct unlock_ftnode_extra unlock_extra = {ft_h, root, false};
        struct unlockers unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, (UNLOCKERS) nullptr};
        ft_search search;
        ft_search_init(&search, (start_key == nullptr ? toku_ft_cursor_compare_one : toku_ft_cursor_compare_set_range), FT_SEARCH_LEFT, start_key, nullptr, ft_h);

        int r;
        // We can't assert this because of #5768: there may be dictionaries in the wild that have negative stats. This won't affect mongo, so it's ok:
        //paranoid_invariant(ft->in_memory_stats.numbytes >= 0);
        int64_t numbytes = ft->in_memory_stats.numbytes;
        if (numbytes < 0) {
            numbytes = 0;
        }
        uint64_t skipped = 0;
        r = get_key_after_bytes_in_subtree(ft_h, ft, root, &unlockers, nullptr, pivot_bounds::infinite_bounds(), &bfe, &search, (uint64_t) numbytes, start_key, skip_len, callback, cb_extra, &skipped);
        assert(!unlockers.locked);
        if (r != TOKUDB_TRY_AGAIN) {
            if (r == DB_NOTFOUND) {
                callback(nullptr, skipped, cb_extra);
                r = 0;
            }
            return r;
        }
    }
}
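
// Illustrative usage sketch (kept out of the build): split a dictionary at
// roughly skip_len bytes past start_key. The struct and helper names below
// are hypothetical; only the callback signature comes from the API above.
#if 0
struct example_split_state {
    DBT end_key;      // the key just past the skipped range, or zeroed
    uint64_t skipped; // bytes actually skipped
};

static void example_capture_split_point(const DBT *end_key,
                                        uint64_t actually_skipped,
                                        void *extra) {
    example_split_state *state = static_cast<example_split_state *>(extra);
    // end_key == nullptr means we ran off the right edge of the dictionary.
    if (end_key != nullptr) {
        toku_clone_dbt(&state->end_key, *end_key);
    }
    state->skipped = actually_skipped;
}

static int example_find_split_point(FT_HANDLE ft_h,
                                    const DBT *start_key,
                                    uint64_t skip_len,
                                    example_split_state *state) {
    return toku_ft_get_key_after_bytes(
        ft_h, start_key, skip_len, example_capture_split_point, state);
}
#endif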

// Test-only wrapper for the old one-key range function
void toku_ft_keyrange(FT_HANDLE ft_handle, DBT *key, uint64_t *less, uint64_t *equal, uint64_t *greater) {
    uint64_t zero_equal_right, zero_greater;
    bool ignore;
    toku_ft_keysrange(ft_handle, key, nullptr, less, equal, greater, &zero_equal_right, &zero_greater, &ignore);
    invariant_zero(zero_equal_right);
    invariant_zero(zero_greater);
}

void toku_ft_handle_stat64 (FT_HANDLE ft_handle, TOKUTXN UU(txn), struct ftstat64_s *s) {
    toku_ft_stat64(ft_handle->ft, s);
}

void toku_ft_handle_get_fractal_tree_info64(FT_HANDLE ft_h, struct ftinfo64 *s) {
    toku_ft_get_fractal_tree_info64(ft_h->ft, s);
}

int toku_ft_handle_iterate_fractal_tree_block_map(FT_HANDLE ft_h, int (*iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*), void *iter_extra) {
    return toku_ft_iterate_fractal_tree_block_map(ft_h->ft, iter, iter_extra);
}

/* ********************* debugging dump ************************ */
static int
toku_dump_ftnode (FILE *file, FT_HANDLE ft_handle, BLOCKNUM blocknum, int depth, const DBT *lorange, const DBT *hirange) {
    int result = 0;
    FTNODE node;
    toku_get_node_for_verify(blocknum, ft_handle, &node);
    result = toku_verify_ftnode(ft_handle, ft_handle->ft->h->max_msn_in_ft, ft_handle->ft->h->max_msn_in_ft, false, node, -1, lorange, hirange, NULL, NULL, 0, 1, 0);
    uint32_t fullhash = toku_cachetable_hash(ft_handle->ft->cf, blocknum);
    ftnode_fetch_extra bfe;
    bfe.create_for_full_read(ft_handle->ft);
    toku_pin_ftnode(
        ft_handle->ft,
        blocknum,
        fullhash,
        &bfe,
        PL_WRITE_EXPENSIVE,
        &node,
        true
        );
    assert(node->fullhash == fullhash);
    fprintf(file, "%*sNode=%p\n", depth, "", node);

    fprintf(file, "%*sNode %" PRId64 " height=%d n_children=%d keyrange=%s %s\n",
            depth, "", blocknum.b, node->height, node->n_children, (char*)(lorange ? lorange->data : 0), (char*)(hirange ? hirange->data : 0));
    {
        int i;
        for (i = 0; i + 1 < node->n_children; i++) {
            fprintf(file, "%*spivotkey %d =", depth+1, "", i);
            toku_print_BYTESTRING(file, node->pivotkeys.get_pivot(i).size, (char *) node->pivotkeys.get_pivot(i).data);
            fprintf(file, "\n");
        }
        for (i = 0; i < node->n_children; i++) {
            if (node->height > 0) {
                NONLEAF_CHILDINFO bnc = BNC(node, i);
                fprintf(file, "%*schild %d buffered (%d entries):", depth+1, "", i, toku_bnc_n_entries(bnc));
                struct print_msg_fn {
                    FILE *file;
                    int depth;
                    print_msg_fn(FILE *f, int d) : file(f), depth(d) { }
                    int operator()(const ft_msg &msg, bool UU(is_fresh)) {
                        fprintf(file, "%*s xid=%" PRIu64 " %u (type=%d) msn=0x%" PRIu64 "\n",
                                depth+2, "",
                                toku_xids_get_innermost_xid(msg.xids()),
                                static_cast<unsigned>(toku_dtoh32(*(int*)msg.kdbt()->data)),
                                msg.type(), msg.msn().msn);
                        return 0;
                    }
                } print_fn(file, depth);
                bnc->msg_buffer.iterate(print_fn);
            } else {
                int size = BLB_DATA(node, i)->num_klpairs();
                // Dumping every leaf entry is extremely verbose, so it is
                // disabled by default; flip this to `if (1)` when debugging.
                if (0)
                    for (int j = 0; j < size; j++) {
                        LEAFENTRY le;
                        void* keyp = NULL;
                        uint32_t keylen = 0;
                        int r = BLB_DATA(node,i)->fetch_klpair(j, &le, &keylen, &keyp);
                        assert_zero(r);
                        fprintf(file, " [%d]=", j);
                        print_klpair(file, keyp, keylen, le);
                        fprintf(file, "\n");
                    }
                fprintf(file, "\n");
            }
        }
        if (node->height > 0) {
            for (i = 0; i < node->n_children; i++) {
                fprintf(file, "%*schild %d\n", depth, "", i);
                if (i > 0) {
                    char *CAST_FROM_VOIDP(key, node->pivotkeys.get_pivot(i - 1).data);
                    fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->pivotkeys.get_pivot(i - 1).size, (unsigned)toku_dtoh32(*(int*)key));
                }
                DBT x, y;
                toku_dump_ftnode(file, ft_handle, BP_BLOCKNUM(node, i), depth+4,
                                 (i == 0) ? lorange : node->pivotkeys.fill_pivot(i - 1, &x),
                                 (i == node->n_children-1) ? hirange : node->pivotkeys.fill_pivot(i, &y));
            }
        }
    }
    toku_unpin_ftnode(ft_handle->ft, node);
    return result;
}

int toku_dump_ft(FILE *f, FT_HANDLE ft_handle) {
    FT ft = ft_handle->ft;
    invariant_notnull(ft);
    ft->blocktable.dump_translation_table(f);

    uint32_t fullhash = 0;
    CACHEKEY root_key;
    toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &fullhash);
    return toku_dump_ftnode(f, ft_handle, root_key, 0, nullptr, nullptr);
}


static void toku_pfs_keys_init(const char *toku_instr_group_name) {
    kibbutz_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name, "kibbutz_mutex");
    minicron_p_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "minicron_p_mutex");
    queue_result_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "queue_result_mutex");
    tpool_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "tpool_lock_mutex");
    workset_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "workset_lock_mutex");
    bjm_jobs_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "bjm_jobs_lock_mutex");
    log_internal_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "log_internal_lock_mutex");
    cachetable_ev_thread_lock_mutex_key =
        new toku_instr_key(toku_instr_object_type::mutex,
                           toku_instr_group_name,
                           "cachetable_ev_thread_lock_mutex");
    cachetable_disk_nb_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "cachetable_disk_nb_mutex");
    safe_file_size_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "safe_file_size_lock_mutex");
    cachetable_m_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "cachetable_m_mutex_key");
    checkpoint_safe_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "checkpoint_safe_mutex");
    ft_ref_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "ft_ref_lock_mutex");
    ft_open_close_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "ft_open_close_lock_mutex");
    loader_error_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "loader_error_mutex");
    bfs_mutex_key =
        new toku_instr_key(toku_instr_object_type::mutex, toku_instr_group_name,
                           "bfs_mutex");
    loader_bl_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "loader_bl_mutex");
    loader_fi_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "loader_fi_lock_mutex");
    loader_out_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "loader_out_mutex");
    result_output_condition_lock_mutex_key =
        new toku_instr_key(toku_instr_object_type::mutex,
                           toku_instr_group_name,
                           "result_output_condition_lock_mutex");
    block_table_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "block_table_mutex");
    rollback_log_node_cache_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "rollback_log_node_cache_mutex");
    txn_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name, "txn_lock_mutex");
    txn_state_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "txn_state_lock_mutex");
    txn_child_manager_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "txn_child_manager_mutex");
    txn_manager_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "txn_manager_lock_mutex");
    treenode_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name, "treenode_mutex");
    locktree_request_info_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "locktree_request_info_mutex");
    locktree_request_info_retry_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "locktree_request_info_retry_mutex_key");
    manager_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name, "manager_mutex");
    manager_escalation_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "manager_escalation_mutex");
    db_txn_struct_i_txn_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "db_txn_struct_i_txn_mutex");
    manager_escalator_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "manager_escalator_mutex");
    indexer_i_indexer_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "indexer_i_indexer_lock_mutex");
    indexer_i_indexer_estimate_lock_mutex_key =
        new toku_instr_key(toku_instr_object_type::mutex,
                           toku_instr_group_name,
                           "indexer_i_indexer_estimate_lock_mutex");

    tokudb_file_data_key = new toku_instr_key(
        toku_instr_object_type::file, toku_instr_group_name, "tokudb_data_file");
    tokudb_file_load_key = new toku_instr_key(
        toku_instr_object_type::file, toku_instr_group_name, "tokudb_load_file");
    tokudb_file_tmp_key = new toku_instr_key(
        toku_instr_object_type::file, toku_instr_group_name, "tokudb_tmp_file");
    tokudb_file_log_key = new toku_instr_key(
        toku_instr_object_type::file, toku_instr_group_name, "tokudb_log_file");

    fti_probe_1_key =
        new toku_instr_key(toku_instr_object_type::mutex, toku_instr_group_name,
                           "fti_probe_1");

    extractor_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name,
        "extractor_thread");
    fractal_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name, "fractal_thread");
    io_thread_key =
        new toku_instr_key(toku_instr_object_type::thread, toku_instr_group_name,
                           "io_thread");
    eviction_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name,
        "eviction_thread");
    kibbutz_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name, "kibbutz_thread");
    minicron_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name,
        "minicron_thread");
    tp_internal_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name,
        "tp_internal_thread");

    result_state_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "result_state_cond");
    bjm_jobs_wait_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name, "bjm_jobs_wait");
    cachetable_p_refcount_wait_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "cachetable_p_refcount_wait");
    cachetable_m_flow_control_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "cachetable_m_flow_control_cond");
    cachetable_m_ev_thread_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "cachetable_m_ev_thread_cond");
    bfs_cond_key =
        new toku_instr_key(toku_instr_object_type::cond, toku_instr_group_name,
                           "bfs_cond");
    result_output_condition_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "result_output_condition");
    manager_m_escalator_done_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "manager_m_escalator_done");
    lock_request_m_wait_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "lock_request_m_wait_cond");
    queue_result_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "queue_result_cond");
    ws_worker_wait_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name, "ws_worker_wait");
    rwlock_wait_read_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name, "rwlock_wait_read");
    rwlock_wait_write_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "rwlock_wait_write");
    rwlock_cond_key =
        new toku_instr_key(toku_instr_object_type::cond, toku_instr_group_name,
                           "rwlock_cond");
    tp_thread_wait_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name, "tp_thread_wait");
    tp_pool_wait_free_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "tp_pool_wait_free");
    frwlock_m_wait_read_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "frwlock_m_wait_read");
    kibbutz_k_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name, "kibbutz_k_cond");
    minicron_p_condvar_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "minicron_p_condvar");
    locktree_request_info_retry_cv_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "locktree_request_info_retry_cv_key");

    multi_operation_lock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "multi_operation_lock");
    low_priority_multi_operation_lock_key =
        new toku_instr_key(toku_instr_object_type::rwlock,
                           toku_instr_group_name,
                           "low_priority_multi_operation_lock");
    cachetable_m_list_lock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "cachetable_m_list_lock");
    cachetable_m_pending_lock_expensive_key =
        new toku_instr_key(toku_instr_object_type::rwlock,
                           toku_instr_group_name,
                           "cachetable_m_pending_lock_expensive");
    cachetable_m_pending_lock_cheap_key =
        new toku_instr_key(toku_instr_object_type::rwlock,
                           toku_instr_group_name,
                           "cachetable_m_pending_lock_cheap");
    cachetable_m_lock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "cachetable_m_lock");
    result_i_open_dbs_rwlock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "result_i_open_dbs_rwlock");
    checkpoint_safe_rwlock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "checkpoint_safe_rwlock");
    cachetable_value_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "cachetable_value");
    safe_file_size_lock_rwlock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "safe_file_size_lock_rwlock");
    cachetable_disk_nb_rwlock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "cachetable_disk_nb_rwlock");

    toku_instr_probe_1 = new toku_instr_probe(*fti_probe_1_key);
}

static void toku_pfs_keys_destroy(void) {
    delete kibbutz_mutex_key;
    delete minicron_p_mutex_key;
    delete queue_result_mutex_key;
    delete tpool_lock_mutex_key;
    delete workset_lock_mutex_key;
    delete bjm_jobs_lock_mutex_key;
    delete log_internal_lock_mutex_key;
    delete cachetable_ev_thread_lock_mutex_key;
    delete cachetable_disk_nb_mutex_key;
    delete safe_file_size_lock_mutex_key;
    delete cachetable_m_mutex_key;
    delete checkpoint_safe_mutex_key;
    delete ft_ref_lock_mutex_key;
    delete ft_open_close_lock_mutex_key;
    delete loader_error_mutex_key;
    delete bfs_mutex_key;
    delete loader_bl_mutex_key;
    delete loader_fi_lock_mutex_key;
    delete loader_out_mutex_key;
    delete result_output_condition_lock_mutex_key;
    delete block_table_mutex_key;
    delete rollback_log_node_cache_mutex_key;
    delete txn_lock_mutex_key;
    delete txn_state_lock_mutex_key;
    delete txn_child_manager_mutex_key;
    delete txn_manager_lock_mutex_key;
    delete treenode_mutex_key;
    delete locktree_request_info_mutex_key;
    delete locktree_request_info_retry_mutex_key;
    delete manager_mutex_key;
    delete manager_escalation_mutex_key;
    delete db_txn_struct_i_txn_mutex_key;
    delete manager_escalator_mutex_key;
    delete indexer_i_indexer_lock_mutex_key;
    delete indexer_i_indexer_estimate_lock_mutex_key;

    delete tokudb_file_data_key;
    delete tokudb_file_load_key;
    delete tokudb_file_tmp_key;
    delete tokudb_file_log_key;

    delete fti_probe_1_key;

    delete extractor_thread_key;
    delete fractal_thread_key;
    delete io_thread_key;
    delete eviction_thread_key;
    delete kibbutz_thread_key;
    delete minicron_thread_key;
    delete tp_internal_thread_key;

    delete result_state_cond_key;
    delete bjm_jobs_wait_key;
    delete cachetable_p_refcount_wait_key;
    delete cachetable_m_flow_control_cond_key;
    delete cachetable_m_ev_thread_cond_key;
    delete bfs_cond_key;
    delete result_output_condition_key;
    delete manager_m_escalator_done_key;
    delete lock_request_m_wait_cond_key;
    delete queue_result_cond_key;
    delete ws_worker_wait_key;
    delete rwlock_wait_read_key;
    delete rwlock_wait_write_key;
    delete rwlock_cond_key;
    delete tp_thread_wait_key;
    delete tp_pool_wait_free_key;
    delete frwlock_m_wait_read_key;
    delete kibbutz_k_cond_key;
    delete minicron_p_condvar_key;
    delete locktree_request_info_retry_cv_key;

    delete multi_operation_lock_key;
    delete low_priority_multi_operation_lock_key;
    delete cachetable_m_list_lock_key;
    delete cachetable_m_pending_lock_expensive_key;
    delete cachetable_m_pending_lock_cheap_key;
    delete cachetable_m_lock_key;
    delete result_i_open_dbs_rwlock_key;
    delete checkpoint_safe_rwlock_key;
    delete cachetable_value_key;
    delete safe_file_size_lock_rwlock_key;
    delete cachetable_disk_nb_rwlock_key;

    delete toku_instr_probe_1;
}

int toku_ft_layer_init(void) {
    int r = 0;

    // Portability must be initialized first
    r = toku_portability_init();
    if (r) {
        goto exit;
    }

    toku_pfs_keys_init("fti");

    r = db_env_set_toku_product_name("tokudb");
    if (r) {
        goto exit;
    }

    partitioned_counters_init();
    toku_status_init();
    toku_context_status_init();
    toku_checkpoint_init();
    toku_ft_serialize_layer_init();
    toku_mutex_init(
        *ft_open_close_lock_mutex_key, &ft_open_close_lock, nullptr);
    toku_scoped_malloc_init();
exit:
    return r;
}

void toku_ft_layer_destroy(void) {
    toku_mutex_destroy(&ft_open_close_lock);
    toku_ft_serialize_layer_destroy();
    toku_checkpoint_destroy();
    toku_context_status_destroy();
    toku_status_destroy();
    partitioned_counters_destroy();
    toku_scoped_malloc_destroy();
    toku_pfs_keys_destroy();

    // Portability must be cleaned up last
    toku_portability_destroy();
}

// This lock serializes all opens and closes because the cachetable requires
// that clients do not open or close a cachefile in parallel. We made it
// coarser by not allowing any cachefiles to be opened or closed in parallel.
void toku_ft_open_close_lock(void) {
    toku_mutex_lock(&ft_open_close_lock);
}

void toku_ft_open_close_unlock(void) {
    toku_mutex_unlock(&ft_open_close_lock);
}
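
// Illustrative call pattern (kept out of the build): any code path that
// opens or closes a cachefile brackets the operation with this pair. The
// helper "example_open_or_close_some_cachefile" is hypothetical and stands
// in for whatever cachefile operation needs serializing.
#if 0
toku_ft_open_close_lock();
example_open_or_close_some_cachefile();
toku_ft_open_close_unlock();
#endif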

// Prepare to remove a dictionary from the database when this transaction is committed:
//  - mark the transaction as needing an fsync on commit
//  - make an entry in the rollback log
//  - make an fdelete entry in the recovery log
//
// Effect: when the txn commits, the ft's cachefile will be marked for unlink
//         on close. see toku_commit_fdelete and how unlink on close works
//         in toku_cachefile_close();
// Requires: serialized with begin checkpoint
//           this does not need to take the open close lock because
//           1.) the ft/cf cannot go away because we have a live handle.
//           2.) we're not setting the unlink on close bit _here_. that
//               happens on txn commit (as the name suggests).
//           3.) we're already holding the multi operation lock to
//               synchronize with begin checkpoint.
// Contract: the iname of the ft should never be reused.
void toku_ft_unlink_on_commit(FT_HANDLE handle, TOKUTXN txn) {
    assert(txn);

    CACHEFILE cf = handle->ft->cf;
    FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));

    toku_txn_maybe_note_ft(txn, ft);

    // If the txn commits, the commit MUST be in the log before the file is actually unlinked
    toku_txn_force_fsync_on_commit(txn);
    // make entry in rollback log
    FILENUM filenum = toku_cachefile_filenum(cf);
    toku_logger_save_rollback_fdelete(txn, filenum);
    // make entry in recovery log
    toku_logger_log_fdelete(txn, filenum);
}
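
// Illustrative call pattern (kept out of the build): a caller with a live
// txn and handle, synchronized with begin checkpoint via the multi operation
// lock as the Requires clause above demands. "handle" and "txn" are assumed
// to be a live FT_HANDLE and TOKUTXN.
#if 0
toku_multi_operation_client_lock(); // synchronize with begin checkpoint
toku_ft_unlink_on_commit(handle, txn);
toku_multi_operation_client_unlock();
#endif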

// Non-transactional version of fdelete
//
// Effect: The ft file is unlinked when the handle closes and its ft is not
//         pinned by checkpoint. see toku_remove_ft_ref() and how unlink on
//         close works in toku_cachefile_close();
// Requires: serialized with begin checkpoint
void toku_ft_unlink(FT_HANDLE handle) {
    CACHEFILE cf = handle->ft->cf;
    toku_cachefile_unlink_on_close(cf);
}
5068
5069int toku_ft_rename_iname(DB_TXN *txn,
5070 const char *data_dir,
5071 const char *old_iname,
5072 const char *new_iname,
5073 CACHETABLE ct) {
5074 int r = 0;
5075
5076 std::unique_ptr<char[], decltype(&toku_free)> new_iname_full(nullptr,
5077 &toku_free);
5078 std::unique_ptr<char[], decltype(&toku_free)> old_iname_full(nullptr,
5079 &toku_free);
5080
5081 new_iname_full.reset(toku_construct_full_name(2, data_dir, new_iname));
5082 old_iname_full.reset(toku_construct_full_name(2, data_dir, old_iname));
5083
5084 if (txn) {
5085 BYTESTRING bs_old_name = {static_cast<uint32_t>(strlen(old_iname) + 1),
5086 const_cast<char *>(old_iname)};
5087 BYTESTRING bs_new_name = {static_cast<uint32_t>(strlen(new_iname) + 1),
5088 const_cast<char *>(new_iname)};
5089 FILENUM filenum = FILENUM_NONE;
5090 {
5091 CACHEFILE cf;
5092 r = toku_cachefile_of_iname_in_env(ct, old_iname, &cf);
5093 if (r != ENOENT) {
5094 char *old_fname_in_cf = toku_cachefile_fname_in_env(cf);
5095 toku_cachefile_set_fname_in_env(cf, toku_xstrdup(new_iname));
5096 toku_free(old_fname_in_cf);
5097 filenum = toku_cachefile_filenum(cf);
5098 }
5099 }
5100 toku_logger_save_rollback_frename(
5101 db_txn_struct_i(txn)->tokutxn, &bs_old_name, &bs_new_name);
5102 toku_log_frename(db_txn_struct_i(txn)->tokutxn->logger,
5103 (LSN *)0,
5104 0,
5105 toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn),
5106 bs_old_name,
5107 filenum,
5108 bs_new_name);
5109 }
5110
5111 if (!toku_create_subdirs_if_needed(new_iname_full.get()))
5112 return get_error_errno();
5113 r = toku_os_rename(old_iname_full.get(), new_iname_full.get());
5114 if (r != 0)
5115 return r;
5116 r = toku_fsync_directory(new_iname_full.get());
5117 return r;
5118}

int toku_ft_get_fragmentation(FT_HANDLE ft_handle, TOKU_DB_FRAGMENTATION report) {
    int fd = toku_cachefile_get_fd(ft_handle->ft->cf);
    toku_ft_lock(ft_handle->ft);

    int64_t file_size;
    int r = toku_os_get_file_size(fd, &file_size);
    if (r == 0) {
        report->file_size_bytes = file_size;
        ft_handle->ft->blocktable.get_fragmentation_unlocked(report);
    }
    toku_ft_unlock(ft_handle->ft);
    return r;
}

static bool is_empty_fast_iter (FT_HANDLE ft_handle, FTNODE node) {
    if (node->height > 0) {
        for (int childnum = 0; childnum < node->n_children; childnum++) {
            if (toku_bnc_nbytesinbuf(BNC(node, childnum)) != 0) {
                return false; // it's not empty if there are bytes in buffers
            }
            FTNODE childnode;
            {
                BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum);
                uint32_t fullhash = compute_child_fullhash(ft_handle->ft->cf, node, childnum);
                ftnode_fetch_extra bfe;
                bfe.create_for_full_read(ft_handle->ft);
                // don't need to pass in dependent nodes as we are not
                // modifying nodes we are pinning
                toku_pin_ftnode(
                    ft_handle->ft,
                    childblocknum,
                    fullhash,
                    &bfe,
                    PL_READ, // may_modify_node set to false, as nodes not modified
                    &childnode,
                    true
                    );
            }
            bool child_is_empty = is_empty_fast_iter(ft_handle, childnode);
            toku_unpin_ftnode(ft_handle->ft, childnode);
            if (!child_is_empty) return false;
        }
        return true;
    } else {
        // leaf: If the dmt is empty, we are happy.
        for (int i = 0; i < node->n_children; i++) {
            if (BLB_DATA(node, i)->num_klpairs()) {
                return false;
            }
        }
        return true;
    }
}

bool toku_ft_is_empty_fast (FT_HANDLE ft_handle)
// A fast check to see if the tree is empty. If there are any messages or
// leafentries, we consider the tree to be nonempty. It's possible that those
// messages and leafentries would all optimize away and that the tree really
// is empty, but we'll say it is nonempty.
{
    uint32_t fullhash;
    FTNODE node;
    {
        CACHEKEY root_key;
        toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &fullhash);
        ftnode_fetch_extra bfe;
        bfe.create_for_full_read(ft_handle->ft);
        toku_pin_ftnode(
            ft_handle->ft,
            root_key,
            fullhash,
            &bfe,
            PL_READ, // may_modify_node set to false, node does not change
            &node,
            true
            );
    }
    bool r = is_empty_fast_iter(ft_handle, node);
    toku_unpin_ftnode(ft_handle->ft, node);
    return r;
}
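
// Illustrative usage (kept out of the build): a cheap emptiness probe, e.g.
// to decide whether a truncate-style shortcut applies. Per the comment
// above, the check is conservative: a logically empty tree may still report
// nonempty, but a tree reported empty really is empty.
#if 0
if (toku_ft_is_empty_fast(ft_handle)) {
    // fast path: no messages or leafentries anywhere
} else {
    // fall back to the general path
}
#endif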

// test-only
int toku_ft_strerror_r(int error, char *buf, size_t buflen)
{
    if (error >= 0) {
        return (long) strerror_r(error, buf, buflen);
    } else {
        switch (error) {
        case DB_KEYEXIST:
            snprintf(buf, buflen, "Key exists");
            return 0;
        case TOKUDB_CANCELED:
            snprintf(buf, buflen, "User canceled operation");
            return 0;
        default:
            snprintf(buf, buflen, "Unknown error %d", error);
            return EINVAL;
        }
    }
}

int toku_keycompare(const void *key1, uint32_t key1len, const void *key2, uint32_t key2len) {
    int comparelen = key1len < key2len ? key1len : key2len;
    int c = memcmp(key1, key2, comparelen);
    if (__builtin_expect(c != 0, 1)) {
        return c;
    } else {
        if (key1len < key2len) {
            return -1;
        } else if (key1len > key2len) {
            return 1;
        } else {
            return 0;
        }
    }
}

int toku_builtin_compare_fun(DB *db __attribute__((__unused__)), const DBT *a, const DBT *b) {
    return toku_keycompare(a->data, a->size, b->data, b->size);
}
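
// Examples of the ordering implemented above (kept out of the build):
// plain memcmp order, with the shorter key sorting first on a shared prefix.
#if 0
assert(toku_keycompare("a", 1, "b", 1) < 0);  // first differing byte decides
assert(toku_keycompare("ab", 2, "a", 1) > 0); // shared prefix: shorter sorts first
assert(toku_keycompare("a", 1, "a", 1) == 0); // identical keys compare equal
#endif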

#include <toku_race_tools.h>
void __attribute__((__constructor__)) toku_ft_helgrind_ignore(void);
void
toku_ft_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&ft_status, sizeof ft_status);
}
