/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
/*======
This file is part of PerconaFT.

Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <my_global.h>
#include "ft/ft.h"
#include "ft/ft-internal.h"
#include "ft/serialize/ft_node-serialize.h"
#include "ft/node.h"
#include "ft/serialize/rbuf.h"
#include "ft/serialize/wbuf.h"
#include "util/scoped_malloc.h"
#include "util/sort.h"

// Effect: Fill in N as an empty ftnode.
// TODO: Rename toku_ftnode_create
void toku_initialize_empty_ftnode(FTNODE n, BLOCKNUM blocknum, int height, int num_children, int layout_version, unsigned int flags) {
    paranoid_invariant(layout_version != 0);
    paranoid_invariant(height >= 0);

    n->max_msn_applied_to_node_on_disk = ZERO_MSN;  // correct value for root node, harmless for others
    n->flags = flags;
    n->blocknum = blocknum;
    n->layout_version = layout_version;
    n->layout_version_original = layout_version;
    n->layout_version_read_from_disk = layout_version;
    n->height = height;
    n->pivotkeys.create_empty();
    n->bp = 0;
    n->n_children = num_children;
    n->oldest_referenced_xid_known = TXNID_NONE;

    if (num_children > 0) {
        XMALLOC_N(num_children, n->bp);
        for (int i = 0; i < num_children; i++) {
            BP_BLOCKNUM(n,i).b=0;
            BP_STATE(n,i) = PT_INVALID;
            BP_WORKDONE(n,i) = 0;
            BP_INIT_TOUCHED_CLOCK(n, i);
            set_BNULL(n,i);
            if (height > 0) {
                set_BNC(n, i, toku_create_empty_nl());
            } else {
                set_BLB(n, i, toku_create_empty_bn());
            }
        }
    }
    n->dirty = 1;  // special case exception, it's okay to mark as dirty because the basements are empty

    toku_ft_status_note_ftnode(height, true);
}

// destroys the internals of the ftnode, but it does not free the values
// that are stored
// this is common functionality for toku_ftnode_free and rebalance_ftnode_leaf
// MUST NOT do anything besides free the structures that have been allocated
void toku_destroy_ftnode_internals(FTNODE node) {
    node->pivotkeys.destroy();
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node,i) == PT_AVAIL) {
            if (node->height > 0) {
                destroy_nonleaf_childinfo(BNC(node,i));
            } else {
                paranoid_invariant(BLB_LRD(node, i) == 0);
                destroy_basement_node(BLB(node, i));
            }
        } else if (BP_STATE(node,i) == PT_COMPRESSED) {
            SUB_BLOCK sb = BSB(node,i);
            toku_free(sb->compressed_ptr);
            toku_free(sb);
        } else {
            paranoid_invariant(is_BNULL(node, i));
        }
        set_BNULL(node, i);
    }
    toku_free(node->bp);
    node->bp = NULL;
}

/* Frees a node, including all the stuff in the hash table. */
void toku_ftnode_free(FTNODE *nodep) {
    FTNODE node = *nodep;
    toku_ft_status_note_ftnode(node->height, false);
    toku_destroy_ftnode_internals(node);
    toku_free(node);
    *nodep = nullptr;
}

void toku_ftnode_update_disk_stats(FTNODE ftnode, FT ft, bool for_checkpoint) {
    STAT64INFO_S deltas = ZEROSTATS;
    // capture deltas before rebalancing basements for serialization
    deltas = toku_get_and_clear_basement_stats(ftnode);
    // locking not necessary here with respect to checkpointing
    // in Clayface (because of the pending lock and cachetable lock
    // in toku_cachetable_begin_checkpoint)
    // essentially, if we are dealing with a for_checkpoint
    // parameter in a function that is called by the flush_callback,
    // then the cachetable needs to ensure that this is called in a safe
    // manner that does not interfere with the beginning
    // of a checkpoint, which it does with the cachetable lock
    // and pending lock
    toku_ft_update_stats(&ft->h->on_disk_stats, deltas);
    if (for_checkpoint) {
        toku_ft_update_stats(&ft->checkpoint_header->on_disk_stats, deltas);
    }
}

void toku_ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) {
    for (int i = 0; i < node->n_children; i++) {
        BP_BLOCKNUM(cloned_node,i) = BP_BLOCKNUM(node,i);
        paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
        BP_STATE(cloned_node,i) = PT_AVAIL;
        BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i);
        if (node->height == 0) {
            set_BLB(cloned_node, i, toku_clone_bn(BLB(node,i)));
        } else {
            set_BNC(cloned_node, i, toku_clone_nl(BNC(node,i)));
        }
    }
}

void toku_evict_bn_from_memory(FTNODE node, int childnum, FT ft) {
    // free the basement node
    assert(!node->dirty);
    BASEMENTNODE bn = BLB(node, childnum);
    toku_ft_decrease_stats(&ft->in_memory_stats, bn->stat64_delta);
    destroy_basement_node(bn);
    set_BNULL(node, childnum);
    BP_STATE(node, childnum) = PT_ON_DISK;
}

BASEMENTNODE toku_detach_bn(FTNODE node, int childnum) {
    assert(BP_STATE(node, childnum) == PT_AVAIL);
    BASEMENTNODE bn = BLB(node, childnum);
    set_BNULL(node, childnum);
    BP_STATE(node, childnum) = PT_ON_DISK;
    return bn;
}

//
// Orthopush
//

struct store_msg_buffer_offset_extra {
    int32_t *offsets;
    int i;
};

int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra) __attribute__((nonnull(3)));
int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra)
{
    extra->offsets[extra->i] = offset;
    extra->i++;
    return 0;
}

/**
 * Given pointers to offsets within a message buffer where we can find messages,
 * figure out the MSN of each message, and compare those MSNs.  Returns +1,
 * 0, or -1 if the message at ao has a larger, equal, or smaller MSN than
 * the message at bo.
 */
int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo);
int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo)
{
    MSN amsn, bmsn;
    msg_buffer.get_message_key_msn(ao, nullptr, &amsn);
    msg_buffer.get_message_key_msn(bo, nullptr, &bmsn);
    if (amsn.msn > bmsn.msn) {
        return +1;
    }
    if (amsn.msn < bmsn.msn) {
        return -1;
    }
    return 0;
}

/**
 * Given a message buffer and an offset, apply the message with
 * toku_ft_bn_apply_msg, or discard it,
 * based on its MSN and the MSN of the basement node.
 */
static void do_bn_apply_msg(
    FT_HANDLE ft_handle,
    BASEMENTNODE bn,
    message_buffer* msg_buffer,
    int32_t offset,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    DBT k, v;
    ft_msg msg = msg_buffer->get_message(offset, &k, &v);

    // The messages are being iterated over in (key,msn) order or just in
    // msn order, so all the messages for one key, from one buffer, are in
    // ascending msn order.  So it's ok that we don't update the basement
    // node's msn until the end.
    if (msg.msn().msn > bn->max_msn_applied.msn) {
        toku_ft_bn_apply_msg(
            ft_handle->ft->cmp,
            ft_handle->ft->update_fun,
            bn,
            msg,
            gc_info,
            workdone,
            stats_to_update,
            logical_rows_delta);
    } else {
        toku_ft_status_note_msn_discard();
    }

    // We must always mark the message as stale since it has been marked
    // (using omt::iterate_and_mark_range).
    // It is possible to call do_bn_apply_msg even when it won't apply the
    // message because the node containing it could have been evicted and
    // brought back in.
    msg_buffer->set_freshness(offset, false);
}
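
// An illustrative sketch of the MSN filter above (comment only, with
// made-up MSNs): suppose bn->max_msn_applied is 100 because the basement
// was serialized after applying messages through MSN 100, and an
// ancestor's buffer still holds messages with MSNs 90, 100, and 110 for
// keys in this basement's range.  Replaying that buffer applies only the
// MSN-110 message; 90 and 100 are discarded since their effects are
// already reflected in the basement.  Without the filter, a
// non-idempotent message (e.g. an FT_UPDATE that increments a counter)
// would be applied twice.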

struct iterate_do_bn_apply_msg_extra {
    FT_HANDLE t;
    BASEMENTNODE bn;
    NONLEAF_CHILDINFO bnc;
    txn_gc_info *gc_info;
    uint64_t *workdone;
    STAT64INFO stats_to_update;
    int64_t *logical_rows_delta;
};

int iterate_do_bn_apply_msg(
    const int32_t &offset,
    const uint32_t UU(idx),
    struct iterate_do_bn_apply_msg_extra* const e)
    __attribute__((nonnull(3)));

int iterate_do_bn_apply_msg(
    const int32_t &offset,
    const uint32_t UU(idx),
    struct iterate_do_bn_apply_msg_extra* const e)
{
    do_bn_apply_msg(
        e->t,
        e->bn,
        &e->bnc->msg_buffer,
        offset,
        e->gc_info,
        e->workdone,
        e->stats_to_update,
        e->logical_rows_delta);
    return 0;
}

/**
 * Given the bounds of the basement node to which we will apply messages,
 * find the indexes within message_tree which contain the range of
 * relevant messages.
 *
 * The message tree contains offsets into the buffer, where messages are
 * found.  The pivot_bounds are the lower bound exclusive and upper bound
 * inclusive, because they come from pivot keys in the tree.  We want OMT
 * indices, which must have the lower bound be inclusive and the upper
 * bound exclusive.  We will get these by telling omt::find to look
 * for something strictly bigger than each of our pivot bounds.
 *
 * Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper
 * bound exclusive).
 */
template<typename find_bounds_omt_t>
static void
find_bounds_within_message_tree(
    const toku::comparator &cmp,
    const find_bounds_omt_t &message_tree,  /// tree holding message buffer offsets, in which we want to look for indices
    message_buffer *msg_buffer,             /// message buffer in which messages are found
    const pivot_bounds &bounds,             /// key bounds within the basement node we're applying messages to
    uint32_t *lbi,                          /// (output) "lower bound inclusive" (index into message_tree)
    uint32_t *ube                           /// (output) "upper bound exclusive" (index into message_tree)
    )
{
    int r = 0;

    if (!toku_dbt_is_empty(bounds.lbe())) {
        // By setting msn to MAX_MSN and by using direction of +1, we will
        // get the first message greater than (in (key, msn) order) any
        // message (with any msn) with the key lower_bound_exclusive.
        // This will be a message we want to try applying, so it is the
        // "lower bound inclusive" within the message_tree.
        struct toku_msg_buffer_key_msn_heaviside_extra lbi_extra(cmp, msg_buffer, bounds.lbe(), MAX_MSN);
        int32_t found_lb;
        r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi);
        if (r == DB_NOTFOUND) {
            // There is no relevant data (the lower bound is bigger than
            // any message in this tree), so we have no range and we're
            // done.
            *lbi = 0;
            *ube = 0;
            return;
        }
        if (!toku_dbt_is_empty(bounds.ubi())) {
            // Check if what we found for lbi is greater than the upper
            // bound inclusive that we have.  If so, there are no relevant
            // messages between these bounds.
            const DBT *ubi = bounds.ubi();
            const int32_t offset = found_lb;
            DBT found_lbidbt;
            msg_buffer->get_message_key_msn(offset, &found_lbidbt, nullptr);
            int c = cmp(&found_lbidbt, ubi);
            // These DBTs really are both inclusive bounds, so we need
            // strict inequality in order to determine that there's
            // nothing between them.  If they're equal, then we actually
            // need to apply the message pointed to by lbi, and also
            // anything with the same key but a bigger msn.
            if (c > 0) {
                *lbi = 0;
                *ube = 0;
                return;
            }
        }
    } else {
        // No lower bound given, it's negative infinity, so we start at
        // the first message in the OMT.
        *lbi = 0;
    }
    if (!toku_dbt_is_empty(bounds.ubi())) {
        // Again, we use an msn of MAX_MSN and a direction of +1 to get
        // the first thing bigger than the upper_bound_inclusive key.
        // This is therefore the smallest thing we don't want to apply,
        // and omt::iterate_on_range will not examine it.
        struct toku_msg_buffer_key_msn_heaviside_extra ube_extra(cmp, msg_buffer, bounds.ubi(), MAX_MSN);
        r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(ube_extra, +1, nullptr, ube);
        if (r == DB_NOTFOUND) {
            // Couldn't find anything in the buffer bigger than our key,
            // so we need to look at everything up to the end of
            // message_tree.
            *ube = message_tree.size();
        }
    } else {
        // No upper bound given, it's positive infinity, so we need to go
        // through the end of the OMT.
        *ube = message_tree.size();
    }
}
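
// Worked example for the bound conversion above (illustrative values, not
// from any real tree): say the message tree orders messages with keys
// 10, 20, 20, 30 at OMT indices 0..3 and the basement's pivot bounds are
// lbe = 10 (exclusive) and ubi = 30 (inclusive).  Searching for the first
// entry strictly greater than (10, MAX_MSN) yields lbi = 1, correctly
// skipping the key-10 message.  Searching for the first entry strictly
// greater than (30, MAX_MSN) returns DB_NOTFOUND, so ube = 4.  The
// half-open range [1, 4) then covers exactly the messages for keys in
// (10, 30].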

// For each message in the ancestor's buffer (determined by childnum) that
// is key-wise between lower_bound_exclusive and upper_bound_inclusive,
// apply the message to the basement node.  We treat the bounds as minus
// or plus infinity respectively if they are NULL.  Do not mark the node
// as dirty (preserve previous state of 'dirty' bit).
static void bnc_apply_messages_to_basement_node(
    FT_HANDLE t,                 // used for comparison function
    BASEMENTNODE bn,             // where to apply messages
    FTNODE ancestor,             // the ancestor node where we can find messages to apply
    int childnum,                // which child buffer of ancestor contains messages we want
    const pivot_bounds &bounds,  // contains pivot key bounds of this basement node
    txn_gc_info *gc_info,
    bool *msgs_applied) {
    int r;
    NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);

    // Determine the offsets in the message trees between which we need to
    // apply messages from this buffer
    STAT64INFO_S stats_delta = {0, 0};
    uint64_t workdone_this_ancestor = 0;
    int64_t logical_rows_delta = 0;

    uint32_t stale_lbi, stale_ube;
    if (!bn->stale_ancestor_messages_applied) {
        find_bounds_within_message_tree(t->ft->cmp,
                                        bnc->stale_message_tree,
                                        &bnc->msg_buffer,
                                        bounds,
                                        &stale_lbi,
                                        &stale_ube);
    } else {
        stale_lbi = 0;
        stale_ube = 0;
    }
    uint32_t fresh_lbi, fresh_ube;
    find_bounds_within_message_tree(t->ft->cmp,
                                    bnc->fresh_message_tree,
                                    &bnc->msg_buffer,
                                    bounds,
                                    &fresh_lbi,
                                    &fresh_ube);

    // We now know where all the messages we must apply are, so one of the
    // following three cases will do the application, depending on which of
    // the lists contains relevant messages:
    //
    // 1. broadcast messages and anything else, or a mix of fresh and stale
    // 2. only fresh messages
    // 3. only stale messages
    if (bnc->broadcast_list.size() > 0 ||
        (stale_lbi != stale_ube && fresh_lbi != fresh_ube)) {
        // We have messages in multiple trees, so we grab all
        // the relevant messages' offsets and sort them by MSN, then apply
        // them in MSN order.
        const int buffer_size =
            ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) +
             bnc->broadcast_list.size());
        toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t));
        int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get());
        struct store_msg_buffer_offset_extra sfo_extra = {.offsets = offsets,
                                                          .i = 0};

        // Populate offsets array with offsets to stale messages
        r = bnc->stale_message_tree
                .iterate_on_range<struct store_msg_buffer_offset_extra,
                                  store_msg_buffer_offset>(
                    stale_lbi, stale_ube, &sfo_extra);
        assert_zero(r);

        // Then store fresh offsets, and mark them to be moved to stale later.
        r = bnc->fresh_message_tree
                .iterate_and_mark_range<struct store_msg_buffer_offset_extra,
                                        store_msg_buffer_offset>(
                    fresh_lbi, fresh_ube, &sfo_extra);
        assert_zero(r);

        // Store offsets of all broadcast messages.
        r = bnc->broadcast_list.iterate<struct store_msg_buffer_offset_extra,
                                        store_msg_buffer_offset>(&sfo_extra);
        assert_zero(r);
        invariant(sfo_extra.i == buffer_size);

        // Sort by MSN.
        toku::sort<int32_t, message_buffer, msg_buffer_offset_msn_cmp>::
            mergesort_r(offsets, buffer_size, bnc->msg_buffer);

        // Apply the messages in MSN order.
        for (int i = 0; i < buffer_size; ++i) {
            *msgs_applied = true;
            do_bn_apply_msg(t,
                            bn,
                            &bnc->msg_buffer,
                            offsets[i],
                            gc_info,
                            &workdone_this_ancestor,
                            &stats_delta,
                            &logical_rows_delta);
        }
    } else if (stale_lbi == stale_ube) {
        // No stale messages to apply, we just apply fresh messages, and mark
        // them to be moved to stale later.
        struct iterate_do_bn_apply_msg_extra iter_extra = {
            .t = t,
            .bn = bn,
            .bnc = bnc,
            .gc_info = gc_info,
            .workdone = &workdone_this_ancestor,
            .stats_to_update = &stats_delta,
            .logical_rows_delta = &logical_rows_delta};
        if (fresh_ube - fresh_lbi > 0)
            *msgs_applied = true;
        r = bnc->fresh_message_tree
                .iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra,
                                        iterate_do_bn_apply_msg>(
                    fresh_lbi, fresh_ube, &iter_extra);
        assert_zero(r);
    } else {
        invariant(fresh_lbi == fresh_ube);
        // No fresh messages to apply, we just apply stale messages.

        if (stale_ube - stale_lbi > 0)
            *msgs_applied = true;
        struct iterate_do_bn_apply_msg_extra iter_extra = {
            .t = t,
            .bn = bn,
            .bnc = bnc,
            .gc_info = gc_info,
            .workdone = &workdone_this_ancestor,
            .stats_to_update = &stats_delta,
            .logical_rows_delta = &logical_rows_delta};

        r = bnc->stale_message_tree
                .iterate_on_range<struct iterate_do_bn_apply_msg_extra,
                                  iterate_do_bn_apply_msg>(
                    stale_lbi, stale_ube, &iter_extra);
        assert_zero(r);
    }
    //
    // update stats
    //
    if (workdone_this_ancestor > 0) {
        (void)toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum),
                                      workdone_this_ancestor);
    }
    if (stats_delta.numbytes || stats_delta.numrows) {
        toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta);
    }
    toku_ft_adjust_logical_row_count(t->ft, logical_rows_delta);
    bn->logical_rows_delta += logical_rows_delta;
}
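
// A sketch of the three-way merge above (hypothetical MSNs for
// illustration): if the stale tree contributes offsets with MSNs {5, 9},
// the fresh tree {7, 12}, and the broadcast list {8}, the offsets array
// is sorted into MSN order 5, 7, 8, 9, 12 before application.  Applying
// in MSN order is what makes it safe to interleave the three sources: a
// broadcast delete at MSN 8 must land after the insert at MSN 7 and
// before the re-insert at MSN 9 to reproduce the logical history.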

static void
apply_ancestors_messages_to_bn(
    FT_HANDLE t,
    FTNODE node,
    int childnum,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    txn_gc_info *gc_info,
    bool* msgs_applied
    )
{
    BASEMENTNODE curr_bn = BLB(node, childnum);
    const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
    for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
        if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
            paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
            bnc_apply_messages_to_basement_node(
                t,
                curr_bn,
                curr_ancestors->node,
                curr_ancestors->childnum,
                curr_bounds,
                gc_info,
                msgs_applied
                );
            // We don't want to check this ancestor node again if the
            // next time we query it, the msn hasn't changed.
            curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
        }
    }
    // At this point, we know all the stale messages above this
    // basement node have been applied, and any new messages will be
    // fresh, so we don't need to look at stale messages for this
    // basement node, unless it gets evicted (and this field becomes
    // false when it's read in again).
    curr_bn->stale_ancestor_messages_applied = true;
}

void
toku_apply_ancestors_messages_to_node (
    FT_HANDLE t,
    FTNODE node,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    bool* msgs_applied,
    int child_to_read
    )
// Effect:
//   Bring a leaf node up-to-date according to all the messages in the ancestors.
//   If the leaf node is already up-to-date then do nothing.
//   If the leaf node is not already up-to-date, then record the work done
//   for that leaf in each ancestor.
// Requires:
//   This is being called when pinning a leaf node for the query path.
//   The entire root-to-leaf path is pinned and appears in the ancestors list.
{
    VERIFY_NODE(t, node);
    paranoid_invariant(node->height == 0);

    TXN_MANAGER txn_manager = toku_ft_get_txn_manager(t);
    txn_manager_state txn_state_for_gc(txn_manager);

    TXNID oldest_referenced_xid_for_simple_gc = toku_ft_get_oldest_referenced_xid_estimate(t);
    txn_gc_info gc_info(&txn_state_for_gc,
                        oldest_referenced_xid_for_simple_gc,
                        node->oldest_referenced_xid_known,
                        true);
    if (!node->dirty && child_to_read >= 0) {
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        apply_ancestors_messages_to_bn(
            t,
            node,
            child_to_read,
            ancestors,
            bounds,
            &gc_info,
            msgs_applied
            );
    }
    else {
        // we know we are a leaf node
        // An important invariant:
        // We MUST bring every available basement node for a dirty node up to date.
        // Flushing on the cleaner thread depends on this.  This invariant
        // allows the cleaner thread to just pick an internal node and flush it
        // as opposed to being forced to start from the root.
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            apply_ancestors_messages_to_bn(
                t,
                node,
                i,
                ancestors,
                bounds,
                &gc_info,
                msgs_applied
                );
        }
    }
    VERIFY_NODE(t, node);
}

static bool bn_needs_ancestors_messages(
    FT ft,
    FTNODE node,
    int childnum,
    const pivot_bounds &bounds,
    ANCESTORS ancestors,
    MSN* max_msn_applied
    )
{
    BASEMENTNODE bn = BLB(node, childnum);
    const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
    bool needs_ancestors_messages = false;
    for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
        if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
            paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
            NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
            if (bnc->broadcast_list.size() > 0) {
                needs_ancestors_messages = true;
                goto cleanup;
            }
            if (!bn->stale_ancestor_messages_applied) {
                uint32_t stale_lbi, stale_ube;
                find_bounds_within_message_tree(ft->cmp,
                                                bnc->stale_message_tree,
                                                &bnc->msg_buffer,
                                                curr_bounds,
                                                &stale_lbi,
                                                &stale_ube);
                if (stale_lbi < stale_ube) {
                    needs_ancestors_messages = true;
                    goto cleanup;
                }
            }
            uint32_t fresh_lbi, fresh_ube;
            find_bounds_within_message_tree(ft->cmp,
                                            bnc->fresh_message_tree,
                                            &bnc->msg_buffer,
                                            curr_bounds,
                                            &fresh_lbi,
                                            &fresh_ube);
            if (fresh_lbi < fresh_ube) {
                needs_ancestors_messages = true;
                goto cleanup;
            }
            if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied->msn) {
                max_msn_applied->msn = curr_ancestors->node->max_msn_applied_to_node_on_disk.msn;
            }
        }
    }
cleanup:
    return needs_ancestors_messages;
}

bool toku_ft_leaf_needs_ancestors_messages(
    FT ft,
    FTNODE node,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    MSN *const max_msn_in_path,
    int child_to_read
    )
// Effect: Determine whether there are messages in a node's ancestors
//  which must be applied to it.  These messages are in the correct
//  keyrange for any available basement nodes, and are in nodes with the
//  correct max_msn_applied_to_node_on_disk.
// Notes:
//  This is an approximate query.
// Output:
//  max_msn_in_path: max of "max_msn_applied_to_node_on_disk" over
//  ancestors.  This is used later to update basement nodes'
//  max_msn_applied values in case we don't do the full algorithm.
// Returns:
//  true if there may be some such messages
//  false only if there are definitely no such messages
// Rationale:
//  When we pin a node with a read lock, we want to quickly determine if
//  we should exchange it for a write lock in preparation for applying
//  messages.  If there are no messages, we don't need the write lock.
{
    paranoid_invariant(node->height == 0);
    bool needs_ancestors_messages = false;
    // child_to_read may be -1 in test cases
    if (!node->dirty && child_to_read >= 0) {
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        needs_ancestors_messages = bn_needs_ancestors_messages(
            ft,
            node,
            child_to_read,
            bounds,
            ancestors,
            max_msn_in_path
            );
    }
    else {
        for (int i = 0; i < node->n_children; ++i) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            needs_ancestors_messages = bn_needs_ancestors_messages(
                ft,
                node,
                i,
                bounds,
                ancestors,
                max_msn_in_path
                );
            if (needs_ancestors_messages) {
                goto cleanup;
            }
        }
    }
cleanup:
    return needs_ancestors_messages;
}
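
// A sketch of how the query path is expected to use this predicate
// (hedged; the actual callers live in the cursor/search code, not here):
//
//     // pin the leaf with a read lock, then:
//     MSN max_msn_in_path = ZERO_MSN;
//     if (toku_ft_leaf_needs_ancestors_messages(ft, node, ancestors,
//                                               bounds, &max_msn_in_path,
//                                               child_to_read)) {
//         // re-pin with a write lock and do the full application
//         toku_apply_ancestors_messages_to_node(...);
//     } else {
//         // no relevant messages: just record the path's max MSN so we
//         // can skip these ancestors next time
//         toku_ft_bn_update_max_msn(node, max_msn_in_path, child_to_read);
//     }
//
// A false positive only costs an unnecessary write lock; a false negative
// would return stale data, which is why the "false" answer must be exact.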

void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read) {
    invariant(node->height == 0);
    if (!node->dirty && child_to_read >= 0) {
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        BASEMENTNODE bn = BLB(node, child_to_read);
        if (max_msn_applied.msn > bn->max_msn_applied.msn) {
            // see comment below
            (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
        }
    }
    else {
        for (int i = 0; i < node->n_children; ++i) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            BASEMENTNODE bn = BLB(node, i);
            if (max_msn_applied.msn > bn->max_msn_applied.msn) {
                // This function runs in a shared access context, so to silence tools
                // like DRD, we use a CAS and ignore the result.
                // Any threads trying to update these basement nodes should be
                // updating them to the same thing (since they all have a read lock on
                // the same root-to-leaf path) so this is safe.
                (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
            }
        }
    }
}

struct copy_to_stale_extra {
    FT ft;
    NONLEAF_CHILDINFO bnc;
};

int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra) __attribute__((nonnull(3)));
int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
{
    MSN msn;
    DBT key;
    extra->bnc->msg_buffer.get_message_key_msn(offset, &key, &msn);
    struct toku_msg_buffer_key_msn_heaviside_extra heaviside_extra(extra->ft->cmp, &extra->bnc->msg_buffer, &key, msn);
    int r = extra->bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, heaviside_extra, nullptr);
    invariant_zero(r);
    return 0;
}

void toku_ft_bnc_move_messages_to_stale(FT ft, NONLEAF_CHILDINFO bnc) {
    struct copy_to_stale_extra cts_extra = { .ft = ft, .bnc = bnc };
    int r = bnc->fresh_message_tree.iterate_over_marked<struct copy_to_stale_extra, copy_to_stale>(&cts_extra);
    invariant_zero(r);
    bnc->fresh_message_tree.delete_all_marked();
}

void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) {
    invariant(node->height > 0);
    for (int i = 0; i < node->n_children; ++i) {
        if (BP_STATE(node, i) != PT_AVAIL) {
            continue;
        }
        NONLEAF_CHILDINFO bnc = BNC(node, i);
        // We can't delete things out of the fresh tree inside the above
        // procedures because we're still looking at the fresh tree.  Instead
        // we have to move messages after we're done looking at it.
        toku_ft_bnc_move_messages_to_stale(ft, bnc);
    }
}
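
// Lifecycle sketch (informal): a message is enqueued as "fresh" by
// bnc_insert_msg() below.  The first time it is applied to a basement on
// the query path, iterate_and_mark_range() marks it; once the caller is
// done reading the fresh tree, the marked entries are re-inserted into
// the stale tree here and deleted from the fresh tree.  A basement node
// with stale_ancestor_messages_applied == true can therefore skip the
// stale tree entirely: everything in it has already been applied to that
// basement at least once.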

//
// Balance // Availability // Size

struct rebalance_array_info {
    uint32_t offset;
    LEAFENTRY *le_array;
    uint32_t *key_sizes_array;
    const void **key_ptr_array;
    static int fn(const void* key, const uint32_t keylen, const LEAFENTRY &le,
                  const uint32_t idx, struct rebalance_array_info *const ai) {
        ai->le_array[idx+ai->offset] = le;
        ai->key_sizes_array[idx+ai->offset] = keylen;
        ai->key_ptr_array[idx+ai->offset] = key;
        return 0;
    }
};

// There must still be at least one child
// Requires that all messages in buffers above have been applied.
// Because all messages above have been applied, setting msn of all new basements
// to max msn of existing basements is correct.  (There cannot be any messages in
// buffers above that still need to be applied.)
void toku_ftnode_leaf_rebalance(FTNODE node, unsigned int basementnodesize) {

    assert(node->height == 0);
    assert(node->dirty);

    uint32_t num_orig_basements = node->n_children;
    // Count number of leaf entries in this leaf (num_le).
    uint32_t num_le = 0;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        num_le += BLB_DATA(node, i)->num_klpairs();
    }

    uint32_t num_alloc = num_le ? num_le : 1;  // simplify logic below by always having at least one entry per array

    // Create an array of OMTVALUE's that store all the pointers to all the data.
    // Each element in leafpointers is a pointer to a leaf.
    toku::scoped_malloc leafpointers_buf(sizeof(LEAFENTRY) * num_alloc);
    LEAFENTRY *leafpointers = reinterpret_cast<LEAFENTRY *>(leafpointers_buf.get());
    leafpointers[0] = NULL;

    toku::scoped_malloc key_pointers_buf(sizeof(void *) * num_alloc);
    const void **key_pointers = reinterpret_cast<const void **>(key_pointers_buf.get());
    key_pointers[0] = NULL;

    toku::scoped_malloc key_sizes_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *key_sizes = reinterpret_cast<uint32_t *>(key_sizes_buf.get());

    // Capture pointers to old mempools' buffers (so they can be destroyed)
    toku::scoped_malloc old_bns_buf(sizeof(BASEMENTNODE) * num_orig_basements);
    BASEMENTNODE *old_bns = reinterpret_cast<BASEMENTNODE *>(old_bns_buf.get());
    old_bns[0] = NULL;

    uint32_t curr_le = 0;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        bn_data* bd = BLB_DATA(node, i);
        struct rebalance_array_info ai {.offset = curr_le, .le_array = leafpointers, .key_sizes_array = key_sizes, .key_ptr_array = key_pointers };
        bd->iterate<rebalance_array_info, rebalance_array_info::fn>(&ai);
        curr_le += bd->num_klpairs();
    }

    // Create an array that will store indexes of new pivots.
    // Each element in new_pivots is the index of a pivot key.
    // (Allocating num_le of them is overkill, but num_le is an upper bound.)
    toku::scoped_malloc new_pivots_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *new_pivots = reinterpret_cast<uint32_t *>(new_pivots_buf.get());
    new_pivots[0] = 0;

    // Each element in le_sizes is the size of the leafentry pointed to by leafpointers.
    toku::scoped_malloc le_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *le_sizes = reinterpret_cast<size_t *>(le_sizes_buf.get());
    le_sizes[0] = 0;

    // Create an array that will store the size of each basement.
    // This is the sum of the leaf sizes of all the leaves in that basement.
    // We don't know how many basements there will be, so we use num_le as the upper bound.

    // Sum of all le sizes in a single basement
    toku::scoped_calloc bn_le_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *bn_le_sizes = reinterpret_cast<size_t *>(bn_le_sizes_buf.get());

    // Sum of all key sizes in a single basement
    toku::scoped_calloc bn_key_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *bn_key_sizes = reinterpret_cast<size_t *>(bn_key_sizes_buf.get());

    // TODO 4050: All these arrays should be combined into a single array of some bn_info struct (pivot, msize, num_les).
    // Each entry is the number of leafentries in this basement.  (Again, num_le is an overkill upper bound.)
    toku::scoped_malloc num_les_this_bn_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *num_les_this_bn = reinterpret_cast<uint32_t *>(num_les_this_bn_buf.get());
    num_les_this_bn[0] = 0;

    // Figure out the new pivots.
    // We need the index of each pivot, and for each basement we need
    // the number of leaves and the sum of the sizes of the leaves (memory requirement for basement).
    uint32_t curr_pivot = 0;
    uint32_t num_le_in_curr_bn = 0;
    uint32_t bn_size_so_far = 0;
    for (uint32_t i = 0; i < num_le; i++) {
        uint32_t curr_le_size = leafentry_disksize((LEAFENTRY) leafpointers[i]);
        le_sizes[i] = curr_le_size;
        if ((bn_size_so_far + curr_le_size + sizeof(uint32_t) + key_sizes[i] > basementnodesize) && (num_le_in_curr_bn != 0)) {
            // cap off the current basement node to end with the element before i
            new_pivots[curr_pivot] = i-1;
            curr_pivot++;
            num_le_in_curr_bn = 0;
            bn_size_so_far = 0;
        }
        num_le_in_curr_bn++;
        num_les_this_bn[curr_pivot] = num_le_in_curr_bn;
        bn_le_sizes[curr_pivot] += curr_le_size;
        bn_key_sizes[curr_pivot] += sizeof(uint32_t) + key_sizes[i];  // uint32_t le_offset
        bn_size_so_far += curr_le_size + sizeof(uint32_t) + key_sizes[i];
    }
    // curr_pivot is now the total number of pivot keys in the leaf node
    int num_pivots = curr_pivot;
    int num_children = num_pivots + 1;

    // now we need to fill in the new basement nodes and pivots

    // TODO: (Zardosht) this is an ugly thing right now
    // Need to figure out how to properly deal with seqinsert.
    // I am not happy with how this is being handled with basement nodes
    uint32_t tmp_seqinsert = BLB_SEQINSERT(node, num_orig_basements - 1);

    // choose the max msn applied to any basement as the max msn applied to all new basements
    MSN max_msn = ZERO_MSN;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        MSN curr_msn = BLB_MAX_MSN_APPLIED(node,i);
        max_msn = (curr_msn.msn > max_msn.msn) ? curr_msn : max_msn;
    }
    // remove the basement nodes from the node; we've saved a copy
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        // save a reference to the old basement nodes
        // we will need them to ensure that the memory
        // stays intact
        old_bns[i] = toku_detach_bn(node, i);
    }
    // Now destroy the old basements, but do not destroy leaves
    toku_destroy_ftnode_internals(node);

    // now reallocate pieces and start filling them in
    invariant(num_children > 0);

    node->n_children = num_children;
    XCALLOC_N(num_children, node->bp);  // allocate pointers to basements (bp)
    for (int i = 0; i < num_children; i++) {
        set_BLB(node, i, toku_create_empty_bn());  // allocate empty basements and set bp pointers
    }

    // now we start to fill in the data

    // first the pivots
    toku::scoped_malloc pivotkeys_buf(num_pivots * sizeof(DBT));
    DBT *pivotkeys = reinterpret_cast<DBT *>(pivotkeys_buf.get());
    for (int i = 0; i < num_pivots; i++) {
        uint32_t size = key_sizes[new_pivots[i]];
        const void *key = key_pointers[new_pivots[i]];
        toku_fill_dbt(&pivotkeys[i], key, size);
    }
    node->pivotkeys.create_from_dbts(pivotkeys, num_pivots);

    uint32_t baseindex_this_bn = 0;
    // now the basement nodes
    for (int i = 0; i < num_children; i++) {
        // put back seqinsert
        BLB_SEQINSERT(node, i) = tmp_seqinsert;

        // create start (inclusive) and end (exclusive) boundaries for data of basement node
        uint32_t curr_start = (i==0) ? 0 : new_pivots[i-1]+1;            // index of first leaf in basement
        uint32_t curr_end = (i==num_pivots) ? num_le : new_pivots[i]+1;  // index of first leaf in next basement
        uint32_t num_in_bn = curr_end - curr_start;                      // number of leaves in this basement

        // create indexes for new basement
        invariant(baseindex_this_bn == curr_start);
        uint32_t num_les_to_copy = num_les_this_bn[i];
        invariant(num_les_to_copy == num_in_bn);

        bn_data* bd = BLB_DATA(node, i);
        bd->set_contents_as_clone_of_sorted_array(
            num_les_to_copy,
            &key_pointers[baseindex_this_bn],
            &key_sizes[baseindex_this_bn],
            &leafpointers[baseindex_this_bn],
            &le_sizes[baseindex_this_bn],
            bn_key_sizes[i],  // total key sizes
            bn_le_sizes[i]    // total le sizes
            );

        BP_STATE(node,i) = PT_AVAIL;
        BP_TOUCH_CLOCK(node,i);
        BLB_MAX_MSN_APPLIED(node,i) = max_msn;
        baseindex_this_bn += num_les_to_copy;  // set to index of next bn
    }
    node->max_msn_applied_to_node_on_disk = max_msn;

    // destroy buffers of old mempools
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        destroy_basement_node(old_bns[i]);
    }
}
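
// Illustrative packing example (made-up sizes): with basementnodesize =
// 128KB and every klpair costing 100 bytes of leafentry plus 24 bytes of
// key and 4 bytes of le_offset overhead, the loop above closes a basement
// after roughly 128*1024 / 128 = 1024 entries, so a leaf holding 4000
// entries is rebuilt as 4 basements with 3 pivot keys, each pivot being
// the last key of the basement it closes.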

bool toku_ftnode_fully_in_memory(FTNODE node) {
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node,i) != PT_AVAIL) {
            return false;
        }
    }
    return true;
}

void toku_ftnode_assert_fully_in_memory(FTNODE UU(node)) {
    paranoid_invariant(toku_ftnode_fully_in_memory(node));
}

uint32_t toku_ftnode_leaf_num_entries(FTNODE node) {
    toku_ftnode_assert_fully_in_memory(node);
    uint32_t num_entries = 0;
    for (int i = 0; i < node->n_children; i++) {
        num_entries += BLB_DATA(node, i)->num_klpairs();
    }
    return num_entries;
}

enum reactivity toku_ftnode_get_leaf_reactivity(FTNODE node, uint32_t nodesize) {
    enum reactivity re = RE_STABLE;
    toku_ftnode_assert_fully_in_memory(node);
    paranoid_invariant(node->height==0);
    unsigned int size = toku_serialize_ftnode_size(node);
    if (size > nodesize && toku_ftnode_leaf_num_entries(node) > 1) {
        re = RE_FISSIBLE;
    } else if ((size*4) < nodesize && !BLB_SEQINSERT(node, node->n_children-1)) {
        re = RE_FUSIBLE;
    }
    return re;
}

enum reactivity toku_ftnode_get_nonleaf_reactivity(FTNODE node, unsigned int fanout) {
    paranoid_invariant(node->height > 0);
    int n_children = node->n_children;
    if (n_children > (int) fanout) {
        return RE_FISSIBLE;
    }
    if (n_children * 4 < (int) fanout) {
        return RE_FUSIBLE;
    }
    return RE_STABLE;
}

enum reactivity toku_ftnode_get_reactivity(FT ft, FTNODE node) {
    toku_ftnode_assert_fully_in_memory(node);
    if (node->height == 0) {
        return toku_ftnode_get_leaf_reactivity(node, ft->h->nodesize);
    } else {
        return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout);
    }
}
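
// Illustrative thresholds (assuming 4MB nodes and a fanout of 16; both
// are configuration-dependent): a leaf whose serialized size exceeds 4MB
// is RE_FISSIBLE (split candidate) and one under 1MB is RE_FUSIBLE (merge
// candidate) unless it is in a sequential-insert pattern; a nonleaf with
// more than 16 children splits, and one with fewer than 4 merges.  The 4x
// hysteresis gap between the split and merge thresholds keeps a node from
// oscillating between the two states under mixed workloads.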

unsigned int toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc) {
    return bnc->msg_buffer.buffer_size_in_use();
}

// Return true if the size of the buffers plus the amount of work done is large enough.
// Return false if there is nothing to be flushed (the buffers are empty).
bool toku_ftnode_nonleaf_is_gorged(FTNODE node, uint32_t nodesize) {
    uint64_t size = toku_serialize_ftnode_size(node);

    bool buffers_are_empty = true;
    toku_ftnode_assert_fully_in_memory(node);
    //
    // the nonleaf node is gorged if the following holds true:
    //  - the buffers are non-empty
    //  - the total workdone by the buffers PLUS the size of the buffers
    //    is greater than nodesize (which as of Maxwell should be
    //    4MB)
    //
    paranoid_invariant(node->height > 0);
    for (int child = 0; child < node->n_children; ++child) {
        size += BP_WORKDONE(node, child);
    }
    for (int child = 0; child < node->n_children; ++child) {
        if (toku_bnc_nbytesinbuf(BNC(node, child)) > 0) {
            buffers_are_empty = false;
            break;
        }
    }
    return ((size > nodesize)
            &&
            (!buffers_are_empty));
}
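
// Example (made-up numbers): a 3MB serialized node whose children have
// accumulated 1.5MB of BP_WORKDONE counts as 4.5MB > 4MB, so it is gorged
// as long as at least one child buffer is non-empty; an empty-buffered
// node is never gorged no matter how much historical workdone it carries,
// since there would be nothing for the flush to move down.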

int toku_bnc_n_entries(NONLEAF_CHILDINFO bnc) {
    return bnc->msg_buffer.num_entries();
}

// how much memory does this child buffer consume?
long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc) {
    return (sizeof(*bnc) +
            bnc->msg_buffer.memory_footprint() +
            bnc->fresh_message_tree.memory_size() +
            bnc->stale_message_tree.memory_size() +
            bnc->broadcast_list.memory_size());
}

// how much memory in this child buffer holds useful data?
// originally created solely for use by test program(s).
long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc) {
    return (sizeof(*bnc) +
            bnc->msg_buffer.memory_size_in_use() +
            bnc->fresh_message_tree.memory_size() +
            bnc->stale_message_tree.memory_size() +
            bnc->broadcast_list.memory_size());
}

//
// Garbage collection
// Message injection
// Message application
//

// Used only by test programs: append a child node to a parent node
void toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey) {
    int childnum = node->n_children;
    node->n_children++;
    REALLOC_N(node->n_children, node->bp);
    BP_BLOCKNUM(node,childnum) = child->blocknum;
    BP_STATE(node,childnum) = PT_AVAIL;
    BP_WORKDONE(node, childnum) = 0;
    set_BNC(node, childnum, toku_create_empty_nl());
    if (pivotkey) {
        invariant(childnum > 0);
        node->pivotkeys.insert_at(pivotkey, childnum - 1);
    }
    node->dirty = 1;
}

void
toku_ft_bn_apply_msg_once (
    BASEMENTNODE bn,
    const ft_msg &msg,
    uint32_t idx,
    uint32_t le_keylen,
    LEAFENTRY le,
    txn_gc_info *gc_info,
    uint64_t *workdone,
    STAT64INFO stats_to_update,
    int64_t *logical_rows_delta
    )
// Effect: Apply msg to leafentry (msn is ignored)
//         Calculate work done by message on leafentry and add it to caller's workdone counter.
//   idx is the location where it goes
//   le is old leafentry
{
    size_t newsize=0, oldsize=0, workdone_this_le=0;
    LEAFENTRY new_le=0;
    // how many bytes of user data (not including overhead) were added or
    // deleted from this row
    int64_t numbytes_delta = 0;
    // will be +1 or -1 or 0 (if row was added or deleted or not)
    int64_t numrows_delta = 0;
    // will be +1, -1 or 0 if a message that was accounted for logically has
    // changed in meaning such as an insert changed to an update or a delete
    // changed to a noop
    int64_t logical_rows_delta_le = 0;
    uint32_t key_storage_size = msg.kdbt()->size + sizeof(uint32_t);
    if (le) {
        oldsize = leafentry_memsize(le) + key_storage_size;
    }

    // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt()
    // to allocate more space.  That means le is guaranteed to not cause a
    // sigsegv but it may point to a mempool that is no longer in use.
    // We'll have to release the old mempool later.
    logical_rows_delta_le = toku_le_apply_msg(
        msg,
        le,
        &bn->data_buffer,
        idx,
        le_keylen,
        gc_info,
        &new_le,
        &numbytes_delta);

    // at this point, we cannot trust cmd->u.id.key to be valid.
    // The dmt may have realloced its mempool and freed the one containing key.

    newsize = new_le ? (leafentry_memsize(new_le) + key_storage_size) : 0;
    if (le && new_le) {
        workdone_this_le = (oldsize > newsize ? oldsize : newsize);  // work done is max of le size before and after message application

    } else {           // we did not just replace a row, so ...
        if (le) {
            // ... we just deleted a row ...
            workdone_this_le = oldsize;
            numrows_delta = -1;
        }
        if (new_le) {
            // ... or we just added a row
            workdone_this_le = newsize;
            numrows_delta = 1;
        }
    }
    if (FT_LIKELY(workdone != NULL)) {  // test programs may call with NULL
        *workdone += workdone_this_le;
    }

    if (FT_LIKELY(logical_rows_delta != NULL)) {
        *logical_rows_delta += logical_rows_delta_le;
    }
    // now update stat64 statistics
    bn->stat64_delta.numrows += numrows_delta;
    bn->stat64_delta.numbytes += numbytes_delta;
    // the only reason stats_to_update may be null is for tests
    if (FT_LIKELY(stats_to_update != NULL)) {
        stats_to_update->numrows += numrows_delta;
        stats_to_update->numbytes += numbytes_delta;
    }
}
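
// Worked example of the workdone accounting above (hypothetical sizes):
// replacing a 100-byte leafentry with a 140-byte one counts max(100, 140)
// = 140 bytes of work; a delete of that entry counts 100 (oldsize) with
// numrows_delta = -1; a fresh insert counts 140 (newsize) with
// numrows_delta = +1.  Both sizes include the key's storage cost (key
// bytes plus a uint32_t of overhead), so workdone tracks bytes actually
// touched in the basement, not just value bytes.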

static const uint32_t setval_tag = 0xee0ccb99;  // this was gotten by doing "cat /dev/random|head -c4|od -x" to get a random number.  We want to make sure that the user actually passes us the setval_extra_s that we passed in.
struct setval_extra_s {
    uint32_t tag;
    bool did_set_val;
    // any error code that setval_fun wants to return goes here.
    int setval_r;
    // need arguments for toku_ft_bn_apply_msg_once
    BASEMENTNODE bn;
    // captured from original message, not currently used
    MSN msn;
    XIDS xids;
    const DBT* key;
    uint32_t idx;
    uint32_t le_keylen;
    LEAFENTRY le;
    txn_gc_info* gc_info;
    uint64_t* workdone;  // set by toku_ft_bn_apply_msg_once()
    STAT64INFO stats_to_update;
    int64_t* logical_rows_delta;
};

/*
 * If new_val == NULL, we send a delete message instead of an insert.
 * This happens here instead of in do_delete() for consistency.
 * setval_fun() is called from handlerton, passing in svextra_v
 * from setval_extra_s input arg to ft->update_fun().
 */
static void setval_fun (const DBT *new_val, void *svextra_v) {
    struct setval_extra_s *CAST_FROM_VOIDP(svextra, svextra_v);
    paranoid_invariant(svextra->tag==setval_tag);
    paranoid_invariant(!svextra->did_set_val);
    svextra->did_set_val = true;

    {
        // can't leave scope until toku_ft_bn_apply_msg_once if
        // this is a delete
        DBT val;
        ft_msg msg(
            svextra->key,
            new_val ? new_val : toku_init_dbt(&val),
            new_val ? FT_INSERT : FT_DELETE_ANY,
            svextra->msn,
            svextra->xids);
        toku_ft_bn_apply_msg_once(
            svextra->bn,
            msg,
            svextra->idx,
            svextra->le_keylen,
            svextra->le,
            svextra->gc_info,
            svextra->workdone,
            svextra->stats_to_update,
            svextra->logical_rows_delta);
        svextra->setval_r = 0;
    }
}

// We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls
// do_update()), so capturing the msn in the setval_extra_s is not strictly
// required.  The alternative would be to put a dummy msn in the messages
// created by setval_fun(), but preserving the original msn seems cleaner and
// it preserves accountability at a lower layer.
static int do_update(
    ft_update_func update_fun,
    const DESCRIPTOR_S* desc,
    BASEMENTNODE bn,
    const ft_msg &msg,
    uint32_t idx,
    LEAFENTRY le,
    void* keydata,
    uint32_t keylen,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    LEAFENTRY le_for_update;
    DBT key;
    const DBT *keyp;
    const DBT *update_function_extra;
    DBT vdbt;
    const DBT *vdbtp;

    // the location of data depends on whether this is a regular or
    // broadcast update
    if (msg.type() == FT_UPDATE) {
        // key is passed in with command (should be same as from le)
        // update function extra is passed in with command
        keyp = msg.kdbt();
        update_function_extra = msg.vdbt();
    } else {
        invariant(msg.type() == FT_UPDATE_BROADCAST_ALL);
        // key is not passed in with broadcast, it comes from le
        // update function extra is passed in with command
        paranoid_invariant(le);  // for broadcast updates, we just hit all leafentries
                                 // so this cannot be null
        paranoid_invariant(keydata);
        paranoid_invariant(keylen);
        paranoid_invariant(msg.kdbt()->size == 0);
        keyp = toku_fill_dbt(&key, keydata, keylen);
        update_function_extra = msg.vdbt();
    }
    toku_ft_status_note_update(msg.type() == FT_UPDATE_BROADCAST_ALL);

    if (le && !le_latest_is_del(le)) {
        // if the latest val exists, use it, and we'll use the leafentry later
        uint32_t vallen;
        void *valp = le_latest_val_and_len(le, &vallen);
        vdbtp = toku_fill_dbt(&vdbt, valp, vallen);
    } else {
        // otherwise, the val and leafentry are both going to be null
        vdbtp = NULL;
    }
    le_for_update = le;

    struct setval_extra_s setval_extra = {
        setval_tag,
        false,
        0,
        bn,
        msg.msn(),
        msg.xids(),
        keyp,
        idx,
        keylen,
        le_for_update,
        gc_info,
        workdone,
        stats_to_update,
        logical_rows_delta
    };
    // call handlerton's ft->update_fun(), which passes setval_extra
    // to setval_fun()
    FAKE_DB(db, desc);
    int r = update_fun(
        &db,
        keyp,
        vdbtp,
        update_function_extra,
        setval_fun,
        &setval_extra);

    if (r == 0) { r = setval_extra.setval_r; }
    return r;
}
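
// A minimal sketch of the update callback protocol (assumption: this
// example is not part of the build and "example_increment_fun" is a
// hypothetical handlerton-side function).  The update_fun receives the
// old value and the message's extra payload, computes a new value, and
// hands it back through set_val, which is setval_fun() above; returning
// without calling set_val leaves the row unchanged:
//
//     static int example_increment_fun(DB *db, const DBT *key,
//                                      const DBT *old_val, const DBT *extra,
//                                      void (*set_val)(const DBT *new_val,
//                                                      void *setval_extra),
//                                      void *setval_extra) {
//         uint64_t counter = 0;
//         if (old_val != nullptr && old_val->size == sizeof counter) {
//             memcpy(&counter, old_val->data, sizeof counter);
//         }
//         counter++;
//         DBT new_val;
//         toku_fill_dbt(&new_val, &counter, sizeof counter);
//         set_val(&new_val, setval_extra);  // applied via setval_fun()
//         return 0;
//     }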
1374
1375// Should be renamed as something like "apply_msg_to_basement()."
1376void toku_ft_bn_apply_msg(
1377 const toku::comparator& cmp,
1378 ft_update_func update_fun,
1379 BASEMENTNODE bn,
1380 const ft_msg& msg,
1381 txn_gc_info* gc_info,
1382 uint64_t* workdone,
1383 STAT64INFO stats_to_update,
1384 int64_t* logical_rows_delta) {
1385// Effect:
1386// Put a msg into a leaf.
1387// Calculate work done by message on leafnode and add it to caller's
1388// workdone counter.
1389// The leaf could end up "too big" or "too small". The caller must fix that up.
1390 LEAFENTRY storeddata;
1391 void* key = NULL;
1392 uint32_t keylen = 0;
1393
1394 uint32_t num_klpairs;
1395 int r;
1396 struct toku_msg_leafval_heaviside_extra be(cmp, msg.kdbt());
1397
1398 unsigned int doing_seqinsert = bn->seqinsert;
1399 bn->seqinsert = 0;
1400
1401 switch (msg.type()) {
1402 case FT_INSERT_NO_OVERWRITE:
1403 case FT_INSERT: {
1404 uint32_t idx;
1405 if (doing_seqinsert) {
1406 idx = bn->data_buffer.num_klpairs();
1407 DBT kdbt;
1408 r = bn->data_buffer.fetch_key_and_len(idx-1, &kdbt.size, &kdbt.data);
1409 if (r != 0) goto fz;
1410 int c = toku_msg_leafval_heaviside(kdbt, be);
1411 if (c >= 0) goto fz;
1412 r = DB_NOTFOUND;
1413 } else {
1414 fz:
1415 r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
1416 be,
1417 &storeddata,
1418 &key,
1419 &keylen,
1420 &idx
1421 );
1422 }
1423 if (r==DB_NOTFOUND) {
1424 storeddata = 0;
1425 } else {
1426 assert_zero(r);
1427 }
1428 toku_ft_bn_apply_msg_once(
1429 bn,
1430 msg,
1431 idx,
1432 keylen,
1433 storeddata,
1434 gc_info,
1435 workdone,
1436 stats_to_update,
1437 logical_rows_delta);
1438
1439 // if the insertion point is within a window of the right edge of
1440 // the leaf then it is sequential
1441 // window = min(32, number of leaf entries/16)
1442 {
1443 uint32_t s = bn->data_buffer.num_klpairs();
1444 uint32_t w = s / 16;
1445 if (w == 0) w = 1;
1446 if (w > 32) w = 32;
1447
1448 // within the window?
1449 if (s - idx <= w)
1450 bn->seqinsert = doing_seqinsert + 1;
1451 }
1452 break;
1453 }
1454 case FT_DELETE_ANY:
1455 case FT_ABORT_ANY:
1456 case FT_COMMIT_ANY: {
1457 uint32_t idx;
1458 // Apply to all the matches
1459
1460 r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
1461 be,
1462 &storeddata,
1463 &key,
1464 &keylen,
1465 &idx);
1466 if (r == DB_NOTFOUND) break;
1467 assert_zero(r);
1468 toku_ft_bn_apply_msg_once(
1469 bn,
1470 msg,
1471 idx,
1472 keylen,
1473 storeddata,
1474 gc_info,
1475 workdone,
1476 stats_to_update,
1477 logical_rows_delta);
1478 break;
1479 }
1480 case FT_OPTIMIZE_FOR_UPGRADE:
1481 // fall through so that optimize_for_upgrade performs rest of the optimize logic
1482 case FT_COMMIT_BROADCAST_ALL:
1483 case FT_OPTIMIZE:
1484 // Apply to all leafentries
1485 num_klpairs = bn->data_buffer.num_klpairs();
1486 for (uint32_t idx = 0; idx < num_klpairs; ) {
1487 void* curr_keyp = NULL;
1488 uint32_t curr_keylen = 0;
1489 r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp);
1490 assert_zero(r);
1491 int deleted = 0;
1492 if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
1493 // message application code needs a key in order to determine
1494 // how much work was done by this message. since this is a
1495 // broadcast message, we have to create a new message whose
1496 // key is the current le's key.
1497 DBT curr_keydbt;
1498 ft_msg curr_msg(
1499 toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
1500 msg.vdbt(),
1501 msg.type(),
1502 msg.msn(),
1503 msg.xids());
1504 toku_ft_bn_apply_msg_once(
1505 bn,
1506 curr_msg,
1507 idx,
1508 curr_keylen,
1509 storeddata,
1510 gc_info,
1511 workdone,
1512 stats_to_update,
1513 logical_rows_delta);
1514 // at this point, we cannot trust msg.kdbt to be valid.
1515 uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
1516 if (new_dmt_size != num_klpairs) {
1517 paranoid_invariant(new_dmt_size + 1 == num_klpairs);
1518 //Item was deleted.
1519 deleted = 1;
1520 }
1521 }
1522 if (deleted)
1523 num_klpairs--;
1524 else
1525 idx++;
1526 }
1527 paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs);
1528
1529 break;
1530 case FT_COMMIT_BROADCAST_TXN:
1531 case FT_ABORT_BROADCAST_TXN:
1532 // Apply to all leafentries if txn is represented
1533 num_klpairs = bn->data_buffer.num_klpairs();
1534 for (uint32_t idx = 0; idx < num_klpairs; ) {
1535 void* curr_keyp = NULL;
1536 uint32_t curr_keylen = 0;
1537 r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp);
1538 assert_zero(r);
1539 int deleted = 0;
1540 if (le_has_xids(storeddata, msg.xids())) {
1541 // message application code needs a key in order to determine
1542 // how much work was done by this message. since this is a
1543 // broadcast message, we have to create a new message whose key
1544 // is the current le's key.
1545 DBT curr_keydbt;
1546 ft_msg curr_msg(
1547 toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
1548 msg.vdbt(),
1549 msg.type(),
1550 msg.msn(),
1551 msg.xids());
1552 toku_ft_bn_apply_msg_once(
1553 bn,
1554 curr_msg,
1555 idx,
1556 curr_keylen,
1557 storeddata,
1558 gc_info,
1559 workdone,
1560 stats_to_update,
1561 logical_rows_delta);
1562 uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
1563 if (new_dmt_size != num_klpairs) {
1564 paranoid_invariant(new_dmt_size + 1 == num_klpairs);
1565 //Item was deleted.
1566 deleted = 1;
1567 }
1568 }
1569 if (deleted)
1570 num_klpairs--;
1571 else
1572 idx++;
1573 }
1574 paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs);
1575
1576 break;
1577 case FT_UPDATE: {
1578 uint32_t idx;
1579 r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
1580 be,
1581 &storeddata,
1582 &key,
1583 &keylen,
1584 &idx
1585 );
1586 if (r==DB_NOTFOUND) {
1587 {
1588 //Point to msg's copy of the key so we don't worry about le being freed
1589 //TODO: 46 MAYBE Get rid of this when le_apply message memory is better handled
1590 key = msg.kdbt()->data;
1591 keylen = msg.kdbt()->size;
1592 }
1593 r = do_update(
1594 update_fun,
1595 cmp.get_descriptor(),
1596 bn,
1597 msg,
1598 idx,
1599 NULL,
1600 NULL,
1601 0,
1602 gc_info,
1603 workdone,
1604 stats_to_update,
1605 logical_rows_delta);
1606 } else if (r==0) {
1607 r = do_update(
1608 update_fun,
1609 cmp.get_descriptor(),
1610 bn,
1611 msg,
1612 idx,
1613 storeddata,
1614 key,
1615 keylen,
1616 gc_info,
1617 workdone,
1618 stats_to_update,
1619 logical_rows_delta);
1620 } // otherwise, a worse error, just return it
1621 break;
1622 }
1623 case FT_UPDATE_BROADCAST_ALL: {
1624 // apply to all leafentries.
1625 uint32_t idx = 0;
1626 uint32_t num_leafentries_before;
1627 // This is used to avoid having the logical row count changed on apply
1628 // of this message since it will return a negative number of the number
1629 // of leaf entries visited and cause the ft header value to go to 0;
1630 // This message will not change the number of rows, so just use the
1631 // bogus value.
        int64_t temp_logical_rows_delta = 0;
        while (idx < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
            void* curr_key = nullptr;
            uint32_t curr_keylen = 0;
            r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_key);
            assert_zero(r);

            // TODO: 46 replace this with something better than cloning the key
            // TODO: (Zardosht) This may be unnecessary now, due to how the key
            // is handled in the bndata. Investigate and determine.
            // The clone lives on the stack because it only needs to last one
            // iteration; alloca() would not be released until the function
            // returns and could overflow the stack.
            char clone_mem[curr_keylen];
            memcpy((void*)clone_mem, curr_key, curr_keylen);
            curr_key = (void*)clone_mem;

            // Apply the update to this leafentry; do_update may overwrite
            // or delete it.
            r = do_update(
                update_fun,
                cmp.get_descriptor(),
                bn,
                msg,
                idx,
                storeddata,
                curr_key,
                curr_keylen,
                gc_info,
                workdone,
                stats_to_update,
                &temp_logical_rows_delta);
            assert_zero(r);

            if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
                // we didn't delete something, so increment the index.
                idx++;
            }
        }
        break;
    }
    case FT_NONE: break; // don't do anything
    }

    return;
}

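// Total order on (key, msn) pairs used by the message trees below: keys
// are compared with the tree's comparator first, and ties are broken by
// MSN, so for equal keys older messages sort before newer ones. For
// example, ("a", msn 5) < ("a", msn 9) < ("b", msn 1).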
static inline int
key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn, const toku::comparator &cmp) {
    int r = cmp(a, b);
    if (r == 0) {
        if (amsn.msn > bmsn.msn) {
            r = +1;
        } else if (amsn.msn < bmsn.msn) {
            r = -1;
        } else {
            r = 0;
        }
    }
    return r;
}

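// Heaviside function for searching a message tree: positions the message
// stored at `offset` relative to the (key, msn) query point carried in
// `extra`, returning <0, 0, or >0 accordingly.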
int toku_msg_buffer_key_msn_heaviside(const int32_t &offset, const struct toku_msg_buffer_key_msn_heaviside_extra &extra) {
    MSN query_msn;
    DBT query_key;
    extra.msg_buffer->get_message_key_msn(offset, &query_key, &query_msn);
    return key_msn_cmp(&query_key, extra.key, query_msn, extra.msn, extra.cmp);
}

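// Three-way comparison of two messages, identified by their offsets in the
// same message buffer; used to keep message trees sorted by (key, msn).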
int toku_msg_buffer_key_msn_cmp(const struct toku_msg_buffer_key_msn_cmp_extra &extra, const int32_t &ao, const int32_t &bo) {
    MSN amsn, bmsn;
    DBT akey, bkey;
    extra.msg_buffer->get_message_key_msn(ao, &akey, &amsn);
    extra.msg_buffer->get_message_key_msn(bo, &bkey, &bmsn);
    return key_msn_cmp(&akey, &bkey, amsn, bmsn, extra.cmp);
}

// Effect: Enqueue the message represented by the parameters into the
// bnc's buffer, and put it in either the fresh or stale message tree,
// or the broadcast list.
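// A message is "fresh" until it has been applied to the basement nodes
// below on behalf of a query; once applied that way (but not yet flushed)
// it lives in the stale tree instead. Broadcast messages are kept in a
// separate list in arrival order.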
static void bnc_insert_msg(NONLEAF_CHILDINFO bnc, const ft_msg &msg, bool is_fresh, const toku::comparator &cmp) {
    int r = 0;
    int32_t offset;
    bnc->msg_buffer.enqueue(msg, is_fresh, &offset);
    enum ft_msg_type type = msg.type();
    if (ft_msg_type_applies_once(type)) {
        DBT key;
        toku_fill_dbt(&key, msg.kdbt()->data, msg.kdbt()->size);
        struct toku_msg_buffer_key_msn_heaviside_extra extra(cmp, &bnc->msg_buffer, &key, msg.msn());
        if (is_fresh) {
            r = bnc->fresh_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
            assert_zero(r);
        } else {
            r = bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
            assert_zero(r);
        }
    } else {
        invariant(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type));
        const uint32_t idx = bnc->broadcast_list.size();
        r = bnc->broadcast_list.insert_at(offset, idx);
        assert_zero(r);
    }
}

// This is only exported for tests.
void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, uint32_t keylen, const void *data, uint32_t datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const toku::comparator &cmp)
{
    DBT k, v;
    ft_msg msg(toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), type, msn, xids);
    bnc_insert_msg(bnc, msg, is_fresh, cmp);
}
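// A minimal usage sketch (test-only; assumes the ft test harness, which
// supplies a toku::comparator `cmp` and next_dummymsn(), and a message
// with no transaction, hence toku_xids_get_root_xids()):
//
//     NONLEAF_CHILDINFO bnc = toku_create_empty_nl();
//     toku_bnc_insert_msg(bnc, "a", 2, "x", 2, FT_INSERT,
//                         next_dummymsn(), toku_xids_get_root_xids(),
//                         true /*is_fresh*/, cmp);
//     ...
//     destroy_nonleaf_childinfo(bnc);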

// append a msg to a nonleaf node's child buffer
static void ft_append_msg_to_child_buffer(const toku::comparator &cmp, FTNODE node,
                                          int childnum, const ft_msg &msg, bool is_fresh) {
    paranoid_invariant(BP_STATE(node, childnum) == PT_AVAIL);
    bnc_insert_msg(BNC(node, childnum), msg, is_fresh, cmp);
    node->dirty = 1;
}

// This is only exported for tests.
void toku_ft_append_to_child_buffer(const toku::comparator &cmp, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) {
    ft_msg msg(key, val, type, msn, xids);
    ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh);
}

static void ft_nonleaf_msg_once_to_child(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
// Previously we had passive-aggressive promotion, but that caused a lot of I/O
// at checkpoint time. So now we just put the message in the buffer here.
// Also, we don't worry about the node getting overfull here; that's the caller's problem.
{
    unsigned int childnum = (target_childnum >= 0
                             ? target_childnum
                             : toku_ftnode_which_child(node, msg.kdbt(), cmp));
    ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh);
    NONLEAF_CHILDINFO bnc = BNC(node, childnum);
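    // Account the message's volume against this child's flow counters (two
    // windows of recent traffic); these feed heuristics such as whether
    // future messages should be promoted past this buffer.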
    bnc->flow[0] += flow_deltas[0];
    bnc->flow[1] += flow_deltas[1];
}

// TODO: Remove me, I'm boring.
static int ft_compare_pivot(const toku::comparator &cmp, const DBT *key, const DBT *pivot) {
    return cmp(key, pivot);
}

/* Find the leftmost child that may contain the key.
 * If the key exists it will be in the child whose number
 * is the return value of this function.
 */
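// For example, with 3 children and pivots {10, 20}: keys <= 10 go to
// child 0, keys in (10, 20] go to child 1, and keys > 20 go to child 2.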
int toku_ftnode_which_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
    // a funny case of no pivots
    if (node->n_children <= 1) return 0;

    DBT pivot;

    // check the last key to optimize seq insertions
    int n = node->n_children - 1;
    int c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(n - 1, &pivot));
    if (c > 0) return n;

    // binary search the pivots
    int lo = 0;
    int hi = n - 1; // skip the last one, we checked it above
    int mi;
    while (lo < hi) {
        mi = (lo + hi) / 2;
        c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
        if (c > 0) {
            lo = mi + 1;
            continue;
        }
        if (c < 0) {
            hi = mi;
            continue;
        }
        return mi;
    }
    return lo;
}

// Used for HOT.
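// Like toku_ftnode_which_child, except that a key equal to a pivot selects
// the child to the right of that pivot rather than the one to its left.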
int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
    DBT pivot;
    int low = 0;
    int hi = node->n_children - 1;
    int mi;
    while (low < hi) {
        mi = (low + hi) / 2;
        int r = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
        if (r > 0) {
            low = mi + 1;
        } else if (r < 0) {
            hi = mi;
        } else {
            // if they were exactly equal, then we want the sub-tree under
            // the next pivot.
            return mi + 1;
        }
    }
    invariant(low == hi);
    return low;
}

void toku_ftnode_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
    FTNODE CAST_FROM_VOIDP(node, value_data);
    node->ct_pair = p;
}

static void
ft_nonleaf_msg_all(const toku::comparator &cmp, FTNODE node, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
// Effect: Put the message into a nonleaf node. We put it into all children, possibly causing the children to become reactive.
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
{
    for (int i = 0; i < node->n_children; i++) {
        ft_nonleaf_msg_once_to_child(cmp, node, i, msg, is_fresh, flow_deltas);
    }
}

static void
ft_nonleaf_put_msg(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
// Effect: Put the message into a nonleaf node. We may put it into a child, possibly causing the child to become reactive.
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
{
    //
    // see comments in toku_ft_leaf_apply_msg
    // to understand why we handle setting
    // node->max_msn_applied_to_node_on_disk here,
    // and don't do it in toku_ftnode_put_msg
    //
    MSN msg_msn = msg.msn();
    invariant(msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
    node->max_msn_applied_to_node_on_disk = msg_msn;

    if (ft_msg_type_applies_once(msg.type())) {
        ft_nonleaf_msg_once_to_child(cmp, node, target_childnum, msg, is_fresh, flow_deltas);
    } else if (ft_msg_type_applies_all(msg.type())) {
        ft_nonleaf_msg_all(cmp, node, msg, is_fresh, flow_deltas);
    } else {
        paranoid_invariant(ft_msg_type_does_nothing(msg.type()));
    }
}

// Garbage collect one leaf entry.
static void
ft_basement_node_gc_once(BASEMENTNODE bn,
                         uint32_t index,
                         void* keyp,
                         uint32_t keylen,
                         LEAFENTRY leaf_entry,
                         txn_gc_info *gc_info,
                         STAT64INFO_S * delta)
{
    paranoid_invariant(leaf_entry);

    // Don't run garbage collection on non-mvcc leaf entries.
    if (leaf_entry->type != LE_MVCC) {
        goto exit;
    }

    // Don't run garbage collection if this leafentry decides it's not worth it.
    if (!toku_le_worth_running_garbage_collection(leaf_entry, gc_info)) {
        goto exit;
    }

    LEAFENTRY new_leaf_entry;
    new_leaf_entry = NULL;

    // The mempool doesn't free itself. When it allocates new memory,
    // this pointer will be set to the older memory that must now be
    // freed.
    void * maybe_free;
    maybe_free = NULL;
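    // Note: nothing below ever sets maybe_free to non-NULL, so the
    // toku_free() branch at the end is currently dead code, retained in
    // case the data buffer is changed to hand back memory to free.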

    // These will represent the number of bytes and rows changed as
    // part of the garbage collection.
    int64_t numbytes_delta;
    int64_t numrows_delta;
    toku_le_garbage_collect(leaf_entry,
                            &bn->data_buffer,
                            index,
                            keyp,
                            keylen,
                            gc_info,
                            &new_leaf_entry,
                            &numbytes_delta);

    numrows_delta = new_leaf_entry ? 0 : -1;

    // If we created a new mempool buffer we must free the
    // old/original buffer.
    if (maybe_free) {
        toku_free(maybe_free);
    }

    // Update stats.
    bn->stat64_delta.numrows += numrows_delta;
    bn->stat64_delta.numbytes += numbytes_delta;
    delta->numrows += numrows_delta;
    delta->numbytes += numbytes_delta;

exit:
    return;
}

// Garbage collect all leaf entries for a given basement node.
static void
basement_node_gc_all_les(BASEMENTNODE bn,
                         txn_gc_info *gc_info,
                         STAT64INFO_S * delta)
{
    int r = 0;
    uint32_t index = 0;
    uint32_t num_leafentries_before;
    while (index < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
        void* keyp = NULL;
        uint32_t keylen = 0;
        LEAFENTRY leaf_entry;
        r = bn->data_buffer.fetch_klpair(index, &leaf_entry, &keylen, &keyp);
        assert_zero(r);
        ft_basement_node_gc_once(
            bn,
            index,
            keyp,
            keylen,
            leaf_entry,
            gc_info,
            delta
            );
        // Check if the leaf entry was deleted or not.
        if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
            ++index;
        }
    }
}

// Garbage collect all leaf entries in all basement nodes.
static void
ft_leaf_gc_all_les(FT ft, FTNODE node, txn_gc_info *gc_info)
{
    toku_ftnode_assert_fully_in_memory(node);
    paranoid_invariant_zero(node->height);
    // Loop through each leaf entry, garbage collecting as we go.
    for (int i = 0; i < node->n_children; ++i) {
        // Perform the garbage collection.
        BASEMENTNODE bn = BLB(node, i);
        STAT64INFO_S delta;
        delta.numrows = 0;
        delta.numbytes = 0;
        basement_node_gc_all_les(bn, gc_info, &delta);
        toku_ft_update_stats(&ft->in_memory_stats, delta);
    }
}

void toku_ftnode_leaf_run_gc(FT ft, FTNODE node) {
    TOKULOGGER logger = toku_cachefile_logger(ft->cf);
    if (logger) {
        TXN_MANAGER txn_manager = toku_logger_get_txn_manager(logger);
        txn_manager_state txn_state_for_gc(txn_manager);
        txn_state_for_gc.init();
        TXNID oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);

        // Perform full garbage collection.
        //
        // - txn_state_for_gc
        //     a fresh snapshot of the transaction system.
        // - oldest_referenced_xid_for_simple_gc
        //     the oldest xid in any live list as of right now - suitable for simple gc
        // - node->oldest_referenced_xid_known
        //     the last known oldest referenced xid for this node and any unapplied messages.
        //     it is a lower bound on the actual oldest referenced xid - but because there
        //     may be abort messages above us, we need to be careful to only use this value
        //     for implicit promotion (as opposed to the oldest referenced xid for simple gc)
        //
        // The node has its own oldest referenced xid because it must be careful not to implicitly promote
        // provisional entries for transactions that are no longer live, but may have abort messages
        // somewhere above us in the tree.
        txn_gc_info gc_info(&txn_state_for_gc,
                            oldest_referenced_xid_for_simple_gc,
                            node->oldest_referenced_xid_known,
                            true);
        ft_leaf_gc_all_les(ft, node, &gc_info);
    }
}

void toku_ftnode_put_msg(
    const toku::comparator &cmp,
    ft_update_func update_fun,
    FTNODE node,
    int target_childnum,
    const ft_msg &msg,
    bool is_fresh,
    txn_gc_info* gc_info,
    size_t flow_deltas[],
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {
// Effect: Push message into the subtree rooted at NODE.
// If NODE is a leaf, then
//   put message into leaf, applying it to the leafentries
// If NODE is a nonleaf, then push the message into the message buffer(s) of the relevant child(ren).
// The node may become overfull. That's not our problem.
    toku_ftnode_assert_fully_in_memory(node);
    //
    // see comments in toku_ft_leaf_apply_msg
    // to understand why we don't handle setting
    // node->max_msn_applied_to_node_on_disk here,
    // and instead defer to these functions
    //
    if (node->height == 0) {
        toku_ft_leaf_apply_msg(
            cmp,
            update_fun,
            node,
            target_childnum, msg,
            gc_info,
            nullptr,
            stats_to_update,
            logical_rows_delta);
    } else {
        ft_nonleaf_put_msg(
            cmp,
            node,
            target_childnum,
            msg,
            is_fresh,
            flow_deltas);
    }
}

// Effect: applies the message to the leaf if the appropriate basement node is
// in memory. This function is called during message injection and/or
// flushing, so the entire node MUST be in memory.
void toku_ft_leaf_apply_msg(
    const toku::comparator& cmp,
    ft_update_func update_fun,
    FTNODE node,
    int target_childnum, // which child to inject to, or -1 if unknown
    const ft_msg& msg,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    VERIFY_NODE(t, node);
    toku_ftnode_assert_fully_in_memory(node);

    //
    // Because toku_ft_leaf_apply_msg is called with the intent of permanently
    // applying a message to a leaf node (meaning the message is permanently applied
    // and will be purged from the system after this call, as opposed to
    // toku_apply_ancestors_messages_to_node, which applies a message
    // for a query, but the message may still reside in the system and
    // be reapplied later), we mark the node as dirty and
    // take the opportunity to update node->max_msn_applied_to_node_on_disk.
    //
    node->dirty = 1;

    //
    // we cannot blindly update node->max_msn_applied_to_node_on_disk,
    // we must check to see if the msn is greater than the one already stored,
    // because the message may have already been applied earlier (via
    // toku_apply_ancestors_messages_to_node) to answer a query
    //
    // This is why we handle node->max_msn_applied_to_node_on_disk both here
    // and in ft_nonleaf_put_msg, as opposed to in one location, toku_ftnode_put_msg.
    //
    MSN msg_msn = msg.msn();
    if (msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn) {
        node->max_msn_applied_to_node_on_disk = msg_msn;
    }

    if (ft_msg_type_applies_once(msg.type())) {
        unsigned int childnum = (target_childnum >= 0
                                 ? target_childnum
                                 : toku_ftnode_which_child(node, msg.kdbt(), cmp));
        BASEMENTNODE bn = BLB(node, childnum);
        if (msg.msn().msn > bn->max_msn_applied.msn) {
            bn->max_msn_applied = msg.msn();
            toku_ft_bn_apply_msg(
                cmp,
                update_fun,
                bn,
                msg,
                gc_info,
                workdone,
                stats_to_update,
                logical_rows_delta);
        } else {
            toku_ft_status_note_msn_discard();
        }
    } else if (ft_msg_type_applies_all(msg.type())) {
        for (int childnum = 0; childnum < node->n_children; childnum++) {
            if (msg.msn().msn > BLB(node, childnum)->max_msn_applied.msn) {
                BLB(node, childnum)->max_msn_applied = msg.msn();
                toku_ft_bn_apply_msg(
                    cmp,
                    update_fun,
                    BLB(node, childnum),
                    msg,
                    gc_info,
                    workdone,
                    stats_to_update,
                    logical_rows_delta);
            } else {
                toku_ft_status_note_msn_discard();
            }
        }
    } else if (!ft_msg_type_does_nothing(msg.type())) {
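        // Only an unrecognized message type reaches this branch; the
        // invariant below is deliberately unsatisfiable so that we crash
        // rather than silently drop the message.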
        invariant(ft_msg_type_does_nothing(msg.type()));
    }
    VERIFY_NODE(t, node);
}
