1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <my_global.h>
40#include "ft/ft.h"
41#include "ft/ft-cachetable-wrappers.h"
42#include "ft/ft-internal.h"
43#include "ft/ft-flusher.h"
44#include "ft/ft-flusher-internal.h"
45#include "ft/node.h"
46#include "ft/serialize/block_table.h"
47#include "ft/serialize/ft_node-serialize.h"
48#include "portability/toku_assert.h"
49#include "portability/toku_atomic.h"
50#include "util/status.h"
51#include "util/context.h"
52
53
// Copy the current flusher status counters into *status.
// fl_status.init() lazily initializes the status struct (legend/keys)
// before it is copied out, so callers always see an initialized snapshot.
void toku_ft_flusher_get_status(FT_FLUSHER_STATUS status) {
    fl_status.init();
    *status = fl_status;
}
58
59//
60// For test purposes only.
61// These callbacks are never used in production code, only as a way
62// to test the system (for example, by causing crashes at predictable times).
63//
64static void (*flusher_thread_callback)(int, void*) = NULL;
65static void *flusher_thread_callback_extra = NULL;
66
67void toku_flusher_thread_set_callback(void (*callback_f)(int, void*),
68 void* extra) {
69 flusher_thread_callback = callback_f;
70 flusher_thread_callback_extra = extra;
71}
72
73static void call_flusher_thread_callback(int flt_state) {
74 if (flusher_thread_callback) {
75 flusher_thread_callback(flt_state, flusher_thread_callback_extra);
76 }
77}
78
79static int
80find_heaviest_child(FTNODE node)
81{
82 int max_child = 0;
83 uint64_t max_weight = toku_bnc_nbytesinbuf(BNC(node, 0)) + BP_WORKDONE(node, 0);
84
85 invariant(node->n_children > 0);
86 for (int i = 1; i < node->n_children; i++) {
87 uint64_t bytes_in_buf = toku_bnc_nbytesinbuf(BNC(node, i));
88 uint64_t workdone = BP_WORKDONE(node, i);
89 if (workdone > 0) {
90 invariant(bytes_in_buf > 0);
91 }
92 uint64_t this_weight = bytes_in_buf + workdone;
93 if (max_weight < this_weight) {
94 max_child = i;
95 max_weight = this_weight;
96 }
97 }
98 return max_child;
99}
100
101static void
102update_flush_status(FTNODE child, int cascades) {
103 FL_STATUS_VAL(FT_FLUSHER_FLUSH_TOTAL)++;
104 if (cascades > 0) {
105 FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES)++;
106 switch (cascades) {
107 case 1:
108 FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_1)++; break;
109 case 2:
110 FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_2)++; break;
111 case 3:
112 FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_3)++; break;
113 case 4:
114 FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_4)++; break;
115 case 5:
116 FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_5)++; break;
117 default:
118 FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_GT_5)++; break;
119 }
120 }
121 bool flush_needs_io = false;
122 for (int i = 0; !flush_needs_io && i < child->n_children; ++i) {
123 if (BP_STATE(child, i) == PT_ON_DISK) {
124 flush_needs_io = true;
125 }
126 }
127 if (flush_needs_io) {
128 FL_STATUS_VAL(FT_FLUSHER_FLUSH_NEEDED_IO)++;
129 } else {
130 FL_STATUS_VAL(FT_FLUSHER_FLUSH_IN_MEMORY)++;
131 }
132}
133
134static void
135maybe_destroy_child_blbs(FTNODE node, FTNODE child, FT ft)
136{
137 // If the node is already fully in memory, as in upgrade, we don't
138 // need to destroy the basement nodes because they are all equally
139 // up to date.
140 if (child->n_children > 1 &&
141 child->height == 0 &&
142 !child->dirty) {
143 for (int i = 0; i < child->n_children; ++i) {
144 if (BP_STATE(child, i) == PT_AVAIL &&
145 node->max_msn_applied_to_node_on_disk.msn < BLB_MAX_MSN_APPLIED(child, i).msn)
146 {
147 toku_evict_bn_from_memory(child, i, ft);
148 }
149 }
150 }
151}
152
// Forward declaration; defined later in this file.  Merges (or rebalances)
// `childnum_to_merge` of `node` with a sibling.  It is ft_merge_child's
// responsibility to unlock `node`; *did_react reports whether the parent
// became reactive as a result.
static void
ft_merge_child(
    FT ft,
    FTNODE node,
    int childnum_to_merge,
    bool *did_react,
    struct flusher_advice *fa);
160
161static int
162pick_heaviest_child(FT UU(ft),
163 FTNODE parent,
164 void* UU(extra))
165{
166 int childnum = find_heaviest_child(parent);
167 paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
168 return childnum;
169}
170
// flusher_advice should_destroy_basement_nodes callback: never destroy
// basement nodes (used by the flusher-thread advice).
bool
dont_destroy_basement_nodes(void* UU(extra))
{
    return false;
}
176
// flusher_advice should_destroy_basement_nodes callback: always allow
// stale basement nodes to be destroyed (used by the cleaner-thread advice).
static bool
do_destroy_basement_nodes(void* UU(extra))
{
    return true;
}
182
// flusher_advice should_recursively_flush callback: always keep flushing
// down the tree.
bool
always_recursively_flush(FTNODE UU(child), void* UU(extra))
{
    return true;
}
188
// flusher_advice should_recursively_flush callback: stop after one level,
// never recurse into the child.
bool
never_recursively_flush(FTNODE UU(child), void* UU(extra))
{
    return false;
}
194
195/**
196 * Flusher thread ("normal" flushing) implementation.
197 */
198struct flush_status_update_extra {
199 int cascades;
200 uint32_t nodesize;
201};
202
203static bool
204recurse_if_child_is_gorged(FTNODE child, void* extra)
205{
206 struct flush_status_update_extra *fste = (flush_status_update_extra *)extra;
207 return toku_ftnode_nonleaf_is_gorged(child, fste->nodesize);
208}
209
// flusher_advice pick_child_after_split callback: -1 means "no
// preference" — let the caller decide which (if either) of the two split
// children to keep flushing.
int
default_pick_child_after_split(FT UU(ft),
                               FTNODE UU(parent),
                               int UU(childnuma),
                               int UU(childnumb),
                               void* UU(extra))
{
    return -1;
}
219
220void
221default_merge_child(struct flusher_advice *fa,
222 FT ft,
223 FTNODE parent,
224 int childnum,
225 FTNODE child,
226 void* UU(extra))
227{
228 //
229 // There is probably a way to pass FTNODE child
230 // into ft_merge_child, but for simplicity for now,
231 // we are just going to unpin child and
232 // let ft_merge_child pin it again
233 //
234 toku_unpin_ftnode(ft, child);
235 //
236 //
237 // it is responsibility of ft_merge_child to unlock parent
238 //
239 bool did_react;
240 ft_merge_child(ft, parent, childnum, &did_react, fa);
241}
242
243void
244flusher_advice_init(
245 struct flusher_advice *fa,
246 FA_PICK_CHILD pick_child,
247 FA_SHOULD_DESTROY_BN should_destroy_basement_nodes,
248 FA_SHOULD_RECURSIVELY_FLUSH should_recursively_flush,
249 FA_MAYBE_MERGE_CHILD maybe_merge_child,
250 FA_UPDATE_STATUS update_status,
251 FA_PICK_CHILD_AFTER_SPLIT pick_child_after_split,
252 void* extra
253 )
254{
255 fa->pick_child = pick_child;
256 fa->should_destroy_basement_nodes = should_destroy_basement_nodes;
257 fa->should_recursively_flush = should_recursively_flush;
258 fa->maybe_merge_child = maybe_merge_child;
259 fa->update_status = update_status;
260 fa->pick_child_after_split = pick_child_after_split;
261 fa->extra = extra;
262}
263
264static void
265flt_update_status(FTNODE child,
266 int UU(dirtied),
267 void* extra)
268{
269 struct flush_status_update_extra *fste = (struct flush_status_update_extra *) extra;
270 update_flush_status(child, fste->cascades);
271 // If `toku_ft_flush_some_child` decides to recurse after this, we'll need
272 // cascades to increase. If not it doesn't matter.
273 fste->cascades++;
274}
275
276static void
277flt_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra *fste, uint32_t nodesize)
278{
279 fste->cascades = 0;
280 fste->nodesize = nodesize;
281 flusher_advice_init(fa,
282 pick_heaviest_child,
283 dont_destroy_basement_nodes,
284 recurse_if_child_is_gorged,
285 default_merge_child,
286 flt_update_status,
287 default_pick_child_after_split,
288 fste);
289}
290
// Extra state for the "cleaner-thread merge" (ctm) flusher advice, used
// to navigate from the root back down to the leaf we intend to merge.
struct ctm_extra {
    bool is_last_child;  // true if the target child is its parent's last child
    DBT target_key;      // pivot key (cloned) identifying the target leaf
};
295
296static int
297ctm_pick_child(FT ft,
298 FTNODE parent,
299 void* extra)
300{
301 struct ctm_extra* ctme = (struct ctm_extra *) extra;
302 int childnum;
303 if (parent->height == 1 && ctme->is_last_child) {
304 childnum = parent->n_children - 1;
305 } else {
306 childnum = toku_ftnode_which_child(parent, &ctme->target_key, ft->cmp);
307 }
308 return childnum;
309}
310
// flusher_advice update_status callback for cleaner-thread leaf merges:
// accumulate the number of nodes dirtied on behalf of the merge.
static void
ctm_update_status(
    FTNODE UU(child),
    int dirtied,
    void* UU(extra)
    )
{
    FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_DIRTIED_FOR_LEAF_MERGE) += dirtied;
}
320
321static void
322ctm_maybe_merge_child(struct flusher_advice *fa,
323 FT ft,
324 FTNODE parent,
325 int childnum,
326 FTNODE child,
327 void *extra)
328{
329 if (child->height == 0) {
330 (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED), 1);
331 }
332 default_merge_child(fa, ft, parent, childnum, child, extra);
333}
334
// flusher_advice maybe_merge_child callback for the cleaner thread.
// Nonleaf children merge immediately via default_merge_child.  For a leaf
// child we cannot merge in place: we record which leaf to merge (via a
// cloned pivot key), unpin everything, re-pin the root, and run a fresh
// flush from the root with ctm_* advice that navigates back down to that
// leaf and merges it there.
static void
ct_maybe_merge_child(struct flusher_advice *fa,
                     FT ft,
                     FTNODE parent,
                     int childnum,
                     FTNODE child,
                     void* extra)
{
    if (child->height > 0) {
        default_merge_child(fa, ft, parent, childnum, child, extra);
    }
    else {
        struct ctm_extra ctme;
        paranoid_invariant(parent->n_children > 1);
        int pivot_to_save;
        //
        // we have two cases, one where the childnum
        // is the last child, and therefore the pivot we
        // save is not of the pivot which we wish to descend
        // and another where it is not the last child,
        // so the pivot is sufficient for identifying the leaf
        // to be merged
        //
        if (childnum == (parent->n_children - 1)) {
            ctme.is_last_child = true;
            pivot_to_save = childnum - 1;
        }
        else {
            ctme.is_last_child = false;
            pivot_to_save = childnum;
        }
        // Clone the pivot: parent is about to be unpinned, so we cannot
        // keep a reference into its pivotkeys.
        toku_clone_dbt(&ctme.target_key, parent->pivotkeys.get_pivot(pivot_to_save));

        // at this point, ctme is properly setup, now we can do the merge
        struct flusher_advice new_fa;
        flusher_advice_init(
            &new_fa,
            ctm_pick_child,
            dont_destroy_basement_nodes,
            always_recursively_flush,
            ctm_maybe_merge_child,
            ctm_update_status,
            default_pick_child_after_split,
            &ctme);

        // Release both locks before re-pinning from the root, to respect
        // the root-to-leaf pin ordering.
        toku_unpin_ftnode(ft, parent);
        toku_unpin_ftnode(ft, child);

        FTNODE root_node = NULL;
        {
            uint32_t fullhash;
            CACHEKEY root;
            toku_calculate_root_offset_pointer(ft, &root, &fullhash);
            ftnode_fetch_extra bfe;
            bfe.create_for_full_read(ft);
            toku_pin_ftnode(ft, root, fullhash, &bfe, PL_WRITE_EXPENSIVE, &root_node, true);
            toku_ftnode_assert_fully_in_memory(root_node);
        }

        // Atomic counters: other cleaner threads may update these concurrently.
        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);

        toku_ft_flush_some_child(ft, root_node, &new_fa);

        (void) toku_sync_fetch_and_sub(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);

        // Free the pivot key cloned above.
        toku_destroy_dbt(&ctme.target_key);
    }
}
404
405static void
406ct_update_status(FTNODE child,
407 int dirtied,
408 void* extra)
409{
410 struct flush_status_update_extra* fste = (struct flush_status_update_extra *) extra;
411 update_flush_status(child, fste->cascades);
412 FL_STATUS_VAL(FT_FLUSHER_CLEANER_NODES_DIRTIED) += dirtied;
413 // Incrementing this in case `toku_ft_flush_some_child` decides to recurse.
414 fste->cascades++;
415}
416
417static void
418ct_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra* fste, uint32_t nodesize)
419{
420 fste->cascades = 0;
421 fste->nodesize = nodesize;
422 flusher_advice_init(fa,
423 pick_heaviest_child,
424 do_destroy_basement_nodes,
425 recurse_if_child_is_gorged,
426 ct_maybe_merge_child,
427 ct_update_status,
428 default_pick_child_after_split,
429 fste);
430}
431
432//
433// This returns true if the node MAY be reactive,
434// false is we are absolutely sure that it is NOT reactive.
435// The reason for inaccuracy is that the node may be
436// a leaf node that is not entirely in memory. If so, then
437// we cannot be sure if the node is reactive.
438//
439static bool ft_ftnode_may_be_reactive(FT ft, FTNODE node)
440{
441 if (node->height == 0) {
442 return true;
443 } else {
444 return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout) != RE_STABLE;
445 }
446}
447
/* NODE is a node with a child.
 * childnum was split into two nodes childa, and childb.  childa is the same as the original child.  childb is a new child.
 * We must slide things around, & move things from the old table to the new tables.
 * Requires: the CHILDNUMth buffer of node is empty.
 * We don't push anything down to children.  We split the node, and things land wherever they land.
 * We must delete the old buffer (but the old child is already deleted.)
 * On return, the new children and node STAY PINNED.
 * splitk: the pivot separating childa from childb; its data is alloc'd
 *         and consumed by this call.
 */
static void
handle_split_of_child(
    FT ft,
    FTNODE node,
    int childnum,
    FTNODE childa,
    FTNODE childb,
    DBT *splitk /* the data in the childsplitk is alloc'd and is consumed by this call. */
    )
{
    paranoid_invariant(node->height>0);
    paranoid_invariant(0 <= childnum);
    paranoid_invariant(childnum < node->n_children);
    toku_ftnode_assert_fully_in_memory(node);
    toku_ftnode_assert_fully_in_memory(childa);
    toku_ftnode_assert_fully_in_memory(childb);
    NONLEAF_CHILDINFO old_bnc = BNC(node, childnum);
    paranoid_invariant(toku_bnc_nbytesinbuf(old_bnc)==0);
    WHEN_NOT_GCOV(
    if (toku_ft_debug_mode) {
        printf("%s:%d Child %d splitting on %s\n", __FILE__, __LINE__, childnum, (char*)splitk->data);
        printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
        for(int i = 0; i < node->n_children - 1; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
        printf("\n");
    }
    )

    node->dirty = 1;

    // Grow the bp array by one to make room for the new child.
    XREALLOC_N(node->n_children+1, node->bp);
    // Slide the children over.
    // suppose n_children is 10 and childnum is 5, meaning node->childnum[5] just got split
    // this moves node->bp[6] through node->bp[9] over to
    // node->bp[7] through node->bp[10]
    for (int cnum=node->n_children; cnum>childnum+1; cnum--) {
        node->bp[cnum] = node->bp[cnum-1];
    }
    // Clear the freed slot (childnum+1) so it can be initialized below.
    memset(&node->bp[childnum+1],0,sizeof(node->bp[0]));
    node->n_children++;

    paranoid_invariant(BP_BLOCKNUM(node, childnum).b==childa->blocknum.b); // use the same child

    // We never set the rightmost blocknum to be the root.
    // Instead, we wait for the root to split and let promotion initialize the rightmost
    // blocknum to be the first non-root leaf node on the right extreme to receive an insert.
    BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
    invariant(ft->h->root_blocknum.b != rightmost_blocknum.b);
    if (childa->blocknum.b == rightmost_blocknum.b) {
        // The rightmost leaf (a) split into (a) and (b). We want (b) to swap pair values
        // with (a), now that it is the new rightmost leaf. This keeps the rightmost blocknum
        // constant, the same the way we keep the root blocknum constant.
        toku_ftnode_swap_pair_values(childa, childb);
        BP_BLOCKNUM(node, childnum) = childa->blocknum;
    }

    // Wire up the new child in the freed slot.
    BP_BLOCKNUM(node, childnum+1) = childb->blocknum;
    BP_WORKDONE(node, childnum+1)  = 0;
    BP_STATE(node,childnum+1) = PT_AVAIL;

    NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
    for (unsigned int i = 0; i < (sizeof new_bnc->flow) / (sizeof new_bnc->flow[0]); ++i) {
        // just split the flows in half for now, can't guess much better
        // at the moment
        new_bnc->flow[i] = old_bnc->flow[i] / 2;
        old_bnc->flow[i] = (old_bnc->flow[i] + 1) / 2;
    }
    set_BNC(node, childnum+1, new_bnc);

    // Insert the new split key , sliding the other keys over
    node->pivotkeys.insert_at(splitk, childnum);

    WHEN_NOT_GCOV(
    if (toku_ft_debug_mode) {
        printf("%s:%d splitkeys:", __FILE__, __LINE__);
        for (int i = 0; i < node->n_children - 2; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
        printf("\n");
    }
    )

    /* Keep pushing to the children, but not if the children would require a pushdown */
    toku_ftnode_assert_fully_in_memory(node);
    toku_ftnode_assert_fully_in_memory(childa);
    toku_ftnode_assert_fully_in_memory(childb);

    VERIFY_NODE(t, node);
    VERIFY_NODE(t, childa);
    VERIFY_NODE(t, childb);
}
544
static void
verify_all_in_mempool(FTNODE UU() node)
// Effect: (paranoid debug builds only) for a leaf, check that every
// basement node is available in memory and that its mempool is
// internally consistent.  Compiles to a no-op unless TOKU_DEBUG_PARANOID
// is defined.
{
#ifdef TOKU_DEBUG_PARANOID
    if (node->height==0) {
        for (int i = 0; i < node->n_children; i++) {
            invariant(BP_STATE(node,i) == PT_AVAIL);
            BLB_DATA(node, i)->verify_mempool();
        }
    }
#endif
}
557
558static uint64_t
559ftleaf_disk_size(FTNODE node)
560// Effect: get the disk size of a leafentry
561{
562 paranoid_invariant(node->height == 0);
563 toku_ftnode_assert_fully_in_memory(node);
564 uint64_t retval = 0;
565 for (int i = 0; i < node->n_children; i++) {
566 retval += BLB_DATA(node, i)->get_disk_size();
567 }
568 return retval;
569}
570
static void
ftleaf_get_split_loc(
    FTNODE node,
    enum split_mode split_mode,
    int *num_left_bns, // which basement within leaf
    int *num_left_les // which key within basement
    )
// Effect: Find the location within a leaf node where we want to perform a split
// split_mode: SPLIT_LEFT_HEAVY keeps (almost) everything on the left,
//   SPLIT_RIGHT_HEAVY keeps (almost) everything on the right, and
//   SPLIT_EVENLY splits at roughly half the total leafentry disk size.
// num_left_bns is how many basement nodes (which OMT) should be split to the left.
// num_left_les is how many leafentries in OMT of the last bn should be on the left side of the split.
{
    switch (split_mode) {
    case SPLIT_LEFT_HEAVY: {
        // Keep every basement on the left; if the last basement is empty,
        // back up one so the split point is not past the data.
        *num_left_bns = node->n_children;
        *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
        if (*num_left_les == 0) {
            *num_left_bns = node->n_children - 1;
            *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
        }
        goto exit;
    }
    case SPLIT_RIGHT_HEAVY: {
        // Keep at most one leafentry on the left.
        *num_left_bns = 1;
        *num_left_les = BLB_DATA(node, 0)->num_klpairs() ? 1 : 0;
        goto exit;
    }
    case SPLIT_EVENLY: {
        paranoid_invariant(node->height == 0);
        // TODO: (Zardosht) see if we can/should make this faster, we iterate over the rows twice
        uint64_t sumlesizes = ftleaf_disk_size(node);
        uint32_t size_so_far = 0;
        // Walk leafentries in order until we pass half the total size.
        for (int i = 0; i < node->n_children; i++) {
            bn_data* bd = BLB_DATA(node, i);
            uint32_t n_leafentries = bd->num_klpairs();
            for (uint32_t j=0; j < n_leafentries; j++) {
                size_t size_this_le;
                int rr = bd->fetch_klpair_disksize(j, &size_this_le);
                invariant_zero(rr);
                size_so_far += size_this_le;
                if (size_so_far >= sumlesizes/2) {
                    *num_left_bns = i + 1;
                    *num_left_les = j + 1;
                    if (*num_left_bns == node->n_children &&
                        (unsigned int) *num_left_les == n_leafentries) {
                        // need to correct for when we're splitting after the
                        // last element, that makes no sense
                        if (*num_left_les > 1) {
                            (*num_left_les)--;
                        } else if (*num_left_bns > 1) {
                            (*num_left_bns)--;
                            *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
                        } else {
                            // we are trying to split a leaf with only one
                            // leafentry in it
                            abort();
                        }
                    }
                    goto exit;
                }
            }
        }
    }
    }
    abort();
exit:
    return;
}
638
static void
move_leafentries(
    BASEMENTNODE dest_bn,
    BASEMENTNODE src_bn,
    uint32_t lbi, //lower bound inclusive
    uint32_t ube //upper bound exclusive
    )
//Effect: move leafentries in the range [lbi, ube) from src_bn's data
// buffer into dest_bn's data buffer.  The upper bound must be the end of
// src_bn, since split_klpairs splits off everything from lbi onward.
{
    invariant(ube == src_bn->data_buffer.num_klpairs());
    src_bn->data_buffer.split_klpairs(&dest_bn->data_buffer, lbi);
}
651
652static void ftnode_finalize_split(FTNODE node, FTNODE B, MSN max_msn_applied_to_node) {
653// Effect: Finalizes a split by updating some bits and dirtying both nodes
654 toku_ftnode_assert_fully_in_memory(node);
655 toku_ftnode_assert_fully_in_memory(B);
656 verify_all_in_mempool(node);
657 verify_all_in_mempool(B);
658
659 node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
660 B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
661
662 // The new node in the split inherits the oldest known reference xid
663 B->oldest_referenced_xid_known = node->oldest_referenced_xid_known;
664
665 node->dirty = 1;
666 B->dirty = 1;
667}
668
void
ftleaf_split(
    FT ft,
    FTNODE node,
    FTNODE *nodea,
    FTNODE *nodeb,
    DBT *splitk,
    bool create_new_node,
    enum split_mode split_mode,
    uint32_t num_dependent_nodes,
    FTNODE* dependent_nodes)
// Effect: Split a leaf node.
// Argument "node" is node to be split.
// If create_new_node is true, the right half is placed in a freshly
// created node; otherwise *nodeb supplies an existing node to reuse.
// dependent_nodes (num_dependent_nodes of them) may be written to disk
// before the new node is created.
// Upon return:
// nodea and nodeb point to new nodes that result from split of "node"
// nodea is the left node that results from the split
// splitk is the right-most key of nodea (cloned/copied; may be NULL if
// the caller does not need it)
{

    paranoid_invariant(node->height == 0);
    FL_STATUS_VAL(FT_FLUSHER_SPLIT_LEAF)++;
    if (node->n_children) {
        // First move all the accumulated stat64info deltas into the first basement.
        // After the split, either both nodes or neither node will be included in the next checkpoint.
        // The accumulated stats in the dictionary will be correct in either case.
        // By moving all the deltas into one (arbitrary) basement, we avoid the need to maintain
        // correct information for a basement that is divided between two leafnodes (i.e. when split is
        // not on a basement boundary).
        STAT64INFO_S delta_for_leafnode = toku_get_and_clear_basement_stats(node);
        BASEMENTNODE bn = BLB(node,0);
        bn->stat64_delta = delta_for_leafnode;
    }


    FTNODE B = nullptr;
    uint32_t fullhash;
    BLOCKNUM name;

    if (create_new_node) {
        // put value in cachetable and do checkpointing
        // of dependent nodes
        //
        // We do this here, before evaluating the last_bn_on_left
        // and last_le_on_left_within_bn because this operation
        // may write to disk the dependent nodes.
        // While doing so, we may rebalance the leaf node
        // we are splitting, thereby invalidating the
        // values of last_bn_on_left and last_le_on_left_within_bn.
        // So, we must call this before evaluating
        // those two values
        cachetable_put_empty_node_with_dep_nodes(
            ft,
            num_dependent_nodes,
            dependent_nodes,
            &name,
            &fullhash,
            &B
            );
        // GCC 4.8 seems to get confused and think B is maybe uninitialized at link time.
        // TODO(leif): figure out why it thinks this and actually fix it.
        invariant_notnull(B);
    }


    paranoid_invariant(node->height==0);
    toku_ftnode_assert_fully_in_memory(node);
    verify_all_in_mempool(node);
    MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;

    // variables that say where we will do the split.
    // After the split, there will be num_left_bns basement nodes in the left node,
    // and the last basement node in the left node will have num_left_les leafentries.
    int num_left_bns;
    int num_left_les;
    ftleaf_get_split_loc(node, split_mode, &num_left_bns, &num_left_les);
    {
        // did we split right on the boundary between basement nodes?
        const bool split_on_boundary = (num_left_les == 0) || (num_left_les == (int) BLB_DATA(node, num_left_bns - 1)->num_klpairs());
        // Now we know where we are going to break it
        // the two nodes will have a total of n_children+1 basement nodes
        // and n_children-1 pivots
        // the left node, node, will have last_bn_on_left+1 basement nodes
        // the right node, B, will have n_children-last_bn_on_left basement nodes
        // the pivots of node will be the first last_bn_on_left pivots that originally exist
        // the pivots of B will be the last (n_children - 1 - last_bn_on_left) pivots that originally exist

        // Note: The basements will not be rebalanced.  Only the mempool of the basement that is split
        //       (if split_on_boundary is false) will be affected.  All other mempools will remain intact. ???

        //set up the basement nodes in the new node
        int num_children_in_node = num_left_bns;
        // In the SPLIT_RIGHT_HEAVY case, we need to add 1 back because
        // while it's not on the boundary, we do need node->n_children
        // children in B.
        int num_children_in_b = node->n_children - num_left_bns + (!split_on_boundary ? 1 : 0);
        if (num_children_in_b == 0) {
            // for uneven split, make sure we have at least 1 bn
            paranoid_invariant(split_mode == SPLIT_LEFT_HEAVY);
            num_children_in_b = 1;
        }
        paranoid_invariant(num_children_in_node > 0);
        if (create_new_node) {
            toku_initialize_empty_ftnode(
                B,
                name,
                0,
                num_children_in_b,
                ft->h->layout_version,
                ft->h->flags);
            B->fullhash = fullhash;
        }
        else {
            // Reuse the node the caller passed in as *nodeb, resizing its
            // bp array and giving it fresh empty basements.
            B = *nodeb;
            REALLOC_N(num_children_in_b,   B->bp);
            B->n_children = num_children_in_b;
            for (int i = 0; i < num_children_in_b; i++) {
                BP_BLOCKNUM(B,i).b = 0;
                BP_STATE(B,i) = PT_AVAIL;
                BP_WORKDONE(B,i) = 0;
                set_BLB(B, i, toku_create_empty_bn());
            }
        }

        // now move all the data

        int curr_src_bn_index = num_left_bns - 1;
        int curr_dest_bn_index = 0;

        // handle the move of a subset of data in last_bn_on_left from node to B
        if (!split_on_boundary) {
            BP_STATE(B,curr_dest_bn_index) = PT_AVAIL;
            destroy_basement_node(BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array
            set_BNULL(B, curr_dest_bn_index);
            set_BLB(B, curr_dest_bn_index, toku_create_empty_bn_no_buffer());
            move_leafentries(BLB(B, curr_dest_bn_index),
                             BLB(node, curr_src_bn_index),
                             num_left_les,         // first row to be moved to B
                             BLB_DATA(node, curr_src_bn_index)->num_klpairs()  // number of rows in basement to be split
                             );
            BLB_MAX_MSN_APPLIED(B, curr_dest_bn_index) = BLB_MAX_MSN_APPLIED(node, curr_src_bn_index);
            curr_dest_bn_index++;
        }
        curr_src_bn_index++;

        paranoid_invariant(B->n_children >= curr_dest_bn_index);
        paranoid_invariant(node->n_children >= curr_src_bn_index);

        // move the rest of the basement nodes
        for ( ; curr_src_bn_index < node->n_children; curr_src_bn_index++, curr_dest_bn_index++) {
            destroy_basement_node(BLB(B, curr_dest_bn_index));
            set_BNULL(B, curr_dest_bn_index);
            B->bp[curr_dest_bn_index] = node->bp[curr_src_bn_index];
        }
        if (curr_dest_bn_index < B->n_children) {
            // B already has an empty basement node here.
            BP_STATE(B, curr_dest_bn_index) = PT_AVAIL;
        }

        //
        // now handle the pivots
        //

        // the child index in the original node that corresponds to the
        // first node in the right node of the split
        int split_idx = num_left_bns - (split_on_boundary ? 0 : 1);
        node->pivotkeys.split_at(split_idx, &B->pivotkeys);
        if (split_on_boundary && num_left_bns < node->n_children && splitk) {
            toku_copyref_dbt(splitk, node->pivotkeys.get_pivot(num_left_bns - 1));
        } else if (splitk) {
            // There is no existing pivot at the split point; use the last
            // key remaining on the left as the new pivot.
            bn_data* bd = BLB_DATA(node, num_left_bns - 1);
            uint32_t keylen;
            void *key;
            int rr = bd->fetch_key_and_len(bd->num_klpairs() - 1, &keylen, &key);
            invariant_zero(rr);
            toku_memdup_dbt(splitk, key, keylen);
        }

        node->n_children = num_children_in_node;
        REALLOC_N(num_children_in_node, node->bp);
    }

    ftnode_finalize_split(node, B, max_msn_applied_to_node);
    *nodea = node;
    *nodeb = B;
}    // end of ftleaf_split()
854
// Effect: Split a nonleaf node roughly in half.  The first
// n_children_in_a children stay in `node`; the rest move to a newly
// created node B of the same height.  The pivot between the halves is
// cloned into *splitk.  dependent_nodes may be written to disk before B
// is created.  On return *nodea is the (reused) left node and *nodeb is B.
void
ft_nonleaf_split(
    FT ft,
    FTNODE node,
    FTNODE *nodea,
    FTNODE *nodeb,
    DBT *splitk,
    uint32_t num_dependent_nodes,
    FTNODE* dependent_nodes)
{
    //VERIFY_NODE(t,node);
    FL_STATUS_VAL(FT_FLUSHER_SPLIT_NONLEAF)++;
    toku_ftnode_assert_fully_in_memory(node);
    int old_n_children = node->n_children;
    int n_children_in_a = old_n_children/2;
    int n_children_in_b = old_n_children-n_children_in_a;
    MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
    FTNODE B;
    paranoid_invariant(node->height>0);
    paranoid_invariant(node->n_children>=2); // Otherwise, how do we split?  We need at least two children to split. */
    create_new_ftnode_with_dep_nodes(ft, &B, node->height, n_children_in_b, num_dependent_nodes, dependent_nodes);
    {
        /* The first n_children_in_a go into node a.
         * That means that the first n_children_in_a-1 keys go into node a.
         * The splitter key is key number n_children_in_a */
        for (int i = n_children_in_a; i<old_n_children; i++) {
            int targchild = i-n_children_in_a;
            // TODO: Figure out better way to handle this
            // the problem is that create_new_ftnode_with_dep_nodes for B creates
            // all the data structures, whereas we really don't want it to fill
            // in anything for the bp's.
            // Now we have to go free what it just created so we can
            // slide the bp over
            destroy_nonleaf_childinfo(BNC(B, targchild));
            // now move the bp over
            B->bp[targchild] = node->bp[i];
            memset(&node->bp[i], 0, sizeof(node->bp[0]));
        }

        // the split key for our parent is the rightmost pivot key in node
        node->pivotkeys.split_at(n_children_in_a, &B->pivotkeys);
        toku_clone_dbt(splitk, node->pivotkeys.get_pivot(n_children_in_a - 1));
        node->pivotkeys.delete_at(n_children_in_a - 1);

        // Shrink node down to its left half.
        node->n_children = n_children_in_a;
        REALLOC_N(node->n_children, node->bp);
    }

    ftnode_finalize_split(node, B, max_msn_applied_to_node);
    *nodea = node;
    *nodeb = B;
}
907
908//
909// responsibility of ft_split_child is to take locked FTNODEs node and child
910// and do the following:
911// - split child,
912// - fix node,
913// - release lock on node
914// - possibly flush either new children created from split, otherwise unlock children
915//
916static void
917ft_split_child(
918 FT ft,
919 FTNODE node,
920 int childnum,
921 FTNODE child,
922 enum split_mode split_mode,
923 struct flusher_advice *fa)
924{
925 paranoid_invariant(node->height>0);
926 paranoid_invariant(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty
927 FTNODE nodea, nodeb;
928 DBT splitk;
929
930 // for test
931 call_flusher_thread_callback(flt_flush_before_split);
932
933 FTNODE dep_nodes[2];
934 dep_nodes[0] = node;
935 dep_nodes[1] = child;
936 if (child->height==0) {
937 ftleaf_split(ft, child, &nodea, &nodeb, &splitk, true, split_mode, 2, dep_nodes);
938 } else {
939 ft_nonleaf_split(ft, child, &nodea, &nodeb, &splitk, 2, dep_nodes);
940 }
941 // printf("%s:%d child did split\n", __FILE__, __LINE__);
942 handle_split_of_child (ft, node, childnum, nodea, nodeb, &splitk);
943
944 // for test
945 call_flusher_thread_callback(flt_flush_during_split);
946
947 // at this point, the split is complete
948 // now we need to unlock node,
949 // and possibly continue
950 // flushing one of the children
951 int picked_child = fa->pick_child_after_split(ft, node, childnum, childnum + 1, fa->extra);
952 toku_unpin_ftnode(ft, node);
953 if (picked_child == childnum ||
954 (picked_child < 0 && nodea->height > 0 && fa->should_recursively_flush(nodea, fa->extra))) {
955 toku_unpin_ftnode(ft, nodeb);
956 toku_ft_flush_some_child(ft, nodea, fa);
957 }
958 else if (picked_child == childnum + 1 ||
959 (picked_child < 0 && nodeb->height > 0 && fa->should_recursively_flush(nodeb, fa->extra))) {
960 toku_unpin_ftnode(ft, nodea);
961 toku_ft_flush_some_child(ft, nodeb, fa);
962 }
963 else {
964 toku_unpin_ftnode(ft, nodea);
965 toku_unpin_ftnode(ft, nodeb);
966 }
967
968 toku_destroy_dbt(&splitk);
969}
970
971static void bring_node_fully_into_memory(FTNODE node, FT ft) {
972 if (!toku_ftnode_fully_in_memory(node)) {
973 ftnode_fetch_extra bfe;
974 bfe.create_for_full_read(ft);
975 toku_cachetable_pf_pinned_pair(
976 node,
977 toku_ftnode_pf_callback,
978 &bfe,
979 ft->cf,
980 node->blocknum,
981 toku_cachetable_hash(ft->cf, node->blocknum)
982 );
983 }
984}
985
986static void
987flush_this_child(
988 FT ft,
989 FTNODE node,
990 FTNODE child,
991 int childnum,
992 struct flusher_advice *fa)
993// Effect: Push everything in the CHILDNUMth buffer of node down into the child.
994{
995 update_flush_status(child, 0);
996 toku_ftnode_assert_fully_in_memory(node);
997 if (fa->should_destroy_basement_nodes(fa)) {
998 maybe_destroy_child_blbs(node, child, ft);
999 }
1000 bring_node_fully_into_memory(child, ft);
1001 toku_ftnode_assert_fully_in_memory(child);
1002 paranoid_invariant(node->height>0);
1003 paranoid_invariant(child->blocknum.b!=0);
1004 // VERIFY_NODE does not work off client thread as of now
1005 //VERIFY_NODE(t, child);
1006 node->dirty = 1;
1007 child->dirty = 1;
1008
1009 BP_WORKDONE(node, childnum) = 0; // this buffer is drained, no work has been done by its contents
1010 NONLEAF_CHILDINFO bnc = BNC(node, childnum);
1011 set_BNC(node, childnum, toku_create_empty_nl());
1012
1013 // now we have a bnc to flush to the child. pass down the parent's
1014 // oldest known referenced xid as we flush down to the child.
1015 toku_bnc_flush_to_child(ft, bnc, child, node->oldest_referenced_xid_known);
1016 destroy_nonleaf_childinfo(bnc);
1017}
1018
// Effect: Move all of b's basement nodes and pivots into a, leaving b with
// no children (a takes ownership of b's basement pointers). If a's last
// basement node is empty it is dropped first, because there is no pivot to
// separate it from b's first basement; otherwise its max key becomes the
// pivot between what was a and what was b.
static void
merge_leaf_nodes(FTNODE a, FTNODE b)
{
    FL_STATUS_VAL(FT_FLUSHER_MERGE_LEAF)++;
    toku_ftnode_assert_fully_in_memory(a);
    toku_ftnode_assert_fully_in_memory(b);
    paranoid_invariant(a->height == 0);
    paranoid_invariant(b->height == 0);
    paranoid_invariant(a->n_children > 0);
    paranoid_invariant(b->n_children > 0);

    // Mark nodes as dirty before moving basements from b to a.
    // This way, whatever deltas are accumulated in the basements are
    // applied to the in_memory_stats in the header if they have not already
    // been (if nodes are clean).
    // TODO(leif): this is no longer the way in_memory_stats is
    // maintained. verify that it's ok to move this just before the unpin
    // and then do that.
    a->dirty = 1;
    b->dirty = 1;

    bn_data* a_last_bd = BLB_DATA(a, a->n_children-1);
    // this bool states if the last basement node in a has any items or not
    // If it does, then it stays in the merge. If it does not, the last basement node
    // of a gets eliminated because we do not have a pivot to store for it (because it has no elements)
    const bool a_has_tail = a_last_bd->num_klpairs() > 0;

    int num_children = a->n_children + b->n_children;
    if (!a_has_tail) {
        int lastchild = a->n_children - 1;
        BASEMENTNODE bn = BLB(a, lastchild);

        // verify that last basement in a is empty, then destroy mempool
        size_t used_space = a_last_bd->get_disk_size();
        invariant_zero(used_space);
        destroy_basement_node(bn);
        set_BNULL(a, lastchild);
        num_children--;
        // drop the pivot to the right of the removed basement, if it exists
        if (lastchild < a->pivotkeys.num_pivots()) {
            a->pivotkeys.delete_at(lastchild);
        }
    } else {
        // fill in pivot for what used to be max of node 'a', if it is needed
        uint32_t keylen;
        void *key;
        int r = a_last_bd->fetch_key_and_len(a_last_bd->num_klpairs() - 1, &keylen, &key);
        invariant_zero(r);
        DBT pivotkey;
        toku_fill_dbt(&pivotkey, key, keylen);
        a->pivotkeys.replace_at(&pivotkey, a->n_children - 1);
    }

    // realloc basement nodes in `a'
    REALLOC_N(num_children, a->bp);

    // move each basement node from b to a
    uint32_t offset = a_has_tail ? a->n_children : a->n_children - 1;
    for (int i = 0; i < b->n_children; i++) {
        a->bp[i + offset] = b->bp[i];
        // zero b's slot so b no longer owns (and cannot double-free) it
        memset(&b->bp[i], 0, sizeof(b->bp[0]));
    }

    // append b's pivots to a's pivots
    a->pivotkeys.append(b->pivotkeys);

    // now that all the data has been moved from b to a, we can destroy the data in b
    a->n_children = num_children;
    b->pivotkeys.destroy();
    b->n_children = 0;
}
1089
1090static void balance_leaf_nodes(
1091 FTNODE a,
1092 FTNODE b,
1093 DBT *splitk)
1094// Effect:
1095// If b is bigger then move stuff from b to a until b is the smaller.
1096// If a is bigger then move stuff from a to b until a is the smaller.
1097{
1098 FL_STATUS_VAL(FT_FLUSHER_BALANCE_LEAF)++;
1099 // first merge all the data into a
1100 merge_leaf_nodes(a,b);
1101 // now split them
1102 // because we are not creating a new node, we can pass in no dependent nodes
1103 ftleaf_split(NULL, a, &a, &b, splitk, false, SPLIT_EVENLY, 0, NULL);
1104}
1105
1106static void
1107maybe_merge_pinned_leaf_nodes(
1108 FTNODE a,
1109 FTNODE b,
1110 const DBT *parent_splitk,
1111 bool *did_merge,
1112 bool *did_rebalance,
1113 DBT *splitk,
1114 uint32_t nodesize
1115 )
1116// Effect: Either merge a and b into one one node (merge them into a) and set *did_merge = true.
1117// (We do this if the resulting node is not fissible)
1118// or distribute the leafentries evenly between a and b, and set *did_rebalance = true.
1119// (If a and be are already evenly distributed, we may do nothing.)
1120{
1121 unsigned int sizea = toku_serialize_ftnode_size(a);
1122 unsigned int sizeb = toku_serialize_ftnode_size(b);
1123 uint32_t num_leafentries = toku_ftnode_leaf_num_entries(a) + toku_ftnode_leaf_num_entries(b);
1124 if (num_leafentries > 1 && (sizea + sizeb)*4 > (nodesize*3)) {
1125 // the combined size is more than 3/4 of a node, so don't merge them.
1126 *did_merge = false;
1127 if (sizea*4 > nodesize && sizeb*4 > nodesize) {
1128 // no need to do anything if both are more than 1/4 of a node.
1129 *did_rebalance = false;
1130 toku_clone_dbt(splitk, *parent_splitk);
1131 return;
1132 }
1133 // one is less than 1/4 of a node, and together they are more than 3/4 of a node.
1134 *did_rebalance = true;
1135 balance_leaf_nodes(a, b, splitk);
1136 } else {
1137 // we are merging them.
1138 *did_merge = true;
1139 *did_rebalance = false;
1140 toku_init_dbt(splitk);
1141 merge_leaf_nodes(a, b);
1142 }
1143}
1144
1145static void
1146maybe_merge_pinned_nonleaf_nodes(
1147 const DBT *parent_splitk,
1148 FTNODE a,
1149 FTNODE b,
1150 bool *did_merge,
1151 bool *did_rebalance,
1152 DBT *splitk)
1153{
1154 toku_ftnode_assert_fully_in_memory(a);
1155 toku_ftnode_assert_fully_in_memory(b);
1156 invariant_notnull(parent_splitk->data);
1157
1158 int old_n_children = a->n_children;
1159 int new_n_children = old_n_children + b->n_children;
1160
1161 XREALLOC_N(new_n_children, a->bp);
1162 memcpy(a->bp + old_n_children, b->bp, b->n_children * sizeof(b->bp[0]));
1163 memset(b->bp, 0, b->n_children * sizeof(b->bp[0]));
1164
1165 a->pivotkeys.insert_at(parent_splitk, old_n_children - 1);
1166 a->pivotkeys.append(b->pivotkeys);
1167 a->n_children = new_n_children;
1168 b->n_children = 0;
1169
1170 a->dirty = 1;
1171 b->dirty = 1;
1172
1173 *did_merge = true;
1174 *did_rebalance = false;
1175 toku_init_dbt(splitk);
1176
1177 FL_STATUS_VAL(FT_FLUSHER_MERGE_NONLEAF)++;
1178}
1179
1180static void
1181maybe_merge_pinned_nodes(
1182 FTNODE parent,
1183 const DBT *parent_splitk,
1184 FTNODE a,
1185 FTNODE b,
1186 bool *did_merge,
1187 bool *did_rebalance,
1188 DBT *splitk,
1189 uint32_t nodesize
1190 )
1191// Effect: either merge a and b into one node (merge them into a) and set *did_merge = true.
1192// (We do this if the resulting node is not fissible)
1193// or distribute a and b evenly and set *did_merge = false and *did_rebalance = true
1194// (If a and be are already evenly distributed, we may do nothing.)
1195// If we distribute:
1196// For leaf nodes, we distribute the leafentries evenly.
1197// For nonleaf nodes, we distribute the children evenly. That may leave one or both of the nodes overfull, but that's OK.
1198// If we distribute, we set *splitk to a malloced pivot key.
1199// Parameters:
1200// t The FT.
1201// parent The parent of the two nodes to be split.
1202// parent_splitk The pivot key between a and b. This is either free()'d or returned in *splitk.
1203// a The first node to merge.
1204// b The second node to merge.
1205// logger The logger.
1206// did_merge (OUT): Did the two nodes actually get merged?
1207// splitk (OUT): If the two nodes did not get merged, the new pivot key between the two nodes.
1208{
1209 MSN msn_max;
1210 paranoid_invariant(a->height == b->height);
1211 toku_ftnode_assert_fully_in_memory(parent);
1212 toku_ftnode_assert_fully_in_memory(a);
1213 toku_ftnode_assert_fully_in_memory(b);
1214 parent->dirty = 1; // just to make sure
1215 {
1216 MSN msna = a->max_msn_applied_to_node_on_disk;
1217 MSN msnb = b->max_msn_applied_to_node_on_disk;
1218 msn_max = (msna.msn > msnb.msn) ? msna : msnb;
1219 }
1220 if (a->height == 0) {
1221 maybe_merge_pinned_leaf_nodes(a, b, parent_splitk, did_merge, did_rebalance, splitk, nodesize);
1222 } else {
1223 maybe_merge_pinned_nonleaf_nodes(parent_splitk, a, b, did_merge, did_rebalance, splitk);
1224 }
1225 if (*did_merge || *did_rebalance) {
1226 // accurate for leaf nodes because all msgs above have been
1227 // applied, accurate for non-leaf nodes because buffer immediately
1228 // above each node has been flushed
1229 a->max_msn_applied_to_node_on_disk = msn_max;
1230 b->max_msn_applied_to_node_on_disk = msn_max;
1231 }
1232}
1233
1234static void merge_remove_key_callback(BLOCKNUM *bp, bool for_checkpoint, void *extra) {
1235 FT ft = (FT) extra;
1236 ft->blocktable.free_blocknum(bp, ft, for_checkpoint);
1237}
1238
1239//
1240// Takes as input a locked node and a childnum_to_merge
1241// As output, two of node's children are merged or rebalanced, and node is unlocked
1242//
static void
ft_merge_child(
    FT ft,
    FTNODE node,
    int childnum_to_merge,
    bool *did_react,
    struct flusher_advice *fa)
{
    // this function should not be called
    // if the child is not mergeable
    paranoid_invariant(node->n_children > 1);
    toku_ftnode_assert_fully_in_memory(node);

    // Merge the chosen child with its left neighbor when it has one,
    // otherwise with its right neighbor; childnuma is always the left node.
    int childnuma,childnumb;
    if (childnum_to_merge > 0) {
        childnuma = childnum_to_merge-1;
        childnumb = childnum_to_merge;
    } else {
        childnuma = childnum_to_merge;
        childnumb = childnum_to_merge+1;
    }
    paranoid_invariant(0 <= childnuma);
    paranoid_invariant(childnuma+1 == childnumb);
    paranoid_invariant(childnumb < node->n_children);

    paranoid_invariant(node->height>0);

    // We suspect that at least one of the children is fusible, but they might not be.
    // for test
    call_flusher_thread_callback(flt_flush_before_merge);

    FTNODE childa, childb;
    {
        uint32_t childfullhash = compute_child_fullhash(ft->cf, node, childnuma);
        ftnode_fetch_extra bfe;
        bfe.create_for_full_read(ft);
        toku_pin_ftnode_with_dep_nodes(ft, BP_BLOCKNUM(node, childnuma), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &node, &childa, true);
    }
    // for test
    call_flusher_thread_callback(flt_flush_before_pin_second_node_for_merge);
    {
        // Pin the right child with both node and childa as dependent nodes.
        FTNODE dep_nodes[2];
        dep_nodes[0] = node;
        dep_nodes[1] = childa;
        uint32_t childfullhash = compute_child_fullhash(ft->cf, node, childnumb);
        ftnode_fetch_extra bfe;
        bfe.create_for_full_read(ft);
        toku_pin_ftnode_with_dep_nodes(ft, BP_BLOCKNUM(node, childnumb), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 2, dep_nodes, &childb, true);
    }

    // Drain both children's buffers before merging/rebalancing so no
    // pending messages are lost when pivots move.
    if (toku_bnc_n_entries(BNC(node,childnuma))>0) {
        flush_this_child(ft, node, childa, childnuma, fa);
    }
    if (toku_bnc_n_entries(BNC(node,childnumb))>0) {
        flush_this_child(ft, node, childb, childnumb, fa);
    }

    // now we have both children pinned in main memory, and cachetable locked,
    // so no checkpoints will occur.

    bool did_merge, did_rebalance;
    {
        DBT splitk;
        toku_init_dbt(&splitk);
        const DBT old_split_key = node->pivotkeys.get_pivot(childnuma);
        maybe_merge_pinned_nodes(node, &old_split_key, childa, childb, &did_merge, &did_rebalance, &splitk, ft->h->nodesize);
        //toku_verify_estimates(t,childa);
        // the tree did react if a merge (did_merge) or rebalance (new split key) occurred
        *did_react = (bool)(did_merge || did_rebalance);

        if (did_merge) {
            // Everything moved into childa; fold childb's buffer flow stats
            // into childa's buffer, then drop childb from the parent.
            invariant_null(splitk.data);
            NONLEAF_CHILDINFO remaining_bnc = BNC(node, childnuma);
            NONLEAF_CHILDINFO merged_bnc = BNC(node, childnumb);
            for (unsigned int i = 0; i < (sizeof remaining_bnc->flow) / (sizeof remaining_bnc->flow[0]); ++i) {
                remaining_bnc->flow[i] += merged_bnc->flow[i];
            }
            destroy_nonleaf_childinfo(merged_bnc);
            set_BNULL(node, childnumb);
            node->n_children--;
            memmove(&node->bp[childnumb],
                    &node->bp[childnumb+1],
                    (node->n_children-childnumb)*sizeof(node->bp[0]));
            REALLOC_N(node->n_children, node->bp);
            node->pivotkeys.delete_at(childnuma);

            // Handle a merge of the rightmost leaf node.
            BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
            // NOTE(review): did_merge is already known true in this branch,
            // so the `did_merge &&` below is redundant (harmless).
            if (did_merge && childb->blocknum.b == rightmost_blocknum.b) {
                // Keep the cached rightmost blocknum valid: swap pair values
                // so childa takes over childb's blocknum.
                invariant(childb->blocknum.b != ft->h->root_blocknum.b);
                toku_ftnode_swap_pair_values(childa, childb);
                BP_BLOCKNUM(node, childnuma) = childa->blocknum;
            }

            paranoid_invariant(BP_BLOCKNUM(node, childnuma).b == childa->blocknum.b);
            childa->dirty = 1; // just to make sure
            childb->dirty = 1; // just to make sure
        } else {
            // flow will be inaccurate for a while, oh well. the children
            // are leaves in this case so it's not a huge deal (we're
            // pretty far down the tree)

            // If we didn't merge the nodes, then we need the correct pivot.
            invariant_notnull(splitk.data);
            node->pivotkeys.replace_at(&splitk, childnuma);
            node->dirty = 1;
        }
        toku_destroy_dbt(&splitk);
    }
    //
    // now we possibly flush the children
    //
    if (did_merge) {
        // for test
        call_flusher_thread_callback(flt_flush_before_unpin_remove);

        // merge_remove_key_callback will free the blocknum
        int rrb = toku_cachetable_unpin_and_remove(
            ft->cf,
            childb->ct_pair,
            merge_remove_key_callback,
            ft
            );
        assert_zero(rrb);

        // for test
        call_flusher_thread_callback(ft_flush_aflter_merge);

        // unlock the parent
        paranoid_invariant(node->dirty);
        toku_unpin_ftnode(ft, node);
    }
    else {
        // for test
        call_flusher_thread_callback(ft_flush_aflter_rebalance);

        // unlock the parent
        paranoid_invariant(node->dirty);
        toku_unpin_ftnode(ft, node);
        toku_unpin_ftnode(ft, childb);
    }
    // childa is unpinned by the recursive flush, or directly here.
    if (childa->height > 0 && fa->should_recursively_flush(childa, fa->extra)) {
        toku_ft_flush_some_child(ft, childa, fa);
    }
    else {
        toku_unpin_ftnode(ft, childa);
    }
}
1391
void toku_ft_flush_some_child(FT ft, FTNODE parent, struct flusher_advice *fa)
// Effect: This function does the following:
//   - Pick a child of parent (the heaviest child),
//   - flush from parent to child,
//   - possibly split/merge child.
//   - if child is gorged, recursively proceed with child
//  Note that parent is already locked
//  Upon exit of this function, parent is unlocked and no new
//  new nodes (such as a child) remain locked
{
    // counts how many nodes this call newly dirtied, for fa->update_status
    int dirtied = 0;
    NONLEAF_CHILDINFO bnc = NULL;
    paranoid_invariant(parent->height>0);
    toku_ftnode_assert_fully_in_memory(parent);
    TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;

    // pick the child we want to flush to
    int childnum = fa->pick_child(ft, parent, fa->extra);

    // for test
    call_flusher_thread_callback(flt_flush_before_child_pin);

    // get the child into memory
    BLOCKNUM targetchild = BP_BLOCKNUM(parent, childnum);
    ft->blocktable.verify_blocknum_allocated(targetchild);
    uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
    FTNODE child;
    ftnode_fetch_extra bfe;
    // Note that we don't read the entire node into memory yet.
    // The idea is let's try to do the minimum work before releasing the parent lock
    bfe.create_for_min_read(ft);
    toku_pin_ftnode_with_dep_nodes(ft, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child, true);

    // for test
    call_flusher_thread_callback(ft_flush_aflter_child_pin);

    if (fa->should_destroy_basement_nodes(fa)) {
        maybe_destroy_child_blbs(parent, child, ft);
    }

    //Note that at this point, we don't have the entire child in.
    // Let's do a quick check to see if the child may be reactive
    // If the child cannot be reactive, then we can safely unlock
    // the parent before finishing reading in the entire child node.
    bool may_child_be_reactive = ft_ftnode_may_be_reactive(ft, child);

    paranoid_invariant(child->blocknum.b!=0);

    // only do the following work if there is a flush to perform
    if (toku_bnc_n_entries(BNC(parent, childnum)) > 0 || parent->height == 1) {
        if (!parent->dirty) {
            dirtied++;
            parent->dirty = 1;
        }
        // detach buffer: keep the old bnc (to be flushed below) and install
        // a fresh empty one in the parent, carrying over the flow stats
        BP_WORKDONE(parent, childnum) = 0;  // this buffer is drained, no work has been done by its contents
        bnc = BNC(parent, childnum);
        NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
        memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
        set_BNC(parent, childnum, new_bnc);
    }

    //
    // at this point, the buffer has been detached from the parent
    // and a new empty buffer has been placed in its stead
    // so, if we are absolutely sure that the child is not
    // reactive, we can unpin the parent
    //
    if (!may_child_be_reactive) {
        toku_unpin_ftnode(ft, parent);
        parent = NULL;
    }

    //
    // now, if necessary, read/decompress the rest of child into memory,
    // so that we can proceed and apply the flush
    //
    bring_node_fully_into_memory(child, ft);

    // It is possible after reading in the entire child,
    // that we now know that the child is not reactive
    // if so, we can unpin parent right now
    // we won't be splitting/merging child
    // and we have already replaced the bnc
    // for the root with a fresh one
    enum reactivity child_re = toku_ftnode_get_reactivity(ft, child);
    if (parent && child_re == RE_STABLE) {
        toku_unpin_ftnode(ft, parent);
        parent = NULL;
    }

    // from above, we know at this point that either the bnc
    // is detached from the parent (which may be unpinned),
    // and we have to apply the flush, or there was no data
    // in the buffer to flush, and as a result, flushing is not necessary
    // and bnc is NULL
    if (bnc != NULL) {
        if (!child->dirty) {
            dirtied++;
            child->dirty = 1;
        }
        // do the actual flush
        toku_bnc_flush_to_child(
            ft,
            bnc,
            child,
            parent_oldest_referenced_xid_known
            );
        destroy_nonleaf_childinfo(bnc);
    }

    fa->update_status(child, dirtied, fa->extra);
    // let's get the reactivity of the child again,
    // it is possible that the flush got rid of some values
    // and now the parent is no longer reactive
    child_re = toku_ftnode_get_reactivity(ft, child);
    // if the parent has been unpinned above, then
    // this is our only option, even if the child is not stable
    // if the child is not stable, we'll handle it the next
    // time we need to flush to the child
    if (!parent ||
        child_re == RE_STABLE ||
        (child_re == RE_FUSIBLE && parent->n_children == 1)
        )
    {
        if (parent) {
            toku_unpin_ftnode(ft, parent);
            parent = NULL;
        }
        //
        // it is the responsibility of toku_ft_flush_some_child to unpin child
        //
        if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) {
            toku_ft_flush_some_child(ft, child, fa);
        }
        else {
            toku_unpin_ftnode(ft, child);
        }
    }
    else if (child_re == RE_FISSIBLE) {
        //
        // it is responsibility of `ft_split_child` to unlock nodes of
        // parent and child as it sees fit
        //
        paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
        ft_split_child(ft, parent, childnum, child, SPLIT_EVENLY, fa);
    }
    else if (child_re == RE_FUSIBLE) {
        //
        // it is responsibility of `maybe_merge_child to unlock nodes of
        // parent and child as it sees fit
        //
        paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
        fa->maybe_merge_child(fa, ft, parent, childnum, child, fa->extra);
    }
    else {
        // reactivity must be one of the three cases above
        abort();
    }
}
1551
// Effect: Apply every message in `bnc` to `child`, in buffer order,
// maintaining the per-checkpoint flow statistics, and (for leaf children
// when a txn manager exists) performing simple transaction garbage
// collection along the way. Accumulated stat and logical-row deltas are
// folded into the tree's header counters. Does NOT destroy `bnc`; callers
// destroy it after this returns.
void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID parent_oldest_referenced_xid_known) {
    paranoid_invariant(bnc);

    TOKULOGGER logger = toku_cachefile_logger(ft->cf);
    TXN_MANAGER txn_manager = logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr;
    TXNID oldest_referenced_xid_for_simple_gc = TXNID_NONE;

    txn_manager_state txn_state_for_gc(txn_manager);
    // GC is only attempted when flushing into a leaf and a txn manager exists.
    bool do_garbage_collection = child->height == 0 && txn_manager != nullptr;
    if (do_garbage_collection) {
        txn_state_for_gc.init();
        oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
    }
    txn_gc_info gc_info(&txn_state_for_gc,
                        oldest_referenced_xid_for_simple_gc,
                        child->oldest_referenced_xid_known,
                        true);
    // Functor applied to each buffered message: attributes the message's
    // bytes to the right checkpoint "flow" bucket and applies it to child.
    struct flush_msg_fn {
        FT ft;
        FTNODE child;
        NONLEAF_CHILDINFO bnc;
        txn_gc_info *gc_info;

        STAT64INFO_S stats_delta;
        int64_t logical_rows_delta = 0;
        // bytes of the buffer not yet applied; counts down to zero
        size_t remaining_memsize = bnc->msg_buffer.buffer_size_in_use();

        flush_msg_fn(FT t, FTNODE n, NONLEAF_CHILDINFO nl, txn_gc_info *g) :
            ft(t), child(n), bnc(nl), gc_info(g), remaining_memsize(bnc->msg_buffer.buffer_size_in_use()) {
            stats_delta = { 0, 0 };
        }
        int operator()(const ft_msg &msg, bool is_fresh) {
            size_t flow_deltas[] = { 0, 0 };
            size_t memsize_in_buffer = message_buffer::msg_memsize_in_buffer(msg);
            if (remaining_memsize <= bnc->flow[0]) {
                // this message is in the current checkpoint's worth of
                // the end of the message buffer
                flow_deltas[0] = memsize_in_buffer;
            } else if (remaining_memsize <= bnc->flow[0] + bnc->flow[1]) {
                // this message is in the last checkpoint's worth of the
                // end of the message buffer
                flow_deltas[1] = memsize_in_buffer;
            }
            toku_ftnode_put_msg(
                ft->cmp,
                ft->update_fun,
                child,
                -1,
                msg,
                is_fresh,
                gc_info,
                flow_deltas,
                &stats_delta,
                &logical_rows_delta);
            remaining_memsize -= memsize_in_buffer;
            return 0;
        }
    } flush_fn(ft, child, bnc, &gc_info);
    bnc->msg_buffer.iterate(flush_fn);

    // the child inherits the parent's oldest known referenced xid
    child->oldest_referenced_xid_known = parent_oldest_referenced_xid_known;

    // every byte of the buffer must have been accounted for above
    invariant(flush_fn.remaining_memsize == 0);
    if (flush_fn.stats_delta.numbytes || flush_fn.stats_delta.numrows) {
        toku_ft_update_stats(&ft->in_memory_stats, flush_fn.stats_delta);
    }
    toku_ft_adjust_logical_row_count(ft, flush_fn.logical_rows_delta);
    if (do_garbage_collection) {
        size_t buffsize = bnc->msg_buffer.buffer_size_in_use();
        // may be misleading if there's a broadcast message in there
        toku_ft_status_note_msg_bytes_out(buffsize);
    }
}
1625
1626static void
1627update_cleaner_status(
1628 FTNODE node,
1629 int childnum)
1630{
1631 FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_NODES)++;
1632 if (node->height == 1) {
1633 FL_STATUS_VAL(FT_FLUSHER_CLEANER_H1_NODES)++;
1634 } else {
1635 FL_STATUS_VAL(FT_FLUSHER_CLEANER_HGT1_NODES)++;
1636 }
1637
1638 unsigned int nbytesinbuf = toku_bnc_nbytesinbuf(BNC(node, childnum));
1639 if (nbytesinbuf == 0) {
1640 FL_STATUS_VAL(FT_FLUSHER_CLEANER_EMPTY_NODES)++;
1641 } else {
1642 if (nbytesinbuf > FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE)) {
1643 FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE) = nbytesinbuf;
1644 }
1645 if (nbytesinbuf < FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE)) {
1646 FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE) = nbytesinbuf;
1647 }
1648 FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_BUFFER_SIZE) += nbytesinbuf;
1649
1650 uint64_t workdone = BP_WORKDONE(node, childnum);
1651 if (workdone > FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE)) {
1652 FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE) = workdone;
1653 }
1654 if (workdone < FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE)) {
1655 FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE) = workdone;
1656 }
1657 FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_BUFFER_WORKDONE) += workdone;
1658 }
1659}
1660
1661static void
1662dummy_update_status(
1663 FTNODE UU(child),
1664 int UU(dirtied),
1665 void* UU(extra)
1666 )
1667{
1668}
1669
1670static int
1671dummy_pick_heaviest_child(FT UU(h),
1672 FTNODE UU(parent),
1673 void* UU(extra))
1674{
1675 abort();
1676 return -1;
1677}
1678
1679void toku_ft_split_child(
1680 FT ft,
1681 FTNODE node,
1682 int childnum,
1683 FTNODE child,
1684 enum split_mode split_mode
1685 )
1686{
1687 struct flusher_advice fa;
1688 flusher_advice_init(
1689 &fa,
1690 dummy_pick_heaviest_child,
1691 dont_destroy_basement_nodes,
1692 never_recursively_flush,
1693 default_merge_child,
1694 dummy_update_status,
1695 default_pick_child_after_split,
1696 NULL
1697 );
1698 ft_split_child(
1699 ft,
1700 node,
1701 childnum, // childnum to split
1702 child,
1703 split_mode,
1704 &fa
1705 );
1706}
1707
1708void toku_ft_merge_child(
1709 FT ft,
1710 FTNODE node,
1711 int childnum
1712 )
1713{
1714 struct flusher_advice fa;
1715 flusher_advice_init(
1716 &fa,
1717 dummy_pick_heaviest_child,
1718 dont_destroy_basement_nodes,
1719 never_recursively_flush,
1720 default_merge_child,
1721 dummy_update_status,
1722 default_pick_child_after_split,
1723 NULL
1724 );
1725 bool did_react;
1726 ft_merge_child(
1727 ft,
1728 node,
1729 childnum, // childnum to merge
1730 &did_react,
1731 &fa
1732 );
1733}
1734
1735int
1736toku_ftnode_cleaner_callback(
1737 void *ftnode_pv,
1738 BLOCKNUM blocknum,
1739 uint32_t fullhash,
1740 void *extraargs)
1741{
1742 FTNODE node = (FTNODE) ftnode_pv;
1743 invariant(node->blocknum.b == blocknum.b);
1744 invariant(node->fullhash == fullhash);
1745 invariant(node->height > 0); // we should never pick a leaf node (for now at least)
1746 FT ft = (FT) extraargs;
1747 bring_node_fully_into_memory(node, ft);
1748 int childnum = find_heaviest_child(node);
1749 update_cleaner_status(node, childnum);
1750
1751 // Either toku_ft_flush_some_child will unlock the node, or we do it here.
1752 if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) {
1753 struct flusher_advice fa;
1754 struct flush_status_update_extra fste;
1755 ct_flusher_advice_init(&fa, &fste, ft->h->nodesize);
1756 toku_ft_flush_some_child(ft, node, &fa);
1757 } else {
1758 toku_unpin_ftnode(ft, node);
1759 }
1760 return 0;
1761}
1762
// Work item handed to a background (kibbutz) thread by
// place_node_and_bnc_on_background_thread and consumed by flush_node_fun.
struct flusher_extra {
    FT ft;          // tree the work applies to
    FTNODE node;    // pinned node; the background thread is responsible for unpinning it
    NONLEAF_CHILDINFO bnc;  // detached buffer to apply to node, or NULL to flush one of node's own buffers
    TXNID parent_oldest_referenced_xid_known;  // parent's xid, passed to toku_bnc_flush_to_child
};
1769
1770//
1771// This is the function that gets called by a
1772// background thread. Its purpose is to complete
1773// a flush, and possibly do a split/merge.
1774//
static void flush_node_fun(void *fe_v)
{
    toku::context flush_ctx(CTX_FLUSH);
    struct flusher_extra* fe = (struct flusher_extra *) fe_v;
    // The node that has been placed on the background
    // thread may not be fully in memory. Some message
    // buffers may be compressed. Before performing
    // any operations, we must first make sure
    // the node is fully in memory
    //
    // If we have a bnc, that means fe->node is a child, and we've already
    // destroyed its basement nodes if necessary, so we now need to either
    // read them back in, or just do the regular partial fetch.  If we
    // don't, that means fe->node is a parent, so we need to do this anyway.
    bring_node_fully_into_memory(fe->node,fe->ft);
    fe->node->dirty = 1;

    struct flusher_advice fa;
    struct flush_status_update_extra fste;
    flt_flusher_advice_init(&fa, &fste, fe->ft->h->nodesize);

    if (fe->bnc) {
        // In this case, we have a bnc to flush to a node

        // for test purposes
        call_flusher_thread_callback(flt_flush_before_applying_inbox);

        // Apply the detached buffer to fe->node, then destroy it.
        toku_bnc_flush_to_child(
            fe->ft,
            fe->bnc,
            fe->node,
            fe->parent_oldest_referenced_xid_known
            );
        destroy_nonleaf_childinfo(fe->bnc);

        // after the flush has completed, now check to see if the node needs flushing
        // If so, call toku_ft_flush_some_child on the node (because this flush intends to
        // pass a meaningful oldest referenced xid for simple garbage collection), and it is the
        // responsibility of the flush to unlock the node. otherwise, we unlock it here.
        if (fe->node->height > 0 && toku_ftnode_nonleaf_is_gorged(fe->node, fe->ft->h->nodesize)) {
            toku_ft_flush_some_child(fe->ft, fe->node, &fa);
        }
        else {
            toku_unpin_ftnode(fe->ft,fe->node);
        }
    }
    else {
        // In this case, we were just passed a node with no
        // bnc, which means we are tasked with flushing some
        // buffer in the node.
        // It is the responsibility of flush some child to unlock the node
        toku_ft_flush_some_child(fe->ft, fe->node, &fa);
    }
    // Pair with the background job added when this work item was enqueued,
    // then release the heap-allocated work item itself.
    remove_background_job_from_cf(fe->ft->cf);
    toku_free(fe);
}
1831
1832static void
1833place_node_and_bnc_on_background_thread(
1834 FT ft,
1835 FTNODE node,
1836 NONLEAF_CHILDINFO bnc,
1837 TXNID parent_oldest_referenced_xid_known)
1838{
1839 struct flusher_extra *XMALLOC(fe);
1840 fe->ft = ft;
1841 fe->node = node;
1842 fe->bnc = bnc;
1843 fe->parent_oldest_referenced_xid_known = parent_oldest_referenced_xid_known;
1844 cachefile_kibbutz_enq(ft->cf, flush_node_fun, fe);
1845}
1846
1847//
1848// This takes as input a gorged, locked, non-leaf node named parent
1849// and sets up a flush to be done in the background.
1850// The flush is setup like this:
1851// - We call maybe_get_and_pin_clean on the child we want to flush to in order to try to lock the child
1852// - if we successfully pin the child, and the child does not need to be split or merged
1853// then we detach the buffer, place the child and buffer onto a background thread, and
1854// have the flush complete in the background, and unlock the parent. The child will be
1855// unlocked on the background thread
1856// - if any of the above does not happen (child cannot be locked,
1857// child needs to be split/merged), then we place the parent on the background thread.
1858// The parent will be unlocked on the background thread
1859//
void toku_ft_flush_node_on_background_thread(FT ft, FTNODE parent)
{
    toku::context flush_ctx(CTX_FLUSH);
    TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;
    //
    // first let's see if we can detach buffer on client thread
    // and pick the child we want to flush to
    //
    int childnum = find_heaviest_child(parent);
    paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
    //
    // see if we can pin the child
    //
    FTNODE child;
    uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
    // try-pin: fails rather than blocking if the child is busy or not clean
    int r = toku_maybe_pin_ftnode_clean(ft, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child);
    if (r != 0) {
        // In this case, we could not lock the child, so just place the parent on the background thread
        // In the callback, we will use toku_ft_flush_some_child, which checks to
        // see if we should blow away the old basement nodes.
        place_node_and_bnc_on_background_thread(ft, parent, NULL, parent_oldest_referenced_xid_known);
    }
    else {
        //
        // successfully locked child
        //
        bool may_child_be_reactive = ft_ftnode_may_be_reactive(ft, child);
        if (!may_child_be_reactive) {
            // We're going to unpin the parent, so before we do, we must
            // check to see if we need to blow away the basement nodes to
            // keep the MSN invariants intact.
            maybe_destroy_child_blbs(parent, child, ft);

            //
            // can detach buffer and unpin root here
            //
            parent->dirty = 1;
            BP_WORKDONE(parent, childnum) = 0;  // this buffer is drained, no work has been done by its contents
            NONLEAF_CHILDINFO bnc = BNC(parent, childnum);
            NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
            // carry the flow stats over to the replacement buffer
            memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
            set_BNC(parent, childnum, new_bnc);

            //
            // at this point, the buffer has been detached from the parent
            // and a new empty buffer has been placed in its stead
            // so, because we know for sure the child is not
            // reactive, we can unpin the parent
            //
            place_node_and_bnc_on_background_thread(ft, child, bnc, parent_oldest_referenced_xid_known);
            toku_unpin_ftnode(ft, parent);
        }
        else {
            // because the child may be reactive, we need to
            // put parent on background thread.
            // As a result, we unlock the child here.
            toku_unpin_ftnode(ft, child);
            // Again, we'll have the parent on the background thread, so
            // we don't need to destroy the basement nodes yet.
            place_node_and_bnc_on_background_thread(ft, parent, NULL, parent_oldest_referenced_xid_known);
        }
    }
}
1923
1924#include <toku_race_tools.h>
void __attribute__((__constructor__)) toku_ft_flusher_helgrind_ignore(void);
// Runs automatically at load time (constructor attribute): tells
// helgrind/DRD to skip race checking on the flusher status counters.
void
toku_ft_flusher_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&fl_status, sizeof fl_status);
}
1930