| 1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
| 2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
| 3 | /*====== |
| 4 | This file is part of PerconaFT. |
| 5 | |
| 6 | |
| 7 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
| 8 | |
| 9 | PerconaFT is free software: you can redistribute it and/or modify |
| 10 | it under the terms of the GNU General Public License, version 2, |
| 11 | as published by the Free Software Foundation. |
| 12 | |
| 13 | PerconaFT is distributed in the hope that it will be useful, |
| 14 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 16 | GNU General Public License for more details. |
| 17 | |
| 18 | You should have received a copy of the GNU General Public License |
| 19 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 20 | |
| 21 | ---------------------------------------- |
| 22 | |
| 23 | PerconaFT is free software: you can redistribute it and/or modify |
| 24 | it under the terms of the GNU Affero General Public License, version 3, |
| 25 | as published by the Free Software Foundation. |
| 26 | |
| 27 | PerconaFT is distributed in the hope that it will be useful, |
| 28 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 29 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 30 | GNU Affero General Public License for more details. |
| 31 | |
| 32 | You should have received a copy of the GNU Affero General Public License |
| 33 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 34 | ======= */ |
| 35 | |
| 36 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
| 37 | |
| 38 | #include <my_global.h> |
| 39 | #include "ft/ft.h" |
| 40 | #include "ft/ft-internal.h" |
| 41 | #include "ft/serialize/ft_node-serialize.h" |
| 42 | #include "ft/node.h" |
| 43 | #include "ft/serialize/rbuf.h" |
| 44 | #include "ft/serialize/wbuf.h" |
| 45 | #include "util/scoped_malloc.h" |
| 46 | #include "util/sort.h" |
| 47 | |
| 48 | // Effect: Fill in N as an empty ftnode. |
| 49 | // TODO: Rename toku_ftnode_create |
| 50 | void toku_initialize_empty_ftnode(FTNODE n, BLOCKNUM blocknum, int height, int num_children, int layout_version, unsigned int flags) { |
| 51 | paranoid_invariant(layout_version != 0); |
| 52 | paranoid_invariant(height >= 0); |
| 53 | |
| 54 | n->max_msn_applied_to_node_on_disk = ZERO_MSN; // correct value for root node, harmless for others |
| 55 | n->flags = flags; |
| 56 | n->blocknum = blocknum; |
| 57 | n->layout_version = layout_version; |
| 58 | n->layout_version_original = layout_version; |
| 59 | n->layout_version_read_from_disk = layout_version; |
| 60 | n->height = height; |
| 61 | n->pivotkeys.create_empty(); |
| 62 | n->bp = 0; |
| 63 | n->n_children = num_children; |
| 64 | n->oldest_referenced_xid_known = TXNID_NONE; |
| 65 | |
| 66 | if (num_children > 0) { |
| 67 | XMALLOC_N(num_children, n->bp); |
| 68 | for (int i = 0; i < num_children; i++) { |
| 69 | BP_BLOCKNUM(n,i).b=0; |
| 70 | BP_STATE(n,i) = PT_INVALID; |
| 71 | BP_WORKDONE(n,i) = 0; |
| 72 | BP_INIT_TOUCHED_CLOCK(n, i); |
| 73 | set_BNULL(n,i); |
| 74 | if (height > 0) { |
| 75 | set_BNC(n, i, toku_create_empty_nl()); |
| 76 | } else { |
| 77 | set_BLB(n, i, toku_create_empty_bn()); |
| 78 | } |
| 79 | } |
| 80 | } |
| 81 | n->dirty = 1; // special case exception, it's okay to mark as dirty because the basements are empty |
| 82 | |
| 83 | toku_ft_status_note_ftnode(height, true); |
| 84 | } |
| 85 | |
| 86 | // destroys the internals of the ftnode, but it does not free the values |
| 87 | // that are stored |
| 88 | // this is common functionality for toku_ftnode_free and rebalance_ftnode_leaf |
| 89 | // MUST NOT do anything besides free the structures that have been allocated |
void toku_destroy_ftnode_internals(FTNODE node) {
    node->pivotkeys.destroy();
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node,i) == PT_AVAIL) {
            // In-memory partition: nonleaf children hold message buffers,
            // leaf children hold basement nodes.
            if (node->height > 0) {
                destroy_nonleaf_childinfo(BNC(node,i));
            } else {
                paranoid_invariant(BLB_LRD(node, i) == 0);
                destroy_basement_node(BLB(node, i));
            }
        } else if (BP_STATE(node,i) == PT_COMPRESSED) {
            // Partition still holds its compressed on-disk image; free the
            // sub block and its buffer.
            SUB_BLOCK sb = BSB(node,i);
            toku_free(sb->compressed_ptr);
            toku_free(sb);
        } else {
            // Only remaining legal state is an empty (BNULL) partition.
            paranoid_invariant(is_BNULL(node, i));
        }
        set_BNULL(node, i);
    }
    toku_free(node->bp);
    node->bp = NULL;
}
| 112 | |
| 113 | /* Frees a node, including all the stuff in the hash table. */ |
| 114 | void toku_ftnode_free(FTNODE *nodep) { |
| 115 | FTNODE node = *nodep; |
| 116 | toku_ft_status_note_ftnode(node->height, false); |
| 117 | toku_destroy_ftnode_internals(node); |
| 118 | toku_free(node); |
| 119 | *nodep = nullptr; |
| 120 | } |
| 121 | |
| 122 | void toku_ftnode_update_disk_stats(FTNODE ftnode, FT ft, bool for_checkpoint) { |
| 123 | STAT64INFO_S deltas = ZEROSTATS; |
| 124 | // capture deltas before rebalancing basements for serialization |
| 125 | deltas = toku_get_and_clear_basement_stats(ftnode); |
| 126 | // locking not necessary here with respect to checkpointing |
| 127 | // in Clayface (because of the pending lock and cachetable lock |
| 128 | // in toku_cachetable_begin_checkpoint) |
| 129 | // essentially, if we are dealing with a for_checkpoint |
| 130 | // parameter in a function that is called by the flush_callback, |
| 131 | // then the cachetable needs to ensure that this is called in a safe |
| 132 | // manner that does not interfere with the beginning |
| 133 | // of a checkpoint, which it does with the cachetable lock |
| 134 | // and pending lock |
| 135 | toku_ft_update_stats(&ft->h->on_disk_stats, deltas); |
| 136 | if (for_checkpoint) { |
| 137 | toku_ft_update_stats(&ft->checkpoint_header->on_disk_stats, deltas); |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | void toku_ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) { |
| 142 | for (int i = 0; i < node->n_children; i++) { |
| 143 | BP_BLOCKNUM(cloned_node,i) = BP_BLOCKNUM(node,i); |
| 144 | paranoid_invariant(BP_STATE(node,i) == PT_AVAIL); |
| 145 | BP_STATE(cloned_node,i) = PT_AVAIL; |
| 146 | BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i); |
| 147 | if (node->height == 0) { |
| 148 | set_BLB(cloned_node, i, toku_clone_bn(BLB(node,i))); |
| 149 | } else { |
| 150 | set_BNC(cloned_node, i, toku_clone_nl(BNC(node,i))); |
| 151 | } |
| 152 | } |
| 153 | } |
| 154 | |
| 155 | void toku_evict_bn_from_memory(FTNODE node, int childnum, FT ft) { |
| 156 | // free the basement node |
| 157 | assert(!node->dirty); |
| 158 | BASEMENTNODE bn = BLB(node, childnum); |
| 159 | toku_ft_decrease_stats(&ft->in_memory_stats, bn->stat64_delta); |
| 160 | destroy_basement_node(bn); |
| 161 | set_BNULL(node, childnum); |
| 162 | BP_STATE(node, childnum) = PT_ON_DISK; |
| 163 | } |
| 164 | |
| 165 | BASEMENTNODE toku_detach_bn(FTNODE node, int childnum) { |
| 166 | assert(BP_STATE(node, childnum) == PT_AVAIL); |
| 167 | BASEMENTNODE bn = BLB(node, childnum); |
| 168 | set_BNULL(node, childnum); |
| 169 | BP_STATE(node, childnum) = PT_ON_DISK; |
| 170 | return bn; |
| 171 | } |
| 172 | |
| 173 | // |
| 174 | // Orthopush |
| 175 | // |
| 176 | |
| 177 | struct { |
| 178 | int32_t *; |
| 179 | int ; |
| 180 | }; |
| 181 | |
| 182 | int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const ) __attribute__((nonnull(3))); |
| 183 | int (const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const ) |
| 184 | { |
| 185 | extra->offsets[extra->i] = offset; |
| 186 | extra->i++; |
| 187 | return 0; |
| 188 | } |
| 189 | |
| 190 | /** |
| 191 | * Given pointers to offsets within a message buffer where we can find messages, |
| 192 | * figure out the MSN of each message, and compare those MSNs. Returns 1, |
| 193 | * 0, or -1 if a is larger than, equal to, or smaller than b. |
| 194 | */ |
| 195 | int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo); |
| 196 | int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo) |
| 197 | { |
| 198 | MSN amsn, bmsn; |
| 199 | msg_buffer.get_message_key_msn(ao, nullptr, &amsn); |
| 200 | msg_buffer.get_message_key_msn(bo, nullptr, &bmsn); |
| 201 | if (amsn.msn > bmsn.msn) { |
| 202 | return +1; |
| 203 | } |
| 204 | if (amsn.msn < bmsn.msn) { |
| 205 | return -1; |
| 206 | } |
| 207 | return 0; |
| 208 | } |
| 209 | |
| 210 | /** |
| 211 | * Given a message buffer and and offset, apply the message with |
| 212 | * toku_ft_bn_apply_msg, or discard it, |
| 213 | * based on its MSN and the MSN of the basement node. |
| 214 | */ |
static void do_bn_apply_msg(
    FT_HANDLE ft_handle,
    BASEMENTNODE bn,
    message_buffer* msg_buffer,
    int32_t offset,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    // Decode the message stored at 'offset' in the buffer.
    DBT k, v;
    ft_msg msg = msg_buffer->get_message(offset, &k, &v);

    // The messages are being iterated over in (key,msn) order or just in
    // msn order, so all the messages for one key, from one buffer, are in
    // ascending msn order. So it's ok that we don't update the basement
    // node's msn until the end.
    if (msg.msn().msn > bn->max_msn_applied.msn) {
        toku_ft_bn_apply_msg(
            ft_handle->ft->cmp,
            ft_handle->ft->update_fun,
            bn,
            msg,
            gc_info,
            workdone,
            stats_to_update,
            logical_rows_delta);
    } else {
        // Message is older than what the basement has already seen:
        // record the discard in the status counters.
        toku_ft_status_note_msn_discard();
    }

    // We must always mark message as stale since it has been marked
    // (using omt::iterate_and_mark_range)
    // It is possible to call do_bn_apply_msg even when it won't apply the
    // message because the node containing it could have been evicted and
    // brought back in.
    msg_buffer->set_freshness(offset, false);
}
| 253 | |
| 254 | |
| 255 | struct { |
| 256 | FT_HANDLE ; |
| 257 | BASEMENTNODE ; |
| 258 | NONLEAF_CHILDINFO ; |
| 259 | txn_gc_info *; |
| 260 | uint64_t *; |
| 261 | STAT64INFO ; |
| 262 | int64_t *; |
| 263 | }; |
| 264 | |
| 265 | int iterate_do_bn_apply_msg( |
| 266 | const int32_t &offset, |
| 267 | const uint32_t UU(idx), |
| 268 | struct iterate_do_bn_apply_msg_extra* const e) |
| 269 | __attribute__((nonnull(3))); |
| 270 | |
| 271 | int ( |
| 272 | const int32_t &offset, |
| 273 | const uint32_t UU(idx), |
| 274 | struct iterate_do_bn_apply_msg_extra* const e) |
| 275 | { |
| 276 | do_bn_apply_msg( |
| 277 | e->t, |
| 278 | e->bn, |
| 279 | &e->bnc->msg_buffer, |
| 280 | offset, |
| 281 | e->gc_info, |
| 282 | e->workdone, |
| 283 | e->stats_to_update, |
| 284 | e->logical_rows_delta); |
| 285 | return 0; |
| 286 | } |
| 287 | |
| 288 | /** |
| 289 | * Given the bounds of the basement node to which we will apply messages, |
| 290 | * find the indexes within message_tree which contain the range of |
| 291 | * relevant messages. |
| 292 | * |
| 293 | * The message tree contains offsets into the buffer, where messages are |
| 294 | * found. The pivot_bounds are the lower bound exclusive and upper bound |
| 295 | * inclusive, because they come from pivot keys in the tree. We want OMT |
| 296 | * indices, which must have the lower bound be inclusive and the upper |
| 297 | * bound exclusive. We will get these by telling omt::find to look |
| 298 | * for something strictly bigger than each of our pivot bounds. |
| 299 | * |
| 300 | * Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper |
| 301 | * bound exclusive). |
| 302 | */ |
| 303 | template<typename find_bounds_omt_t> |
| 304 | static void |
| 305 | find_bounds_within_message_tree( |
| 306 | const toku::comparator &cmp, |
| 307 | const find_bounds_omt_t &message_tree, /// tree holding message buffer offsets, in which we want to look for indices |
| 308 | message_buffer *msg_buffer, /// message buffer in which messages are found |
| 309 | const pivot_bounds &bounds, /// key bounds within the basement node we're applying messages to |
| 310 | uint32_t *lbi, /// (output) "lower bound inclusive" (index into message_tree) |
| 311 | uint32_t *ube /// (output) "upper bound exclusive" (index into message_tree) |
| 312 | ) |
| 313 | { |
| 314 | int r = 0; |
| 315 | |
| 316 | if (!toku_dbt_is_empty(bounds.lbe())) { |
| 317 | // By setting msn to MAX_MSN and by using direction of +1, we will |
| 318 | // get the first message greater than (in (key, msn) order) any |
| 319 | // message (with any msn) with the key lower_bound_exclusive. |
| 320 | // This will be a message we want to try applying, so it is the |
| 321 | // "lower bound inclusive" within the message_tree. |
| 322 | struct toku_msg_buffer_key_msn_heaviside_extra (cmp, msg_buffer, bounds.lbe(), MAX_MSN); |
| 323 | int32_t found_lb; |
| 324 | r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi); |
| 325 | if (r == DB_NOTFOUND) { |
| 326 | // There is no relevant data (the lower bound is bigger than |
| 327 | // any message in this tree), so we have no range and we're |
| 328 | // done. |
| 329 | *lbi = 0; |
| 330 | *ube = 0; |
| 331 | return; |
| 332 | } |
| 333 | if (!toku_dbt_is_empty(bounds.ubi())) { |
| 334 | // Check if what we found for lbi is greater than the upper |
| 335 | // bound inclusive that we have. If so, there are no relevant |
| 336 | // messages between these bounds. |
| 337 | const DBT *ubi = bounds.ubi(); |
| 338 | const int32_t offset = found_lb; |
| 339 | DBT found_lbidbt; |
| 340 | msg_buffer->get_message_key_msn(offset, &found_lbidbt, nullptr); |
| 341 | int c = cmp(&found_lbidbt, ubi); |
| 342 | // These DBTs really are both inclusive bounds, so we need |
| 343 | // strict inequality in order to determine that there's |
| 344 | // nothing between them. If they're equal, then we actually |
| 345 | // need to apply the message pointed to by lbi, and also |
| 346 | // anything with the same key but a bigger msn. |
| 347 | if (c > 0) { |
| 348 | *lbi = 0; |
| 349 | *ube = 0; |
| 350 | return; |
| 351 | } |
| 352 | } |
| 353 | } else { |
| 354 | // No lower bound given, it's negative infinity, so we start at |
| 355 | // the first message in the OMT. |
| 356 | *lbi = 0; |
| 357 | } |
| 358 | if (!toku_dbt_is_empty(bounds.ubi())) { |
| 359 | // Again, we use an msn of MAX_MSN and a direction of +1 to get |
| 360 | // the first thing bigger than the upper_bound_inclusive key. |
| 361 | // This is therefore the smallest thing we don't want to apply, |
| 362 | // and omt::iterate_on_range will not examine it. |
| 363 | struct toku_msg_buffer_key_msn_heaviside_extra (cmp, msg_buffer, bounds.ubi(), MAX_MSN); |
| 364 | r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(ube_extra, +1, nullptr, ube); |
| 365 | if (r == DB_NOTFOUND) { |
| 366 | // Couldn't find anything in the buffer bigger than our key, |
| 367 | // so we need to look at everything up to the end of |
| 368 | // message_tree. |
| 369 | *ube = message_tree.size(); |
| 370 | } |
| 371 | } else { |
| 372 | // No upper bound given, it's positive infinity, so we need to go |
| 373 | // through the end of the OMT. |
| 374 | *ube = message_tree.size(); |
| 375 | } |
| 376 | } |
| 377 | |
| 378 | // For each message in the ancestor's buffer (determined by childnum) that |
| 379 | // is key-wise between lower_bound_exclusive and upper_bound_inclusive, |
| 380 | // apply the message to the basement node. We treat the bounds as minus |
| 381 | // or plus infinity respectively if they are NULL. Do not mark the node |
| 382 | // as dirty (preserve previous state of 'dirty' bit). |
| 383 | static void bnc_apply_messages_to_basement_node( |
| 384 | FT_HANDLE t, // used for comparison function |
| 385 | BASEMENTNODE bn, // where to apply messages |
| 386 | FTNODE ancestor, // the ancestor node where we can find messages to apply |
| 387 | int childnum, // which child buffer of ancestor contains messages we want |
| 388 | const pivot_bounds & |
| 389 | bounds, // contains pivot key bounds of this basement node |
| 390 | txn_gc_info *gc_info, |
| 391 | bool *msgs_applied) { |
| 392 | int r; |
| 393 | NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum); |
| 394 | |
| 395 | // Determine the offsets in the message trees between which we need to |
| 396 | // apply messages from this buffer |
| 397 | STAT64INFO_S stats_delta = {0, 0}; |
| 398 | uint64_t workdone_this_ancestor = 0; |
| 399 | int64_t logical_rows_delta = 0; |
| 400 | |
| 401 | uint32_t stale_lbi, stale_ube; |
| 402 | if (!bn->stale_ancestor_messages_applied) { |
| 403 | find_bounds_within_message_tree(t->ft->cmp, |
| 404 | bnc->stale_message_tree, |
| 405 | &bnc->msg_buffer, |
| 406 | bounds, |
| 407 | &stale_lbi, |
| 408 | &stale_ube); |
| 409 | } else { |
| 410 | stale_lbi = 0; |
| 411 | stale_ube = 0; |
| 412 | } |
| 413 | uint32_t fresh_lbi, fresh_ube; |
| 414 | find_bounds_within_message_tree(t->ft->cmp, |
| 415 | bnc->fresh_message_tree, |
| 416 | &bnc->msg_buffer, |
| 417 | bounds, |
| 418 | &fresh_lbi, |
| 419 | &fresh_ube); |
| 420 | |
| 421 | // We now know where all the messages we must apply are, so one of the |
| 422 | // following 4 cases will do the application, depending on which of |
| 423 | // the lists contains relevant messages: |
| 424 | // |
| 425 | // 1. broadcast messages and anything else, or a mix of fresh and stale |
| 426 | // 2. only fresh messages |
| 427 | // 3. only stale messages |
| 428 | if (bnc->broadcast_list.size() > 0 || |
| 429 | (stale_lbi != stale_ube && fresh_lbi != fresh_ube)) { |
| 430 | // We have messages in multiple trees, so we grab all |
| 431 | // the relevant messages' offsets and sort them by MSN, then apply |
| 432 | // them in MSN order. |
| 433 | const int buffer_size = |
| 434 | ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + |
| 435 | bnc->broadcast_list.size()); |
| 436 | toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t)); |
| 437 | int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get()); |
| 438 | struct store_msg_buffer_offset_extra = {.offsets = offsets, |
| 439 | .i = 0}; |
| 440 | |
| 441 | // Populate offsets array with offsets to stale messages |
| 442 | r = bnc->stale_message_tree |
| 443 | .iterate_on_range<struct store_msg_buffer_offset_extra, |
| 444 | store_msg_buffer_offset>( |
| 445 | stale_lbi, stale_ube, &sfo_extra); |
| 446 | assert_zero(r); |
| 447 | |
| 448 | // Then store fresh offsets, and mark them to be moved to stale later. |
| 449 | r = bnc->fresh_message_tree |
| 450 | .iterate_and_mark_range<struct store_msg_buffer_offset_extra, |
| 451 | store_msg_buffer_offset>( |
| 452 | fresh_lbi, fresh_ube, &sfo_extra); |
| 453 | assert_zero(r); |
| 454 | |
| 455 | // Store offsets of all broadcast messages. |
| 456 | r = bnc->broadcast_list.iterate<struct store_msg_buffer_offset_extra, |
| 457 | store_msg_buffer_offset>(&sfo_extra); |
| 458 | assert_zero(r); |
| 459 | invariant(sfo_extra.i == buffer_size); |
| 460 | |
| 461 | // Sort by MSN. |
| 462 | toku::sort<int32_t, message_buffer, msg_buffer_offset_msn_cmp>:: |
| 463 | mergesort_r(offsets, buffer_size, bnc->msg_buffer); |
| 464 | |
| 465 | // Apply the messages in MSN order. |
| 466 | for (int i = 0; i < buffer_size; ++i) { |
| 467 | *msgs_applied = true; |
| 468 | do_bn_apply_msg(t, |
| 469 | bn, |
| 470 | &bnc->msg_buffer, |
| 471 | offsets[i], |
| 472 | gc_info, |
| 473 | &workdone_this_ancestor, |
| 474 | &stats_delta, |
| 475 | &logical_rows_delta); |
| 476 | } |
| 477 | } else if (stale_lbi == stale_ube) { |
| 478 | // No stale messages to apply, we just apply fresh messages, and mark |
| 479 | // them to be moved to stale later. |
| 480 | struct iterate_do_bn_apply_msg_extra = { |
| 481 | .t = t, |
| 482 | .bn = bn, |
| 483 | .bnc = bnc, |
| 484 | .gc_info = gc_info, |
| 485 | .workdone = &workdone_this_ancestor, |
| 486 | .stats_to_update = &stats_delta, |
| 487 | .logical_rows_delta = &logical_rows_delta}; |
| 488 | if (fresh_ube - fresh_lbi > 0) |
| 489 | *msgs_applied = true; |
| 490 | r = bnc->fresh_message_tree |
| 491 | .iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra, |
| 492 | iterate_do_bn_apply_msg>( |
| 493 | fresh_lbi, fresh_ube, &iter_extra); |
| 494 | assert_zero(r); |
| 495 | } else { |
| 496 | invariant(fresh_lbi == fresh_ube); |
| 497 | // No fresh messages to apply, we just apply stale messages. |
| 498 | |
| 499 | if (stale_ube - stale_lbi > 0) |
| 500 | *msgs_applied = true; |
| 501 | struct iterate_do_bn_apply_msg_extra = { |
| 502 | .t = t, |
| 503 | .bn = bn, |
| 504 | .bnc = bnc, |
| 505 | .gc_info = gc_info, |
| 506 | .workdone = &workdone_this_ancestor, |
| 507 | .stats_to_update = &stats_delta, |
| 508 | .logical_rows_delta = &logical_rows_delta}; |
| 509 | |
| 510 | r = bnc->stale_message_tree |
| 511 | .iterate_on_range<struct iterate_do_bn_apply_msg_extra, |
| 512 | iterate_do_bn_apply_msg>( |
| 513 | stale_lbi, stale_ube, &iter_extra); |
| 514 | assert_zero(r); |
| 515 | } |
| 516 | // |
| 517 | // update stats |
| 518 | // |
| 519 | if (workdone_this_ancestor > 0) { |
| 520 | (void)toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum), |
| 521 | workdone_this_ancestor); |
| 522 | } |
| 523 | if (stats_delta.numbytes || stats_delta.numrows) { |
| 524 | toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta); |
| 525 | } |
| 526 | toku_ft_adjust_logical_row_count(t->ft, logical_rows_delta); |
| 527 | bn->logical_rows_delta += logical_rows_delta; |
| 528 | } |
| 529 | |
// Effect: Apply to basement node 'childnum' of 'node' all the pending
// messages found in its pinned ancestors, walking the ancestor list and
// skipping any ancestor whose on-disk max msn the basement has already
// seen.  Sets *msgs_applied if any message was applied.
static void
apply_ancestors_messages_to_bn(
    FT_HANDLE t,
    FTNODE node,
    int childnum,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    txn_gc_info *gc_info,
    bool* msgs_applied
    )
{
    BASEMENTNODE curr_bn = BLB(node, childnum);
    // Narrow the key bounds to just this basement node's range.
    const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
    for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
        if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
            paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
            bnc_apply_messages_to_basement_node(
                t,
                curr_bn,
                curr_ancestors->node,
                curr_ancestors->childnum,
                curr_bounds,
                gc_info,
                msgs_applied
                );
            // We don't want to check this ancestor node again if the
            // next time we query it, the msn hasn't changed.
            curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
        }
    }
    // At this point, we know all the stale messages above this
    // basement node have been applied, and any new messages will be
    // fresh, so we don't need to look at stale messages for this
    // basement node, unless it gets evicted (and this field becomes
    // false when it's read in again).
    curr_bn->stale_ancestor_messages_applied = true;
}
| 567 | |
void
toku_apply_ancestors_messages_to_node (
    FT_HANDLE t,
    FTNODE node,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    bool* msgs_applied,
    int child_to_read
    )
// Effect:
//   Bring a leaf node up-to-date according to all the messages in the ancestors.
//   If the leaf node is already up-to-date then do nothing.
//   If the leaf node is not already up-to-date, then record the work done
//   for that leaf in each ancestor.
// Requires:
//   This is being called when pinning a leaf node for the query path.
//   The entire root-to-leaf path is pinned and appears in the ancestors list.
{
    VERIFY_NODE(t, node);
    paranoid_invariant(node->height == 0);

    // Set up garbage-collection state for applying messages to this leaf.
    TXN_MANAGER txn_manager = toku_ft_get_txn_manager(t);
    txn_manager_state txn_state_for_gc(txn_manager);

    TXNID oldest_referenced_xid_for_simple_gc = toku_ft_get_oldest_referenced_xid_estimate(t);
    txn_gc_info gc_info(&txn_state_for_gc,
                        oldest_referenced_xid_for_simple_gc,
                        node->oldest_referenced_xid_known,
                        true);
    if (!node->dirty && child_to_read >= 0) {
        // Clean node pinned for a targeted read: only the requested
        // basement needs to be brought up to date.
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        apply_ancestors_messages_to_bn(
            t,
            node,
            child_to_read,
            ancestors,
            bounds,
            &gc_info,
            msgs_applied
            );
    }
    else {
        // know we are a leaf node
        // An important invariant:
        // We MUST bring every available basement node for a dirty node up to date.
        // flushing on the cleaner thread depends on this. This invariant
        // allows the cleaner thread to just pick an internal node and flush it
        // as opposed to being forced to start from the root.
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            apply_ancestors_messages_to_bn(
                t,
                node,
                i,
                ancestors,
                bounds,
                &gc_info,
                msgs_applied
                );
        }
    }
    VERIFY_NODE(t, node);
}
| 631 | |
// Effect: Determine whether any ancestor's child buffer holds messages
// (broadcast, stale, or fresh) that still need to be applied to basement
// node 'childnum' of 'node'.  Also raises *max_msn_applied to the largest
// ancestor on-disk msn examined, so callers can update basements when the
// full application is skipped.
static bool bn_needs_ancestors_messages(
    FT ft,
    FTNODE node,
    int childnum,
    const pivot_bounds &bounds,
    ANCESTORS ancestors,
    MSN* max_msn_applied
    )
{
    BASEMENTNODE bn = BLB(node, childnum);
    // Narrow the key bounds to just this basement node's range.
    const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
    bool needs_ancestors_messages = false;
    for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
        if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
            paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
            NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
            // Any broadcast message applies to every key range.
            if (bnc->broadcast_list.size() > 0) {
                needs_ancestors_messages = true;
                goto cleanup;
            }
            // Stale messages only matter if this basement hasn't already
            // had all stale ancestor messages applied.
            if (!bn->stale_ancestor_messages_applied) {
                uint32_t stale_lbi, stale_ube;
                find_bounds_within_message_tree(ft->cmp,
                                                bnc->stale_message_tree,
                                                &bnc->msg_buffer,
                                                curr_bounds,
                                                &stale_lbi,
                                                &stale_ube);
                if (stale_lbi < stale_ube) {
                    needs_ancestors_messages = true;
                    goto cleanup;
                }
            }
            uint32_t fresh_lbi, fresh_ube;
            find_bounds_within_message_tree(ft->cmp,
                                            bnc->fresh_message_tree,
                                            &bnc->msg_buffer,
                                            curr_bounds,
                                            &fresh_lbi,
                                            &fresh_ube);
            if (fresh_lbi < fresh_ube) {
                needs_ancestors_messages = true;
                goto cleanup;
            }
            // No relevant messages in this ancestor: just remember its msn.
            if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied->msn) {
                max_msn_applied->msn = curr_ancestors->node->max_msn_applied_to_node_on_disk.msn;
            }
        }
    }
cleanup:
    return needs_ancestors_messages;
}
| 684 | |
bool toku_ft_leaf_needs_ancestors_messages(
    FT ft,
    FTNODE node,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    MSN *const max_msn_in_path,
    int child_to_read
    )
// Effect: Determine whether there are messages in a node's ancestors
//  which must be applied to it.  These messages are in the correct
//  keyrange for any available basement nodes, and are in nodes with the
//  correct max_msn_applied_to_node_on_disk.
// Notes:
//  This is an approximate query.
// Output:
//  max_msn_in_path: max of "max_msn_applied_to_node_on_disk" over
//    ancestors.  This is used later to update basement nodes'
//    max_msn_applied values in case we don't do the full algorithm.
// Returns:
//  true if there may be some such messages
//  false only if there are definitely no such messages
// Rationale:
//  When we pin a node with a read lock, we want to quickly determine if
//  we should exchange it for a write lock in preparation for applying
//  messages.  If there are no messages, we don't need the write lock.
{
    paranoid_invariant(node->height == 0);
    bool needs_ancestors_messages = false;
    // child_to_read may be -1 in test cases
    if (!node->dirty && child_to_read >= 0) {
        // Clean node pinned for a targeted read: only the requested
        // basement needs to be checked.
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        needs_ancestors_messages = bn_needs_ancestors_messages(
            ft,
            node,
            child_to_read,
            bounds,
            ancestors,
            max_msn_in_path
            );
    }
    else {
        // Otherwise check every available basement; stop at the first one
        // that still needs messages.
        for (int i = 0; i < node->n_children; ++i) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            needs_ancestors_messages = bn_needs_ancestors_messages(
                ft,
                node,
                i,
                bounds,
                ancestors,
                max_msn_in_path
                );
            if (needs_ancestors_messages) {
                goto cleanup;
            }
        }
    }
cleanup:
    return needs_ancestors_messages;
}
| 744 | |
// Effect: Raise max_msn_applied on the relevant available basement
// node(s) of this leaf to 'max_msn_applied', if it is larger.  The update
// uses a CAS purely to silence race detectors; see comment in the loop.
void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read) {
    invariant(node->height == 0);
    if (!node->dirty && child_to_read >= 0) {
        // Clean node read for a specific child: update only that basement.
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        BASEMENTNODE bn = BLB(node, child_to_read);
        if (max_msn_applied.msn > bn->max_msn_applied.msn) {
            // see comment below
            (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
        }
    }
    else {
        for (int i = 0; i < node->n_children; ++i) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            BASEMENTNODE bn = BLB(node, i);
            if (max_msn_applied.msn > bn->max_msn_applied.msn) {
                // This function runs in a shared access context, so to silence tools
                // like DRD, we use a CAS and ignore the result.
                // Any threads trying to update these basement nodes should be
                // updating them to the same thing (since they all have a read lock on
                // the same root-to-leaf path) so this is safe.
                (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
            }
        }
    }
}
| 770 | |
| 771 | struct { |
| 772 | FT ; |
| 773 | NONLEAF_CHILDINFO ; |
| 774 | }; |
| 775 | |
| 776 | int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const ) __attribute__((nonnull(3))); |
| 777 | int (const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const ) |
| 778 | { |
| 779 | MSN msn; |
| 780 | DBT key; |
| 781 | extra->bnc->msg_buffer.get_message_key_msn(offset, &key, &msn); |
| 782 | struct toku_msg_buffer_key_msn_heaviside_extra (extra->ft->cmp, &extra->bnc->msg_buffer, &key, msn); |
| 783 | int r = extra->bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, heaviside_extra, nullptr); |
| 784 | invariant_zero(r); |
| 785 | return 0; |
| 786 | } |
| 787 | |
| 788 | void toku_ft_bnc_move_messages_to_stale(FT ft, NONLEAF_CHILDINFO bnc) { |
| 789 | struct copy_to_stale_extra = { .ft = ft, .bnc = bnc }; |
| 790 | int r = bnc->fresh_message_tree.iterate_over_marked<struct copy_to_stale_extra, copy_to_stale>(&cts_extra); |
| 791 | invariant_zero(r); |
| 792 | bnc->fresh_message_tree.delete_all_marked(); |
| 793 | } |
| 794 | |
| 795 | void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) { |
| 796 | invariant(node->height > 0); |
| 797 | for (int i = 0; i < node->n_children; ++i) { |
| 798 | if (BP_STATE(node, i) != PT_AVAIL) { |
| 799 | continue; |
| 800 | } |
| 801 | NONLEAF_CHILDINFO bnc = BNC(node, i); |
| 802 | // We can't delete things out of the fresh tree inside the above |
| 803 | // procedures because we're still looking at the fresh tree. Instead |
| 804 | // we have to move messages after we're done looking at it. |
| 805 | toku_ft_bnc_move_messages_to_stale(ft, bnc); |
| 806 | } |
| 807 | } |
| 808 | |
| 809 | // |
// Balance // Availability // Size
| 811 | |
// Callback state for flattening one basement node's klpairs into parallel
// output arrays (leafentry pointers, key sizes, key pointers) during
// toku_ftnode_leaf_rebalance().
struct rebalance_array_info {
    uint32_t offset;            // base index in the output arrays for this basement
    LEAFENTRY *le_array;        // out: leafentry pointers
    uint32_t *key_sizes_array;  // out: key lengths
    const void **key_ptr_array; // out: key pointers
    // bn_data::iterate callback: record the idx'th pair at slot offset+idx.
    static int fn(const void* key, const uint32_t keylen, const LEAFENTRY &le,
                  const uint32_t idx, struct rebalance_array_info *const ai) {
        ai->le_array[idx+ai->offset] = le;
        ai->key_sizes_array[idx+ai->offset] = keylen;
        ai->key_ptr_array[idx+ai->offset] = key;
        return 0;
    }
};
| 825 | |
| 826 | // There must still be at least one child |
| 827 | // Requires that all messages in buffers above have been applied. |
| 828 | // Because all messages above have been applied, setting msn of all new basements |
| 829 | // to max msn of existing basements is correct. (There cannot be any messages in |
| 830 | // buffers above that still need to be applied.) |
// Effect: Repartition a dirty leaf node's leafentries into new basement nodes
// of at most `basementnodesize` bytes each, recomputing pivots accordingly.
// The old basement nodes are destroyed only after the leafentries have been
// cloned into the new ones.
void toku_ftnode_leaf_rebalance(FTNODE node, unsigned int basementnodesize) {

    assert(node->height == 0);
    assert(node->dirty);

    uint32_t num_orig_basements = node->n_children;
    // Count number of leaf entries in this leaf (num_le).
    uint32_t num_le = 0;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        num_le += BLB_DATA(node, i)->num_klpairs();
    }

    uint32_t num_alloc = num_le ? num_le : 1; // simplify logic below by always having at least one entry per array

    // Create an array of OMTVALUE's that store all the pointers to all the data.
    // Each element in leafpointers is a pointer to a leaf.
    toku::scoped_malloc leafpointers_buf(sizeof(LEAFENTRY) * num_alloc);
    LEAFENTRY *leafpointers = reinterpret_cast<LEAFENTRY *>(leafpointers_buf.get());
    leafpointers[0] = NULL;

    toku::scoped_malloc key_pointers_buf(sizeof(void *) * num_alloc);
    const void **key_pointers = reinterpret_cast<const void **>(key_pointers_buf.get());
    key_pointers[0] = NULL;

    toku::scoped_malloc key_sizes_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *key_sizes = reinterpret_cast<uint32_t *>(key_sizes_buf.get());

    // Capture pointers to old mempools' buffers (so they can be destroyed)
    toku::scoped_malloc old_bns_buf(sizeof(BASEMENTNODE) * num_orig_basements);
    BASEMENTNODE *old_bns = reinterpret_cast<BASEMENTNODE *>(old_bns_buf.get());
    old_bns[0] = NULL;

    // Flatten every basement's klpairs into the parallel arrays above.
    uint32_t curr_le = 0;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        bn_data* bd = BLB_DATA(node, i);
        struct rebalance_array_info ai {.offset = curr_le, .le_array = leafpointers, .key_sizes_array = key_sizes, .key_ptr_array = key_pointers };
        bd->iterate<rebalance_array_info, rebalance_array_info::fn>(&ai);
        curr_le += bd->num_klpairs();
    }

    // Create an array that will store indexes of new pivots.
    // Each element in new_pivots is the index of a pivot key.
    // (Allocating num_le of them is overkill, but num_le is an upper bound.)
    toku::scoped_malloc new_pivots_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *new_pivots = reinterpret_cast<uint32_t *>(new_pivots_buf.get());
    new_pivots[0] = 0;

    // Each element in le_sizes is the size of the leafentry pointed to by leafpointers.
    toku::scoped_malloc le_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *le_sizes = reinterpret_cast<size_t *>(le_sizes_buf.get());
    le_sizes[0] = 0;

    // Create an array that will store the size of each basement.
    // This is the sum of the leaf sizes of all the leaves in that basement.
    // We don't know how many basements there will be, so we use num_le as the upper bound.

    // Sum of all le sizes in a single basement
    toku::scoped_calloc bn_le_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *bn_le_sizes = reinterpret_cast<size_t *>(bn_le_sizes_buf.get());

    // Sum of all key sizes in a single basement
    toku::scoped_calloc bn_key_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *bn_key_sizes = reinterpret_cast<size_t *>(bn_key_sizes_buf.get());

    // TODO 4050: All these arrays should be combined into a single array of some bn_info struct (pivot, msize, num_les).
    // Each entry is the number of leafentries in this basement. (Again, num_le is overkill upper bound.)
    toku::scoped_malloc num_les_this_bn_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *num_les_this_bn = reinterpret_cast<uint32_t *>(num_les_this_bn_buf.get());
    num_les_this_bn[0] = 0;

    // Figure out the new pivots.
    // We need the index of each pivot, and for each basement we need
    // the number of leaves and the sum of the sizes of the leaves (memory requirement for basement).
    uint32_t curr_pivot = 0;
    uint32_t num_le_in_curr_bn = 0;
    uint32_t bn_size_so_far = 0;
    for (uint32_t i = 0; i < num_le; i++) {
        uint32_t curr_le_size = leafentry_disksize((LEAFENTRY) leafpointers[i]);
        le_sizes[i] = curr_le_size;
        if ((bn_size_so_far + curr_le_size + sizeof(uint32_t) + key_sizes[i] > basementnodesize) && (num_le_in_curr_bn != 0)) {
            // cap off the current basement node to end with the element before i
            new_pivots[curr_pivot] = i-1;
            curr_pivot++;
            num_le_in_curr_bn = 0;
            bn_size_so_far = 0;
        }
        num_le_in_curr_bn++;
        num_les_this_bn[curr_pivot] = num_le_in_curr_bn;
        bn_le_sizes[curr_pivot] += curr_le_size;
        bn_key_sizes[curr_pivot] += sizeof(uint32_t) + key_sizes[i]; // uint32_t le_offset
        bn_size_so_far += curr_le_size + sizeof(uint32_t) + key_sizes[i];
    }
    // curr_pivot is now the total number of pivot keys in the leaf node
    int num_pivots   = curr_pivot;
    int num_children = num_pivots + 1;

    // now we need to fill in the new basement nodes and pivots

    // TODO: (Zardosht) this is an ugly thing right now
    // Need to figure out how to properly deal with seqinsert.
    // I am not happy with how this is being
    // handled with basement nodes
    uint32_t tmp_seqinsert = BLB_SEQINSERT(node, num_orig_basements - 1);

    // choose the max msn applied to any basement as the max msn applied to all new basements
    MSN max_msn = ZERO_MSN;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        MSN curr_msn = BLB_MAX_MSN_APPLIED(node,i);
        max_msn = (curr_msn.msn > max_msn.msn) ? curr_msn : max_msn;
    }
    // remove the basement node in the node, we've saved a copy
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        // save a reference to the old basement nodes
        // we will need them to ensure that the memory
        // stays intact
        old_bns[i] = toku_detach_bn(node, i);
    }
    // Now destroy the old basements, but do not destroy leaves
    toku_destroy_ftnode_internals(node);

    // now reallocate pieces and start filling them in
    invariant(num_children > 0);

    node->n_children = num_children;
    XCALLOC_N(num_children, node->bp);             // allocate pointers to basements (bp)
    for (int i = 0; i < num_children; i++) {
        set_BLB(node, i, toku_create_empty_bn());  // allocate empty basements and set bp pointers
    }

    // now we start to fill in the data

    // first the pivots
    toku::scoped_malloc pivotkeys_buf(num_pivots * sizeof(DBT));
    DBT *pivotkeys = reinterpret_cast<DBT *>(pivotkeys_buf.get());
    for (int i = 0; i < num_pivots; i++) {
        uint32_t size = key_sizes[new_pivots[i]];
        const void *key = key_pointers[new_pivots[i]];
        toku_fill_dbt(&pivotkeys[i], key, size);
    }
    node->pivotkeys.create_from_dbts(pivotkeys, num_pivots);

    uint32_t baseindex_this_bn = 0;
    // now the basement nodes
    for (int i = 0; i < num_children; i++) {
        // put back seqinsert
        BLB_SEQINSERT(node, i) = tmp_seqinsert;

        // create start (inclusive) and end (exclusive) boundaries for data of basement node
        uint32_t curr_start = (i==0) ? 0 : new_pivots[i-1]+1;               // index of first leaf in basement
        uint32_t curr_end = (i==num_pivots) ? num_le : new_pivots[i]+1;     // index of first leaf in next basement
        uint32_t num_in_bn = curr_end - curr_start;                         // number of leaves in this basement

        // create indexes for new basement
        invariant(baseindex_this_bn == curr_start);
        uint32_t num_les_to_copy = num_les_this_bn[i];
        invariant(num_les_to_copy == num_in_bn);

        bn_data* bd = BLB_DATA(node, i);
        bd->set_contents_as_clone_of_sorted_array(
            num_les_to_copy,
            &key_pointers[baseindex_this_bn],
            &key_sizes[baseindex_this_bn],
            &leafpointers[baseindex_this_bn],
            &le_sizes[baseindex_this_bn],
            bn_key_sizes[i],  // Total key sizes
            bn_le_sizes[i]  // total le sizes
            );

        BP_STATE(node,i) = PT_AVAIL;
        BP_TOUCH_CLOCK(node,i);
        BLB_MAX_MSN_APPLIED(node,i) = max_msn;
        baseindex_this_bn += num_les_to_copy;  // set to index of next bn
    }
    node->max_msn_applied_to_node_on_disk = max_msn;

    // destroy buffers of old mempools
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        destroy_basement_node(old_bns[i]);
    }
}
| 1011 | |
| 1012 | bool toku_ftnode_fully_in_memory(FTNODE node) { |
| 1013 | for (int i = 0; i < node->n_children; i++) { |
| 1014 | if (BP_STATE(node,i) != PT_AVAIL) { |
| 1015 | return false; |
| 1016 | } |
| 1017 | } |
| 1018 | return true; |
| 1019 | } |
| 1020 | |
// Assert (paranoid builds only) that every partition of node is in memory.
void toku_ftnode_assert_fully_in_memory(FTNODE UU(node)) {
    paranoid_invariant(toku_ftnode_fully_in_memory(node));
}
| 1024 | |
| 1025 | uint32_t toku_ftnode_leaf_num_entries(FTNODE node) { |
| 1026 | toku_ftnode_assert_fully_in_memory(node); |
| 1027 | uint32_t num_entries = 0; |
| 1028 | for (int i = 0; i < node->n_children; i++) { |
| 1029 | num_entries += BLB_DATA(node, i)->num_klpairs(); |
| 1030 | } |
| 1031 | return num_entries; |
| 1032 | } |
| 1033 | |
| 1034 | enum reactivity toku_ftnode_get_leaf_reactivity(FTNODE node, uint32_t nodesize) { |
| 1035 | enum reactivity re = RE_STABLE; |
| 1036 | toku_ftnode_assert_fully_in_memory(node); |
| 1037 | paranoid_invariant(node->height==0); |
| 1038 | unsigned int size = toku_serialize_ftnode_size(node); |
| 1039 | if (size > nodesize && toku_ftnode_leaf_num_entries(node) > 1) { |
| 1040 | re = RE_FISSIBLE; |
| 1041 | } else if ((size*4) < nodesize && !BLB_SEQINSERT(node, node->n_children-1)) { |
| 1042 | re = RE_FUSIBLE; |
| 1043 | } |
| 1044 | return re; |
| 1045 | } |
| 1046 | |
| 1047 | enum reactivity toku_ftnode_get_nonleaf_reactivity(FTNODE node, unsigned int fanout) { |
| 1048 | paranoid_invariant(node->height > 0); |
| 1049 | int n_children = node->n_children; |
| 1050 | if (n_children > (int) fanout) { |
| 1051 | return RE_FISSIBLE; |
| 1052 | } |
| 1053 | if (n_children * 4 < (int) fanout) { |
| 1054 | return RE_FUSIBLE; |
| 1055 | } |
| 1056 | return RE_STABLE; |
| 1057 | } |
| 1058 | |
| 1059 | enum reactivity toku_ftnode_get_reactivity(FT ft, FTNODE node) { |
| 1060 | toku_ftnode_assert_fully_in_memory(node); |
| 1061 | if (node->height == 0) { |
| 1062 | return toku_ftnode_get_leaf_reactivity(node, ft->h->nodesize); |
| 1063 | } else { |
| 1064 | return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout); |
| 1065 | } |
| 1066 | } |
| 1067 | |
// Number of bytes currently in use by this child's message buffer.
unsigned int toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc) {
    return bnc->msg_buffer.buffer_size_in_use();
}
| 1071 | |
| 1072 | // Return true if the size of the buffers plus the amount of work done is large enough. |
| 1073 | // Return false if there is nothing to be flushed (the buffers empty). |
| 1074 | bool toku_ftnode_nonleaf_is_gorged(FTNODE node, uint32_t nodesize) { |
| 1075 | uint64_t size = toku_serialize_ftnode_size(node); |
| 1076 | |
| 1077 | bool buffers_are_empty = true; |
| 1078 | toku_ftnode_assert_fully_in_memory(node); |
| 1079 | // |
| 1080 | // the nonleaf node is gorged if the following holds true: |
| 1081 | // - the buffers are non-empty |
| 1082 | // - the total workdone by the buffers PLUS the size of the buffers |
| 1083 | // is greater than nodesize (which as of Maxwell should be |
| 1084 | // 4MB) |
| 1085 | // |
| 1086 | paranoid_invariant(node->height > 0); |
| 1087 | for (int child = 0; child < node->n_children; ++child) { |
| 1088 | size += BP_WORKDONE(node, child); |
| 1089 | } |
| 1090 | for (int child = 0; child < node->n_children; ++child) { |
| 1091 | if (toku_bnc_nbytesinbuf(BNC(node, child)) > 0) { |
| 1092 | buffers_are_empty = false; |
| 1093 | break; |
| 1094 | } |
| 1095 | } |
| 1096 | return ((size > nodesize) |
| 1097 | && |
| 1098 | (!buffers_are_empty)); |
| 1099 | } |
| 1100 | |
// Number of messages currently held in this child's buffer.
int toku_bnc_n_entries(NONLEAF_CHILDINFO bnc) {
    return bnc->msg_buffer.num_entries();
}
| 1104 | |
// how much memory does this child buffer consume?
// (struct overhead + message buffer footprint + fresh/stale message trees
// + broadcast offsets list)
long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc) {
    return (sizeof(*bnc) +
            bnc->msg_buffer.memory_footprint() +
            bnc->fresh_message_tree.memory_size() +
            bnc->stale_message_tree.memory_size() +
            bnc->broadcast_list.memory_size());
}
| 1113 | |
// how much memory in this child buffer holds useful data?
// originally created solely for use by test program(s).
// Differs from toku_bnc_memory_size() by using memory_size_in_use()
// (bytes actually occupied) instead of memory_footprint() (bytes reserved).
long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc) {
    return (sizeof(*bnc) +
            bnc->msg_buffer.memory_size_in_use() +
            bnc->fresh_message_tree.memory_size() +
            bnc->stale_message_tree.memory_size() +
            bnc->broadcast_list.memory_size());
}
| 1123 | |
| 1124 | // |
| 1125 | // Garbage collection |
| 1126 | // Message injection |
| 1127 | // Message application |
| 1128 | // |
| 1129 | |
| 1130 | // Used only by test programs: append a child node to a parent node |
| 1131 | void toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey) { |
| 1132 | int childnum = node->n_children; |
| 1133 | node->n_children++; |
| 1134 | REALLOC_N(node->n_children, node->bp); |
| 1135 | BP_BLOCKNUM(node,childnum) = child->blocknum; |
| 1136 | BP_STATE(node,childnum) = PT_AVAIL; |
| 1137 | BP_WORKDONE(node, childnum) = 0; |
| 1138 | set_BNC(node, childnum, toku_create_empty_nl()); |
| 1139 | if (pivotkey) { |
| 1140 | invariant(childnum > 0); |
| 1141 | node->pivotkeys.insert_at(pivotkey, childnum - 1); |
| 1142 | } |
| 1143 | node->dirty = 1; |
| 1144 | } |
| 1145 | |
| 1146 | void |
| 1147 | toku_ft_bn_apply_msg_once ( |
| 1148 | BASEMENTNODE bn, |
| 1149 | const ft_msg &msg, |
| 1150 | uint32_t idx, |
| 1151 | uint32_t le_keylen, |
| 1152 | LEAFENTRY le, |
| 1153 | txn_gc_info *gc_info, |
| 1154 | uint64_t *workdone, |
| 1155 | STAT64INFO stats_to_update, |
| 1156 | int64_t *logical_rows_delta |
| 1157 | ) |
| 1158 | // Effect: Apply msg to leafentry (msn is ignored) |
| 1159 | // Calculate work done by message on leafentry and add it to caller's workdone counter. |
| 1160 | // idx is the location where it goes |
| 1161 | // le is old leafentry |
| 1162 | { |
| 1163 | size_t newsize=0, oldsize=0, workdone_this_le=0; |
| 1164 | LEAFENTRY new_le=0; |
| 1165 | // how many bytes of user data (not including overhead) were added or |
| 1166 | // deleted from this row |
| 1167 | int64_t numbytes_delta = 0; |
| 1168 | // will be +1 or -1 or 0 (if row was added or deleted or not) |
| 1169 | int64_t numrows_delta = 0; |
| 1170 | // will be +1, -1 or 0 if a message that was accounted for logically has |
| 1171 | // changed in meaning such as an insert changed to an update or a delete |
| 1172 | // changed to a noop |
| 1173 | int64_t logical_rows_delta_le = 0; |
| 1174 | uint32_t key_storage_size = msg.kdbt()->size + sizeof(uint32_t); |
| 1175 | if (le) { |
| 1176 | oldsize = leafentry_memsize(le) + key_storage_size; |
| 1177 | } |
| 1178 | |
| 1179 | // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt() |
| 1180 | // to allocate more space. That means le is guaranteed to not cause a |
| 1181 | // sigsegv but it may point to a mempool that is no longer in use. |
| 1182 | // We'll have to release the old mempool later. |
| 1183 | logical_rows_delta_le = toku_le_apply_msg( |
| 1184 | msg, |
| 1185 | le, |
| 1186 | &bn->data_buffer, |
| 1187 | idx, |
| 1188 | le_keylen, |
| 1189 | gc_info, |
| 1190 | &new_le, |
| 1191 | &numbytes_delta); |
| 1192 | |
| 1193 | // at this point, we cannot trust cmd->u.id.key to be valid. |
| 1194 | // The dmt may have realloced its mempool and freed the one containing key. |
| 1195 | |
| 1196 | newsize = new_le ? (leafentry_memsize(new_le) + + key_storage_size) : 0; |
| 1197 | if (le && new_le) { |
| 1198 | workdone_this_le = (oldsize > newsize ? oldsize : newsize); // work done is max of le size before and after message application |
| 1199 | |
| 1200 | } else { // we did not just replace a row, so ... |
| 1201 | if (le) { |
| 1202 | // ... we just deleted a row ... |
| 1203 | workdone_this_le = oldsize; |
| 1204 | numrows_delta = -1; |
| 1205 | } |
| 1206 | if (new_le) { |
| 1207 | // ... or we just added a row |
| 1208 | workdone_this_le = newsize; |
| 1209 | numrows_delta = 1; |
| 1210 | } |
| 1211 | } |
| 1212 | if (FT_LIKELY(workdone != NULL)) { // test programs may call with NULL |
| 1213 | *workdone += workdone_this_le; |
| 1214 | } |
| 1215 | |
| 1216 | if (FT_LIKELY(logical_rows_delta != NULL)) { |
| 1217 | *logical_rows_delta += logical_rows_delta_le; |
| 1218 | } |
| 1219 | // now update stat64 statistics |
| 1220 | bn->stat64_delta.numrows += numrows_delta; |
| 1221 | bn->stat64_delta.numbytes += numbytes_delta; |
| 1222 | // the only reason stats_to_update may be null is for tests |
| 1223 | if (FT_LIKELY(stats_to_update != NULL)) { |
| 1224 | stats_to_update->numrows += numrows_delta; |
| 1225 | stats_to_update->numbytes += numbytes_delta; |
| 1226 | } |
| 1227 | } |
| 1228 | |
| 1229 | static const uint32_t setval_tag = 0xee0ccb99; // this was gotten by doing "cat /dev/random|head -c4|od -x" to get a random number. We want to make sure that the user actually passes us the setval_extra_s that we passed in. |
| 1230 | struct { |
| 1231 | uint32_t ; |
| 1232 | bool ; |
| 1233 | // any error code that setval_fun wants to return goes here. |
| 1234 | int ; |
| 1235 | // need arguments for toku_ft_bn_apply_msg_once |
| 1236 | BASEMENTNODE ; |
| 1237 | // captured from original message, not currently used |
| 1238 | MSN ; |
| 1239 | XIDS ; |
| 1240 | const DBT* ; |
| 1241 | uint32_t ; |
| 1242 | uint32_t ; |
| 1243 | LEAFENTRY ; |
| 1244 | txn_gc_info* ; |
| 1245 | uint64_t* ; // set by toku_ft_bn_apply_msg_once() |
| 1246 | STAT64INFO ; |
| 1247 | int64_t* ; |
| 1248 | }; |
| 1249 | |
| 1250 | /* |
| 1251 | * If new_val == NULL, we send a delete message instead of an insert. |
| 1252 | * This happens here instead of in do_delete() for consistency. |
| 1253 | * setval_fun() is called from handlerton, passing in svextra_v |
| 1254 | * from setval_extra_s input arg to ft->update_fun(). |
| 1255 | */ |
| 1256 | static void setval_fun (const DBT *new_val, void *) { |
| 1257 | struct setval_extra_s *CAST_FROM_VOIDP(, svextra_v); |
| 1258 | paranoid_invariant(svextra->tag==setval_tag); |
| 1259 | paranoid_invariant(!svextra->did_set_val); |
| 1260 | svextra->did_set_val = true; |
| 1261 | |
| 1262 | { |
| 1263 | // can't leave scope until toku_ft_bn_apply_msg_once if |
| 1264 | // this is a delete |
| 1265 | DBT val; |
| 1266 | ft_msg msg( |
| 1267 | svextra->key, |
| 1268 | new_val ? new_val : toku_init_dbt(&val), |
| 1269 | new_val ? FT_INSERT : FT_DELETE_ANY, |
| 1270 | svextra->msn, |
| 1271 | svextra->xids); |
| 1272 | toku_ft_bn_apply_msg_once( |
| 1273 | svextra->bn, |
| 1274 | msg, |
| 1275 | svextra->idx, |
| 1276 | svextra->le_keylen, |
| 1277 | svextra->le, |
| 1278 | svextra->gc_info, |
| 1279 | svextra->workdone, |
| 1280 | svextra->stats_to_update, |
| 1281 | svextra->logical_rows_delta); |
| 1282 | svextra->setval_r = 0; |
| 1283 | } |
| 1284 | } |
| 1285 | |
| 1286 | // We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls |
| 1287 | // do_update()), so capturing the msn in the setval_extra_s is not strictly |
| 1288 | // required. The alternative would be to put a dummy msn in the messages |
| 1289 | // created by setval_fun(), but preserving the original msn seems cleaner and |
| 1290 | // it preserves accountability at a lower layer. |
| 1291 | static int do_update( |
| 1292 | ft_update_func update_fun, |
| 1293 | const DESCRIPTOR_S* desc, |
| 1294 | BASEMENTNODE bn, |
| 1295 | const ft_msg &msg, |
| 1296 | uint32_t idx, |
| 1297 | LEAFENTRY le, |
| 1298 | void* keydata, |
| 1299 | uint32_t keylen, |
| 1300 | txn_gc_info* gc_info, |
| 1301 | uint64_t* workdone, |
| 1302 | STAT64INFO stats_to_update, |
| 1303 | int64_t* logical_rows_delta) { |
| 1304 | |
| 1305 | LEAFENTRY le_for_update; |
| 1306 | DBT key; |
| 1307 | const DBT *keyp; |
| 1308 | const DBT *; |
| 1309 | DBT vdbt; |
| 1310 | const DBT *vdbtp; |
| 1311 | |
| 1312 | // the location of data depends whether this is a regular or |
| 1313 | // broadcast update |
| 1314 | if (msg.type() == FT_UPDATE) { |
| 1315 | // key is passed in with command (should be same as from le) |
| 1316 | // update function extra is passed in with command |
| 1317 | keyp = msg.kdbt(); |
| 1318 | update_function_extra = msg.vdbt(); |
| 1319 | } else { |
| 1320 | invariant(msg.type() == FT_UPDATE_BROADCAST_ALL); |
| 1321 | // key is not passed in with broadcast, it comes from le |
| 1322 | // update function extra is passed in with command |
| 1323 | paranoid_invariant(le); // for broadcast updates, we just hit all leafentries |
| 1324 | // so this cannot be null |
| 1325 | paranoid_invariant(keydata); |
| 1326 | paranoid_invariant(keylen); |
| 1327 | paranoid_invariant(msg.kdbt()->size == 0); |
| 1328 | keyp = toku_fill_dbt(&key, keydata, keylen); |
| 1329 | update_function_extra = msg.vdbt(); |
| 1330 | } |
| 1331 | toku_ft_status_note_update(msg.type() == FT_UPDATE_BROADCAST_ALL); |
| 1332 | |
| 1333 | if (le && !le_latest_is_del(le)) { |
| 1334 | // if the latest val exists, use it, and we'll use the leafentry later |
| 1335 | uint32_t vallen; |
| 1336 | void *valp = le_latest_val_and_len(le, &vallen); |
| 1337 | vdbtp = toku_fill_dbt(&vdbt, valp, vallen); |
| 1338 | } else { |
| 1339 | // otherwise, the val and leafentry are both going to be null |
| 1340 | vdbtp = NULL; |
| 1341 | } |
| 1342 | le_for_update = le; |
| 1343 | |
| 1344 | struct setval_extra_s = { |
| 1345 | setval_tag, |
| 1346 | false, |
| 1347 | 0, |
| 1348 | bn, |
| 1349 | msg.msn(), |
| 1350 | msg.xids(), |
| 1351 | keyp, |
| 1352 | idx, |
| 1353 | keylen, |
| 1354 | le_for_update, |
| 1355 | gc_info, |
| 1356 | workdone, |
| 1357 | stats_to_update, |
| 1358 | logical_rows_delta |
| 1359 | }; |
| 1360 | // call handlerton's ft->update_fun(), which passes setval_extra |
| 1361 | // to setval_fun() |
| 1362 | FAKE_DB(db, desc); |
| 1363 | int r = update_fun( |
| 1364 | &db, |
| 1365 | keyp, |
| 1366 | vdbtp, |
| 1367 | update_function_extra, |
| 1368 | setval_fun, |
| 1369 | &setval_extra); |
| 1370 | |
| 1371 | if (r == 0) { r = setval_extra.setval_r; } |
| 1372 | return r; |
| 1373 | } |
| 1374 | |
| 1375 | // Should be renamed as something like "apply_msg_to_basement()." |
| 1376 | void toku_ft_bn_apply_msg( |
| 1377 | const toku::comparator& cmp, |
| 1378 | ft_update_func update_fun, |
| 1379 | BASEMENTNODE bn, |
| 1380 | const ft_msg& msg, |
| 1381 | txn_gc_info* gc_info, |
| 1382 | uint64_t* workdone, |
| 1383 | STAT64INFO stats_to_update, |
| 1384 | int64_t* logical_rows_delta) { |
| 1385 | // Effect: |
| 1386 | // Put a msg into a leaf. |
| 1387 | // Calculate work done by message on leafnode and add it to caller's |
| 1388 | // workdone counter. |
| 1389 | // The leaf could end up "too big" or "too small". The caller must fix that up. |
| 1390 | LEAFENTRY storeddata; |
| 1391 | void* key = NULL; |
| 1392 | uint32_t keylen = 0; |
| 1393 | |
| 1394 | uint32_t num_klpairs; |
| 1395 | int r; |
| 1396 | struct toku_msg_leafval_heaviside_extra be(cmp, msg.kdbt()); |
| 1397 | |
| 1398 | unsigned int doing_seqinsert = bn->seqinsert; |
| 1399 | bn->seqinsert = 0; |
| 1400 | |
| 1401 | switch (msg.type()) { |
| 1402 | case FT_INSERT_NO_OVERWRITE: |
| 1403 | case FT_INSERT: { |
| 1404 | uint32_t idx; |
| 1405 | if (doing_seqinsert) { |
| 1406 | idx = bn->data_buffer.num_klpairs(); |
| 1407 | DBT kdbt; |
| 1408 | r = bn->data_buffer.fetch_key_and_len(idx-1, &kdbt.size, &kdbt.data); |
| 1409 | if (r != 0) goto fz; |
| 1410 | int c = toku_msg_leafval_heaviside(kdbt, be); |
| 1411 | if (c >= 0) goto fz; |
| 1412 | r = DB_NOTFOUND; |
| 1413 | } else { |
| 1414 | fz: |
| 1415 | r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>( |
| 1416 | be, |
| 1417 | &storeddata, |
| 1418 | &key, |
| 1419 | &keylen, |
| 1420 | &idx |
| 1421 | ); |
| 1422 | } |
| 1423 | if (r==DB_NOTFOUND) { |
| 1424 | storeddata = 0; |
| 1425 | } else { |
| 1426 | assert_zero(r); |
| 1427 | } |
| 1428 | toku_ft_bn_apply_msg_once( |
| 1429 | bn, |
| 1430 | msg, |
| 1431 | idx, |
| 1432 | keylen, |
| 1433 | storeddata, |
| 1434 | gc_info, |
| 1435 | workdone, |
| 1436 | stats_to_update, |
| 1437 | logical_rows_delta); |
| 1438 | |
| 1439 | // if the insertion point is within a window of the right edge of |
| 1440 | // the leaf then it is sequential |
| 1441 | // window = min(32, number of leaf entries/16) |
| 1442 | { |
| 1443 | uint32_t s = bn->data_buffer.num_klpairs(); |
| 1444 | uint32_t w = s / 16; |
| 1445 | if (w == 0) w = 1; |
| 1446 | if (w > 32) w = 32; |
| 1447 | |
| 1448 | // within the window? |
| 1449 | if (s - idx <= w) |
| 1450 | bn->seqinsert = doing_seqinsert + 1; |
| 1451 | } |
| 1452 | break; |
| 1453 | } |
| 1454 | case FT_DELETE_ANY: |
| 1455 | case FT_ABORT_ANY: |
| 1456 | case FT_COMMIT_ANY: { |
| 1457 | uint32_t idx; |
| 1458 | // Apply to all the matches |
| 1459 | |
| 1460 | r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>( |
| 1461 | be, |
| 1462 | &storeddata, |
| 1463 | &key, |
| 1464 | &keylen, |
| 1465 | &idx); |
| 1466 | if (r == DB_NOTFOUND) break; |
| 1467 | assert_zero(r); |
| 1468 | toku_ft_bn_apply_msg_once( |
| 1469 | bn, |
| 1470 | msg, |
| 1471 | idx, |
| 1472 | keylen, |
| 1473 | storeddata, |
| 1474 | gc_info, |
| 1475 | workdone, |
| 1476 | stats_to_update, |
| 1477 | logical_rows_delta); |
| 1478 | break; |
| 1479 | } |
| 1480 | case FT_OPTIMIZE_FOR_UPGRADE: |
| 1481 | // fall through so that optimize_for_upgrade performs rest of the optimize logic |
| 1482 | case FT_COMMIT_BROADCAST_ALL: |
| 1483 | case FT_OPTIMIZE: |
| 1484 | // Apply to all leafentries |
| 1485 | num_klpairs = bn->data_buffer.num_klpairs(); |
| 1486 | for (uint32_t idx = 0; idx < num_klpairs; ) { |
| 1487 | void* curr_keyp = NULL; |
| 1488 | uint32_t curr_keylen = 0; |
| 1489 | r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp); |
| 1490 | assert_zero(r); |
| 1491 | int deleted = 0; |
| 1492 | if (!le_is_clean(storeddata)) { //If already clean, nothing to do. |
| 1493 | // message application code needs a key in order to determine |
| 1494 | // how much work was done by this message. since this is a |
| 1495 | // broadcast message, we have to create a new message whose |
| 1496 | // key is the current le's key. |
| 1497 | DBT curr_keydbt; |
| 1498 | ft_msg curr_msg( |
| 1499 | toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen), |
| 1500 | msg.vdbt(), |
| 1501 | msg.type(), |
| 1502 | msg.msn(), |
| 1503 | msg.xids()); |
| 1504 | toku_ft_bn_apply_msg_once( |
| 1505 | bn, |
| 1506 | curr_msg, |
| 1507 | idx, |
| 1508 | curr_keylen, |
| 1509 | storeddata, |
| 1510 | gc_info, |
| 1511 | workdone, |
| 1512 | stats_to_update, |
| 1513 | logical_rows_delta); |
| 1514 | // at this point, we cannot trust msg.kdbt to be valid. |
| 1515 | uint32_t new_dmt_size = bn->data_buffer.num_klpairs(); |
| 1516 | if (new_dmt_size != num_klpairs) { |
| 1517 | paranoid_invariant(new_dmt_size + 1 == num_klpairs); |
| 1518 | //Item was deleted. |
| 1519 | deleted = 1; |
| 1520 | } |
| 1521 | } |
| 1522 | if (deleted) |
| 1523 | num_klpairs--; |
| 1524 | else |
| 1525 | idx++; |
| 1526 | } |
| 1527 | paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs); |
| 1528 | |
| 1529 | break; |
| 1530 | case FT_COMMIT_BROADCAST_TXN: |
| 1531 | case FT_ABORT_BROADCAST_TXN: |
| 1532 | // Apply to all leafentries if txn is represented |
| 1533 | num_klpairs = bn->data_buffer.num_klpairs(); |
| 1534 | for (uint32_t idx = 0; idx < num_klpairs; ) { |
| 1535 | void* curr_keyp = NULL; |
| 1536 | uint32_t curr_keylen = 0; |
| 1537 | r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp); |
| 1538 | assert_zero(r); |
| 1539 | int deleted = 0; |
| 1540 | if (le_has_xids(storeddata, msg.xids())) { |
| 1541 | // message application code needs a key in order to determine |
| 1542 | // how much work was done by this message. since this is a |
| 1543 | // broadcast message, we have to create a new message whose key |
| 1544 | // is the current le's key. |
| 1545 | DBT curr_keydbt; |
| 1546 | ft_msg curr_msg( |
| 1547 | toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen), |
| 1548 | msg.vdbt(), |
| 1549 | msg.type(), |
| 1550 | msg.msn(), |
| 1551 | msg.xids()); |
| 1552 | toku_ft_bn_apply_msg_once( |
| 1553 | bn, |
| 1554 | curr_msg, |
| 1555 | idx, |
| 1556 | curr_keylen, |
| 1557 | storeddata, |
| 1558 | gc_info, |
| 1559 | workdone, |
| 1560 | stats_to_update, |
| 1561 | logical_rows_delta); |
| 1562 | uint32_t new_dmt_size = bn->data_buffer.num_klpairs(); |
| 1563 | if (new_dmt_size != num_klpairs) { |
| 1564 | paranoid_invariant(new_dmt_size + 1 == num_klpairs); |
| 1565 | //Item was deleted. |
| 1566 | deleted = 1; |
| 1567 | } |
| 1568 | } |
| 1569 | if (deleted) |
| 1570 | num_klpairs--; |
| 1571 | else |
| 1572 | idx++; |
| 1573 | } |
| 1574 | paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs); |
| 1575 | |
| 1576 | break; |
| 1577 | case FT_UPDATE: { |
| 1578 | uint32_t idx; |
| 1579 | r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>( |
| 1580 | be, |
| 1581 | &storeddata, |
| 1582 | &key, |
| 1583 | &keylen, |
| 1584 | &idx |
| 1585 | ); |
| 1586 | if (r==DB_NOTFOUND) { |
| 1587 | { |
| 1588 | //Point to msg's copy of the key so we don't worry about le being freed |
| 1589 | //TODO: 46 MAYBE Get rid of this when le_apply message memory is better handled |
| 1590 | key = msg.kdbt()->data; |
| 1591 | keylen = msg.kdbt()->size; |
| 1592 | } |
| 1593 | r = do_update( |
| 1594 | update_fun, |
| 1595 | cmp.get_descriptor(), |
| 1596 | bn, |
| 1597 | msg, |
| 1598 | idx, |
| 1599 | NULL, |
| 1600 | NULL, |
| 1601 | 0, |
| 1602 | gc_info, |
| 1603 | workdone, |
| 1604 | stats_to_update, |
| 1605 | logical_rows_delta); |
| 1606 | } else if (r==0) { |
| 1607 | r = do_update( |
| 1608 | update_fun, |
| 1609 | cmp.get_descriptor(), |
| 1610 | bn, |
| 1611 | msg, |
| 1612 | idx, |
| 1613 | storeddata, |
| 1614 | key, |
| 1615 | keylen, |
| 1616 | gc_info, |
| 1617 | workdone, |
| 1618 | stats_to_update, |
| 1619 | logical_rows_delta); |
| 1620 | } // otherwise, a worse error, just return it |
| 1621 | break; |
| 1622 | } |
| 1623 | case FT_UPDATE_BROADCAST_ALL: { |
| 1624 | // apply to all leafentries. |
| 1625 | uint32_t idx = 0; |
| 1626 | uint32_t num_leafentries_before; |
| 1627 | // This is used to avoid having the logical row count changed on apply |
| 1628 | // of this message since it will return a negative number of the number |
| 1629 | // of leaf entries visited and cause the ft header value to go to 0; |
| 1630 | // This message will not change the number of rows, so just use the |
| 1631 | // bogus value. |
| 1632 | int64_t temp_logical_rows_delta = 0; |
| 1633 | while (idx < (num_leafentries_before = bn->data_buffer.num_klpairs())) { |
| 1634 | void* curr_key = nullptr; |
| 1635 | uint32_t curr_keylen = 0; |
| 1636 | r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_key); |
| 1637 | assert_zero(r); |
| 1638 | |
| 1639 | //TODO: 46 replace this with something better than cloning key |
| 1640 | // TODO: (Zardosht) This may be unnecessary now, due to how the key |
| 1641 | // is handled in the bndata. Investigate and determine |
| 1642 | char clone_mem[curr_keylen]; // only lasts one loop, alloca would overflow (end of function) |
| 1643 | memcpy((void*)clone_mem, curr_key, curr_keylen); |
| 1644 | curr_key = (void*)clone_mem; |
| 1645 | |
| 1646 | // This is broken below. Have a compilation error checked |
| 1647 | // in as a reminder |
| 1648 | r = do_update( |
| 1649 | update_fun, |
| 1650 | cmp.get_descriptor(), |
| 1651 | bn, |
| 1652 | msg, |
| 1653 | idx, |
| 1654 | storeddata, |
| 1655 | curr_key, |
| 1656 | curr_keylen, |
| 1657 | gc_info, |
| 1658 | workdone, |
| 1659 | stats_to_update, |
| 1660 | &temp_logical_rows_delta); |
| 1661 | assert_zero(r); |
| 1662 | |
| 1663 | if (num_leafentries_before == bn->data_buffer.num_klpairs()) { |
| 1664 | // we didn't delete something, so increment the index. |
| 1665 | idx++; |
| 1666 | } |
| 1667 | } |
| 1668 | break; |
| 1669 | } |
| 1670 | case FT_NONE: break; // don't do anything |
| 1671 | } |
| 1672 | |
| 1673 | return; |
| 1674 | } |
| 1675 | |
| 1676 | static inline int |
| 1677 | key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn, const toku::comparator &cmp) { |
| 1678 | int r = cmp(a, b); |
| 1679 | if (r == 0) { |
| 1680 | if (amsn.msn > bmsn.msn) { |
| 1681 | r = +1; |
| 1682 | } else if (amsn.msn < bmsn.msn) { |
| 1683 | r = -1; |
| 1684 | } else { |
| 1685 | r = 0; |
| 1686 | } |
| 1687 | } |
| 1688 | return r; |
| 1689 | } |
| 1690 | |
| 1691 | int (const int32_t &offset, const struct toku_msg_buffer_key_msn_heaviside_extra &) { |
| 1692 | MSN query_msn; |
| 1693 | DBT query_key; |
| 1694 | extra.msg_buffer->get_message_key_msn(offset, &query_key, &query_msn); |
| 1695 | return key_msn_cmp(&query_key, extra.key, query_msn, extra.msn, extra.cmp); |
| 1696 | } |
| 1697 | |
| 1698 | int (const struct toku_msg_buffer_key_msn_cmp_extra &, const int32_t &ao, const int32_t &bo) { |
| 1699 | MSN amsn, bmsn; |
| 1700 | DBT akey, bkey; |
| 1701 | extra.msg_buffer->get_message_key_msn(ao, &akey, &amsn); |
| 1702 | extra.msg_buffer->get_message_key_msn(bo, &bkey, &bmsn); |
| 1703 | return key_msn_cmp(&akey, &bkey, amsn, bmsn, extra.cmp); |
| 1704 | } |
| 1705 | |
| 1706 | // Effect: Enqueue the message represented by the parameters into the |
| 1707 | // bnc's buffer, and put it in either the fresh or stale message tree, |
| 1708 | // or the broadcast list. |
| 1709 | static void bnc_insert_msg(NONLEAF_CHILDINFO bnc, const ft_msg &msg, bool is_fresh, const toku::comparator &cmp) { |
| 1710 | int r = 0; |
| 1711 | int32_t offset; |
| 1712 | bnc->msg_buffer.enqueue(msg, is_fresh, &offset); |
| 1713 | enum ft_msg_type type = msg.type(); |
| 1714 | if (ft_msg_type_applies_once(type)) { |
| 1715 | DBT key; |
| 1716 | toku_fill_dbt(&key, msg.kdbt()->data, msg.kdbt()->size); |
| 1717 | struct toku_msg_buffer_key_msn_heaviside_extra (cmp, &bnc->msg_buffer, &key, msg.msn()); |
| 1718 | if (is_fresh) { |
| 1719 | r = bnc->fresh_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr); |
| 1720 | assert_zero(r); |
| 1721 | } else { |
| 1722 | r = bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr); |
| 1723 | assert_zero(r); |
| 1724 | } |
| 1725 | } else { |
| 1726 | invariant(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)); |
| 1727 | const uint32_t idx = bnc->broadcast_list.size(); |
| 1728 | r = bnc->broadcast_list.insert_at(offset, idx); |
| 1729 | assert_zero(r); |
| 1730 | } |
| 1731 | } |
| 1732 | |
| 1733 | // This is only exported for tests. |
| 1734 | void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, uint32_t keylen, const void *data, uint32_t datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const toku::comparator &cmp) |
| 1735 | { |
| 1736 | DBT k, v; |
| 1737 | ft_msg msg(toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), type, msn, xids); |
| 1738 | bnc_insert_msg(bnc, msg, is_fresh, cmp); |
| 1739 | } |
| 1740 | |
| 1741 | // append a msg to a nonleaf node's child buffer |
| 1742 | static void ft_append_msg_to_child_buffer(const toku::comparator &cmp, FTNODE node, |
| 1743 | int childnum, const ft_msg &msg, bool is_fresh) { |
| 1744 | paranoid_invariant(BP_STATE(node,childnum) == PT_AVAIL); |
| 1745 | bnc_insert_msg(BNC(node, childnum), msg, is_fresh, cmp); |
| 1746 | node->dirty = 1; |
| 1747 | } |
| 1748 | |
| 1749 | // This is only exported for tests. |
| 1750 | void toku_ft_append_to_child_buffer(const toku::comparator &cmp, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) { |
| 1751 | ft_msg msg(key, val, type, msn, xids); |
| 1752 | ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh); |
| 1753 | } |
| 1754 | |
| 1755 | static void ft_nonleaf_msg_once_to_child(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[]) |
| 1756 | // Previously we had passive aggressive promotion, but that causes a lot of I/O a the checkpoint. So now we are just putting it in the buffer here. |
| 1757 | // Also we don't worry about the node getting overfull here. It's the caller's problem. |
| 1758 | { |
| 1759 | unsigned int childnum = (target_childnum >= 0 |
| 1760 | ? target_childnum |
| 1761 | : toku_ftnode_which_child(node, msg.kdbt(), cmp)); |
| 1762 | ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh); |
| 1763 | NONLEAF_CHILDINFO bnc = BNC(node, childnum); |
| 1764 | bnc->flow[0] += flow_deltas[0]; |
| 1765 | bnc->flow[1] += flow_deltas[1]; |
| 1766 | } |
| 1767 | |
| 1768 | // TODO: Remove me, I'm boring. |
| 1769 | static int ft_compare_pivot(const toku::comparator &cmp, const DBT *key, const DBT *pivot) { |
| 1770 | return cmp(key, pivot); |
| 1771 | } |
| 1772 | |
| 1773 | /* Find the leftmost child that may contain the key. |
| 1774 | * If the key exists it will be in the child whose number |
| 1775 | * is the return value of this function. |
| 1776 | */ |
| 1777 | int toku_ftnode_which_child(FTNODE node, const DBT *k, const toku::comparator &cmp) { |
| 1778 | // a funny case of no pivots |
| 1779 | if (node->n_children <= 1) return 0; |
| 1780 | |
| 1781 | DBT pivot; |
| 1782 | |
| 1783 | // check the last key to optimize seq insertions |
| 1784 | int n = node->n_children-1; |
| 1785 | int c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(n - 1, &pivot)); |
| 1786 | if (c > 0) return n; |
| 1787 | |
| 1788 | // binary search the pivots |
| 1789 | int lo = 0; |
| 1790 | int hi = n-1; // skip the last one, we checked it above |
| 1791 | int mi; |
| 1792 | while (lo < hi) { |
| 1793 | mi = (lo + hi) / 2; |
| 1794 | c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot)); |
| 1795 | if (c > 0) { |
| 1796 | lo = mi+1; |
| 1797 | continue; |
| 1798 | } |
| 1799 | if (c < 0) { |
| 1800 | hi = mi; |
| 1801 | continue; |
| 1802 | } |
| 1803 | return mi; |
| 1804 | } |
| 1805 | return lo; |
| 1806 | } |
| 1807 | |
| 1808 | // Used for HOT. |
| 1809 | int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator &cmp) { |
| 1810 | DBT pivot; |
| 1811 | int low = 0; |
| 1812 | int hi = node->n_children - 1; |
| 1813 | int mi; |
| 1814 | while (low < hi) { |
| 1815 | mi = (low + hi) / 2; |
| 1816 | int r = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot)); |
| 1817 | if (r > 0) { |
| 1818 | low = mi + 1; |
| 1819 | } else if (r < 0) { |
| 1820 | hi = mi; |
| 1821 | } else { |
| 1822 | // if they were exactly equal, then we want the sub-tree under |
| 1823 | // the next pivot. |
| 1824 | return mi + 1; |
| 1825 | } |
| 1826 | } |
| 1827 | invariant(low == hi); |
| 1828 | return low; |
| 1829 | } |
| 1830 | |
| 1831 | void toku_ftnode_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) { |
| 1832 | FTNODE CAST_FROM_VOIDP(node, value_data); |
| 1833 | node->ct_pair = p; |
| 1834 | } |
| 1835 | |
| 1836 | static void |
| 1837 | ft_nonleaf_msg_all(const toku::comparator &cmp, FTNODE node, const ft_msg &msg, bool is_fresh, size_t flow_deltas[]) |
| 1838 | // Effect: Put the message into a nonleaf node. We put it into all children, possibly causing the children to become reactive. |
| 1839 | // We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do. |
| 1840 | // The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.) |
| 1841 | { |
| 1842 | for (int i = 0; i < node->n_children; i++) { |
| 1843 | ft_nonleaf_msg_once_to_child(cmp, node, i, msg, is_fresh, flow_deltas); |
| 1844 | } |
| 1845 | } |
| 1846 | |
| 1847 | static void |
| 1848 | ft_nonleaf_put_msg(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[]) |
| 1849 | // Effect: Put the message into a nonleaf node. We may put it into a child, possibly causing the child to become reactive. |
| 1850 | // We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do. |
| 1851 | // The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.) |
| 1852 | // |
| 1853 | { |
| 1854 | |
| 1855 | // |
| 1856 | // see comments in toku_ft_leaf_apply_msg |
| 1857 | // to understand why we handle setting |
| 1858 | // node->max_msn_applied_to_node_on_disk here, |
| 1859 | // and don't do it in toku_ftnode_put_msg |
| 1860 | // |
| 1861 | MSN msg_msn = msg.msn(); |
| 1862 | invariant(msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn); |
| 1863 | node->max_msn_applied_to_node_on_disk = msg_msn; |
| 1864 | |
| 1865 | if (ft_msg_type_applies_once(msg.type())) { |
| 1866 | ft_nonleaf_msg_once_to_child(cmp, node, target_childnum, msg, is_fresh, flow_deltas); |
| 1867 | } else if (ft_msg_type_applies_all(msg.type())) { |
| 1868 | ft_nonleaf_msg_all(cmp, node, msg, is_fresh, flow_deltas); |
| 1869 | } else { |
| 1870 | paranoid_invariant(ft_msg_type_does_nothing(msg.type())); |
| 1871 | } |
| 1872 | } |
| 1873 | |
| 1874 | // Garbage collect one leaf entry. |
| 1875 | static void |
| 1876 | ft_basement_node_gc_once(BASEMENTNODE bn, |
| 1877 | uint32_t index, |
| 1878 | void* keyp, |
| 1879 | uint32_t keylen, |
| 1880 | LEAFENTRY leaf_entry, |
| 1881 | txn_gc_info *gc_info, |
| 1882 | STAT64INFO_S * delta) |
| 1883 | { |
| 1884 | paranoid_invariant(leaf_entry); |
| 1885 | |
| 1886 | // Don't run garbage collection on non-mvcc leaf entries. |
| 1887 | if (leaf_entry->type != LE_MVCC) { |
| 1888 | goto exit; |
| 1889 | } |
| 1890 | |
| 1891 | // Don't run garbage collection if this leafentry decides it's not worth it. |
| 1892 | if (!toku_le_worth_running_garbage_collection(leaf_entry, gc_info)) { |
| 1893 | goto exit; |
| 1894 | } |
| 1895 | |
| 1896 | LEAFENTRY new_leaf_entry; |
| 1897 | new_leaf_entry = NULL; |
| 1898 | |
| 1899 | // The mempool doesn't free itself. When it allocates new memory, |
| 1900 | // this pointer will be set to the older memory that must now be |
| 1901 | // freed. |
| 1902 | void * maybe_free; |
| 1903 | maybe_free = NULL; |
| 1904 | |
| 1905 | // These will represent the number of bytes and rows changed as |
| 1906 | // part of the garbage collection. |
| 1907 | int64_t numbytes_delta; |
| 1908 | int64_t numrows_delta; |
| 1909 | toku_le_garbage_collect(leaf_entry, |
| 1910 | &bn->data_buffer, |
| 1911 | index, |
| 1912 | keyp, |
| 1913 | keylen, |
| 1914 | gc_info, |
| 1915 | &new_leaf_entry, |
| 1916 | &numbytes_delta); |
| 1917 | |
| 1918 | numrows_delta = 0; |
| 1919 | if (new_leaf_entry) { |
| 1920 | numrows_delta = 0; |
| 1921 | } else { |
| 1922 | numrows_delta = -1; |
| 1923 | } |
| 1924 | |
| 1925 | // If we created a new mempool buffer we must free the |
| 1926 | // old/original buffer. |
| 1927 | if (maybe_free) { |
| 1928 | toku_free(maybe_free); |
| 1929 | } |
| 1930 | |
| 1931 | // Update stats. |
| 1932 | bn->stat64_delta.numrows += numrows_delta; |
| 1933 | bn->stat64_delta.numbytes += numbytes_delta; |
| 1934 | delta->numrows += numrows_delta; |
| 1935 | delta->numbytes += numbytes_delta; |
| 1936 | |
| 1937 | exit: |
| 1938 | return; |
| 1939 | } |
| 1940 | |
| 1941 | // Garbage collect all leaf entries for a given basement node. |
| 1942 | static void |
| 1943 | basement_node_gc_all_les(BASEMENTNODE bn, |
| 1944 | txn_gc_info *gc_info, |
| 1945 | STAT64INFO_S * delta) |
| 1946 | { |
| 1947 | int r = 0; |
| 1948 | uint32_t index = 0; |
| 1949 | uint32_t num_leafentries_before; |
| 1950 | while (index < (num_leafentries_before = bn->data_buffer.num_klpairs())) { |
| 1951 | void* keyp = NULL; |
| 1952 | uint32_t keylen = 0; |
| 1953 | LEAFENTRY leaf_entry; |
| 1954 | r = bn->data_buffer.fetch_klpair(index, &leaf_entry, &keylen, &keyp); |
| 1955 | assert_zero(r); |
| 1956 | ft_basement_node_gc_once( |
| 1957 | bn, |
| 1958 | index, |
| 1959 | keyp, |
| 1960 | keylen, |
| 1961 | leaf_entry, |
| 1962 | gc_info, |
| 1963 | delta |
| 1964 | ); |
| 1965 | // Check if the leaf entry was deleted or not. |
| 1966 | if (num_leafentries_before == bn->data_buffer.num_klpairs()) { |
| 1967 | ++index; |
| 1968 | } |
| 1969 | } |
| 1970 | } |
| 1971 | |
| 1972 | // Garbage collect all leaf entires in all basement nodes. |
| 1973 | static void |
| 1974 | ft_leaf_gc_all_les(FT ft, FTNODE node, txn_gc_info *gc_info) |
| 1975 | { |
| 1976 | toku_ftnode_assert_fully_in_memory(node); |
| 1977 | paranoid_invariant_zero(node->height); |
| 1978 | // Loop through each leaf entry, garbage collecting as we go. |
| 1979 | for (int i = 0; i < node->n_children; ++i) { |
| 1980 | // Perform the garbage collection. |
| 1981 | BASEMENTNODE bn = BLB(node, i); |
| 1982 | STAT64INFO_S delta; |
| 1983 | delta.numrows = 0; |
| 1984 | delta.numbytes = 0; |
| 1985 | basement_node_gc_all_les(bn, gc_info, &delta); |
| 1986 | toku_ft_update_stats(&ft->in_memory_stats, delta); |
| 1987 | } |
| 1988 | } |
| 1989 | |
| 1990 | void toku_ftnode_leaf_run_gc(FT ft, FTNODE node) { |
| 1991 | TOKULOGGER logger = toku_cachefile_logger(ft->cf); |
| 1992 | if (logger) { |
| 1993 | TXN_MANAGER txn_manager = toku_logger_get_txn_manager(logger); |
| 1994 | txn_manager_state txn_state_for_gc(txn_manager); |
| 1995 | txn_state_for_gc.init(); |
| 1996 | TXNID oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager); |
| 1997 | |
| 1998 | // Perform full garbage collection. |
| 1999 | // |
| 2000 | // - txn_state_for_gc |
| 2001 | // a fresh snapshot of the transaction system. |
| 2002 | // - oldest_referenced_xid_for_simple_gc |
| 2003 | // the oldest xid in any live list as of right now - suitible for simple gc |
| 2004 | // - node->oldest_referenced_xid_known |
| 2005 | // the last known oldest referenced xid for this node and any unapplied messages. |
| 2006 | // it is a lower bound on the actual oldest referenced xid - but becasue there |
| 2007 | // may be abort messages above us, we need to be careful to only use this value |
| 2008 | // for implicit promotion (as opposed to the oldest referenced xid for simple gc) |
| 2009 | // |
| 2010 | // The node has its own oldest referenced xid because it must be careful not to implicitly promote |
| 2011 | // provisional entries for transactions that are no longer live, but may have abort messages |
| 2012 | // somewhere above us in the tree. |
| 2013 | txn_gc_info gc_info(&txn_state_for_gc, |
| 2014 | oldest_referenced_xid_for_simple_gc, |
| 2015 | node->oldest_referenced_xid_known, |
| 2016 | true); |
| 2017 | ft_leaf_gc_all_les(ft, node, &gc_info); |
| 2018 | } |
| 2019 | } |
| 2020 | |
| 2021 | void toku_ftnode_put_msg( |
| 2022 | const toku::comparator &cmp, |
| 2023 | ft_update_func update_fun, |
| 2024 | FTNODE node, |
| 2025 | int target_childnum, |
| 2026 | const ft_msg &msg, |
| 2027 | bool is_fresh, |
| 2028 | txn_gc_info* gc_info, |
| 2029 | size_t flow_deltas[], |
| 2030 | STAT64INFO stats_to_update, |
| 2031 | int64_t* logical_rows_delta) { |
| 2032 | // Effect: Push message into the subtree rooted at NODE. |
| 2033 | // If NODE is a leaf, then |
| 2034 | // put message into leaf, applying it to the leafentries |
| 2035 | // If NODE is a nonleaf, then push the message into the message buffer(s) of the relevent child(ren). |
| 2036 | // The node may become overfull. That's not our problem. |
| 2037 | toku_ftnode_assert_fully_in_memory(node); |
| 2038 | // |
| 2039 | // see comments in toku_ft_leaf_apply_msg |
| 2040 | // to understand why we don't handle setting |
| 2041 | // node->max_msn_applied_to_node_on_disk here, |
| 2042 | // and instead defer to these functions |
| 2043 | // |
| 2044 | if (node->height==0) { |
| 2045 | toku_ft_leaf_apply_msg( |
| 2046 | cmp, |
| 2047 | update_fun, |
| 2048 | node, |
| 2049 | target_childnum, msg, |
| 2050 | gc_info, |
| 2051 | nullptr, |
| 2052 | stats_to_update, |
| 2053 | logical_rows_delta); |
| 2054 | } else { |
| 2055 | ft_nonleaf_put_msg( |
| 2056 | cmp, |
| 2057 | node, |
| 2058 | target_childnum, |
| 2059 | msg, |
| 2060 | is_fresh, |
| 2061 | flow_deltas); |
| 2062 | } |
| 2063 | } |
| 2064 | |
// Effect: applies the message to the leaf if the appropriate basement node is
//         in memory. This function is called during message injection and/or
//         flushing, so the entire node MUST be in memory.
// For applies-once messages the message is routed to one basement node; for
// broadcast messages it is applied to every basement node. In both cases a
// basement node is skipped (and a discard counted) if it has already seen
// this message's MSN.
void toku_ft_leaf_apply_msg(
    const toku::comparator& cmp,
    ft_update_func update_fun,
    FTNODE node,
    int target_childnum,  // which child to inject to, or -1 if unknown
    const ft_msg& msg,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    // NOTE(review): VERIFY_NODE references 't', which is not a parameter
    // here, so the macro presumably expands to nothing in this build
    // configuration -- confirm.
    VERIFY_NODE(t, node);
    toku_ftnode_assert_fully_in_memory(node);

    //
    // Because toku_ft_leaf_apply_msg is called with the intent of permanently
    // applying a message to a leaf node (meaning the message is permanently applied
    // and will be purged from the system after this call, as opposed to
    // toku_apply_ancestors_messages_to_node, which applies a message
    // for a query, but the message may still reside in the system and
    // be reapplied later), we mark the node as dirty and
    // take the opportunity to update node->max_msn_applied_to_node_on_disk.
    //
    node->dirty = 1;

    //
    // we cannot blindly update node->max_msn_applied_to_node_on_disk,
    // we must check to see if the msn is greater that the one already stored,
    // because the message may have already been applied earlier (via
    // toku_apply_ancestors_messages_to_node) to answer a query
    //
    // This is why we handle node->max_msn_applied_to_node_on_disk both here
    // and in ft_nonleaf_put_msg, as opposed to in one location, toku_ftnode_put_msg.
    //
    MSN msg_msn = msg.msn();
    if (msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn) {
        node->max_msn_applied_to_node_on_disk = msg_msn;
    }

    if (ft_msg_type_applies_once(msg.type())) {
        // Keyed message: find the one basement node it belongs to
        // (either the caller told us, or route by key).
        unsigned int childnum = (target_childnum >= 0
                                 ? target_childnum
                                 : toku_ftnode_which_child(node, msg.kdbt(), cmp));
        BASEMENTNODE bn = BLB(node, childnum);
        if (msg.msn().msn > bn->max_msn_applied.msn) {
            bn->max_msn_applied = msg.msn();
            toku_ft_bn_apply_msg(
                cmp,
                update_fun,
                bn,
                msg,
                gc_info,
                workdone,
                stats_to_update,
                logical_rows_delta);
        } else {
            // This basement node already saw this MSN; count the discard.
            toku_ft_status_note_msn_discard();
        }
    } else if (ft_msg_type_applies_all(msg.type())) {
        // Broadcast message: apply to every basement node that has not
        // already seen this MSN.
        for (int childnum=0; childnum<node->n_children; childnum++) {
            if (msg.msn().msn > BLB(node, childnum)->max_msn_applied.msn) {
                BLB(node, childnum)->max_msn_applied = msg.msn();
                toku_ft_bn_apply_msg(
                    cmp,
                    update_fun,
                    BLB(node, childnum),
                    msg,
                    gc_info,
                    workdone,
                    stats_to_update,
                    logical_rows_delta);
            } else {
                toku_ft_status_note_msn_discard();
            }
        }
    } else if (!ft_msg_type_does_nothing(msg.type())) {
        // Unrecognized message type: this invariant is false whenever the
        // branch is reached, so it deliberately aborts.
        invariant(ft_msg_type_does_nothing(msg.type()));
    }
    VERIFY_NODE(t, node);
}
| 2148 | |
| 2149 | |