1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | /*====== |
4 | This file is part of PerconaFT. |
5 | |
6 | |
7 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
8 | |
9 | PerconaFT is free software: you can redistribute it and/or modify |
10 | it under the terms of the GNU General Public License, version 2, |
11 | as published by the Free Software Foundation. |
12 | |
13 | PerconaFT is distributed in the hope that it will be useful, |
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
16 | GNU General Public License for more details. |
17 | |
18 | You should have received a copy of the GNU General Public License |
19 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
20 | |
21 | ---------------------------------------- |
22 | |
23 | PerconaFT is free software: you can redistribute it and/or modify |
24 | it under the terms of the GNU Affero General Public License, version 3, |
25 | as published by the Free Software Foundation. |
26 | |
27 | PerconaFT is distributed in the hope that it will be useful, |
28 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
29 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
30 | GNU Affero General Public License for more details. |
31 | |
32 | You should have received a copy of the GNU Affero General Public License |
33 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
34 | ======= */ |
35 | |
36 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
37 | |
38 | #include <my_global.h> |
39 | #include "ft/ft.h" |
40 | #include "ft/ft-internal.h" |
41 | #include "ft/serialize/ft_node-serialize.h" |
42 | #include "ft/node.h" |
43 | #include "ft/serialize/rbuf.h" |
44 | #include "ft/serialize/wbuf.h" |
45 | #include "util/scoped_malloc.h" |
46 | #include "util/sort.h" |
47 | |
48 | // Effect: Fill in N as an empty ftnode. |
49 | // TODO: Rename toku_ftnode_create |
50 | void toku_initialize_empty_ftnode(FTNODE n, BLOCKNUM blocknum, int height, int num_children, int layout_version, unsigned int flags) { |
51 | paranoid_invariant(layout_version != 0); |
52 | paranoid_invariant(height >= 0); |
53 | |
54 | n->max_msn_applied_to_node_on_disk = ZERO_MSN; // correct value for root node, harmless for others |
55 | n->flags = flags; |
56 | n->blocknum = blocknum; |
57 | n->layout_version = layout_version; |
58 | n->layout_version_original = layout_version; |
59 | n->layout_version_read_from_disk = layout_version; |
60 | n->height = height; |
61 | n->pivotkeys.create_empty(); |
62 | n->bp = 0; |
63 | n->n_children = num_children; |
64 | n->oldest_referenced_xid_known = TXNID_NONE; |
65 | |
66 | if (num_children > 0) { |
67 | XMALLOC_N(num_children, n->bp); |
68 | for (int i = 0; i < num_children; i++) { |
69 | BP_BLOCKNUM(n,i).b=0; |
70 | BP_STATE(n,i) = PT_INVALID; |
71 | BP_WORKDONE(n,i) = 0; |
72 | BP_INIT_TOUCHED_CLOCK(n, i); |
73 | set_BNULL(n,i); |
74 | if (height > 0) { |
75 | set_BNC(n, i, toku_create_empty_nl()); |
76 | } else { |
77 | set_BLB(n, i, toku_create_empty_bn()); |
78 | } |
79 | } |
80 | } |
81 | n->dirty = 1; // special case exception, it's okay to mark as dirty because the basements are empty |
82 | |
83 | toku_ft_status_note_ftnode(height, true); |
84 | } |
85 | |
86 | // destroys the internals of the ftnode, but it does not free the values |
87 | // that are stored |
88 | // this is common functionality for toku_ftnode_free and rebalance_ftnode_leaf |
89 | // MUST NOT do anything besides free the structures that have been allocated |
90 | void toku_destroy_ftnode_internals(FTNODE node) { |
91 | node->pivotkeys.destroy(); |
92 | for (int i = 0; i < node->n_children; i++) { |
93 | if (BP_STATE(node,i) == PT_AVAIL) { |
94 | if (node->height > 0) { |
95 | destroy_nonleaf_childinfo(BNC(node,i)); |
96 | } else { |
97 | paranoid_invariant(BLB_LRD(node, i) == 0); |
98 | destroy_basement_node(BLB(node, i)); |
99 | } |
100 | } else if (BP_STATE(node,i) == PT_COMPRESSED) { |
101 | SUB_BLOCK sb = BSB(node,i); |
102 | toku_free(sb->compressed_ptr); |
103 | toku_free(sb); |
104 | } else { |
105 | paranoid_invariant(is_BNULL(node, i)); |
106 | } |
107 | set_BNULL(node, i); |
108 | } |
109 | toku_free(node->bp); |
110 | node->bp = NULL; |
111 | } |
112 | |
113 | /* Frees a node, including all the stuff in the hash table. */ |
114 | void toku_ftnode_free(FTNODE *nodep) { |
115 | FTNODE node = *nodep; |
116 | toku_ft_status_note_ftnode(node->height, false); |
117 | toku_destroy_ftnode_internals(node); |
118 | toku_free(node); |
119 | *nodep = nullptr; |
120 | } |
121 | |
122 | void toku_ftnode_update_disk_stats(FTNODE ftnode, FT ft, bool for_checkpoint) { |
123 | STAT64INFO_S deltas = ZEROSTATS; |
124 | // capture deltas before rebalancing basements for serialization |
125 | deltas = toku_get_and_clear_basement_stats(ftnode); |
126 | // locking not necessary here with respect to checkpointing |
127 | // in Clayface (because of the pending lock and cachetable lock |
128 | // in toku_cachetable_begin_checkpoint) |
129 | // essentially, if we are dealing with a for_checkpoint |
130 | // parameter in a function that is called by the flush_callback, |
131 | // then the cachetable needs to ensure that this is called in a safe |
132 | // manner that does not interfere with the beginning |
133 | // of a checkpoint, which it does with the cachetable lock |
134 | // and pending lock |
135 | toku_ft_update_stats(&ft->h->on_disk_stats, deltas); |
136 | if (for_checkpoint) { |
137 | toku_ft_update_stats(&ft->checkpoint_header->on_disk_stats, deltas); |
138 | } |
139 | } |
140 | |
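// Copy every partition of node into cloned_node: blocknums, states, and
// workdone counters are carried over, and each basement node or nonleaf
// child buffer is deep-cloned. Every partition must be PT_AVAIL, as the
// invariant below asserts.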
141 | void toku_ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) { |
142 | for (int i = 0; i < node->n_children; i++) { |
143 | BP_BLOCKNUM(cloned_node,i) = BP_BLOCKNUM(node,i); |
144 | paranoid_invariant(BP_STATE(node,i) == PT_AVAIL); |
145 | BP_STATE(cloned_node,i) = PT_AVAIL; |
146 | BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i); |
147 | if (node->height == 0) { |
148 | set_BLB(cloned_node, i, toku_clone_bn(BLB(node,i))); |
149 | } else { |
150 | set_BNC(cloned_node, i, toku_clone_nl(BNC(node,i))); |
151 | } |
152 | } |
153 | } |
154 | |
155 | void toku_evict_bn_from_memory(FTNODE node, int childnum, FT ft) { |
156 | // free the basement node |
157 | assert(!node->dirty); |
158 | BASEMENTNODE bn = BLB(node, childnum); |
159 | toku_ft_decrease_stats(&ft->in_memory_stats, bn->stat64_delta); |
160 | destroy_basement_node(bn); |
161 | set_BNULL(node, childnum); |
162 | BP_STATE(node, childnum) = PT_ON_DISK; |
163 | } |
164 | |
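// Detach the basement node at childnum from the node and hand ownership to
// the caller; the partition is emptied and marked PT_ON_DISK.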
165 | BASEMENTNODE toku_detach_bn(FTNODE node, int childnum) { |
166 | assert(BP_STATE(node, childnum) == PT_AVAIL); |
167 | BASEMENTNODE bn = BLB(node, childnum); |
168 | set_BNULL(node, childnum); |
169 | BP_STATE(node, childnum) = PT_ON_DISK; |
170 | return bn; |
171 | } |
172 | |
173 | // |
174 | // Orthopush |
175 | // |
176 | |
struct store_msg_buffer_offset_extra {
    int32_t *offsets;
    int i;
};
181 | |
int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra) __attribute__((nonnull(3)));
int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra)
184 | { |
185 | extra->offsets[extra->i] = offset; |
186 | extra->i++; |
187 | return 0; |
188 | } |
189 | |
190 | /** |
191 | * Given pointers to offsets within a message buffer where we can find messages, |
192 | * figure out the MSN of each message, and compare those MSNs. Returns 1, |
193 | * 0, or -1 if a is larger than, equal to, or smaller than b. |
194 | */ |
195 | int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo); |
196 | int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo) |
197 | { |
198 | MSN amsn, bmsn; |
199 | msg_buffer.get_message_key_msn(ao, nullptr, &amsn); |
200 | msg_buffer.get_message_key_msn(bo, nullptr, &bmsn); |
201 | if (amsn.msn > bmsn.msn) { |
202 | return +1; |
203 | } |
204 | if (amsn.msn < bmsn.msn) { |
205 | return -1; |
206 | } |
207 | return 0; |
208 | } |
209 | |
210 | /** |
 * Given a message buffer and an offset, apply the message with
212 | * toku_ft_bn_apply_msg, or discard it, |
213 | * based on its MSN and the MSN of the basement node. |
214 | */ |
215 | static void do_bn_apply_msg( |
216 | FT_HANDLE ft_handle, |
217 | BASEMENTNODE bn, |
218 | message_buffer* msg_buffer, |
219 | int32_t offset, |
220 | txn_gc_info* gc_info, |
221 | uint64_t* workdone, |
222 | STAT64INFO stats_to_update, |
223 | int64_t* logical_rows_delta) { |
224 | |
225 | DBT k, v; |
226 | ft_msg msg = msg_buffer->get_message(offset, &k, &v); |
227 | |
228 | // The messages are being iterated over in (key,msn) order or just in |
229 | // msn order, so all the messages for one key, from one buffer, are in |
230 | // ascending msn order. So it's ok that we don't update the basement |
231 | // node's msn until the end. |
232 | if (msg.msn().msn > bn->max_msn_applied.msn) { |
233 | toku_ft_bn_apply_msg( |
234 | ft_handle->ft->cmp, |
235 | ft_handle->ft->update_fun, |
236 | bn, |
237 | msg, |
238 | gc_info, |
239 | workdone, |
240 | stats_to_update, |
241 | logical_rows_delta); |
242 | } else { |
243 | toku_ft_status_note_msn_discard(); |
244 | } |
245 | |
    // We must always mark the message as stale since it has been marked
    // (using omt::iterate_and_mark_range).
    // It is possible to call do_bn_apply_msg even when it won't apply the
    // message, because the node containing it could have been evicted and
    // brought back in.
251 | msg_buffer->set_freshness(offset, false); |
252 | } |
253 | |
254 | |
struct iterate_do_bn_apply_msg_extra {
    FT_HANDLE t;
    BASEMENTNODE bn;
    NONLEAF_CHILDINFO bnc;
    txn_gc_info *gc_info;
    uint64_t *workdone;
    STAT64INFO stats_to_update;
    int64_t *logical_rows_delta;
};
264 | |
265 | int iterate_do_bn_apply_msg( |
266 | const int32_t &offset, |
267 | const uint32_t UU(idx), |
268 | struct iterate_do_bn_apply_msg_extra* const e) |
269 | __attribute__((nonnull(3))); |
270 | |
int iterate_do_bn_apply_msg(
272 | const int32_t &offset, |
273 | const uint32_t UU(idx), |
274 | struct iterate_do_bn_apply_msg_extra* const e) |
275 | { |
276 | do_bn_apply_msg( |
277 | e->t, |
278 | e->bn, |
279 | &e->bnc->msg_buffer, |
280 | offset, |
281 | e->gc_info, |
282 | e->workdone, |
283 | e->stats_to_update, |
284 | e->logical_rows_delta); |
285 | return 0; |
286 | } |
287 | |
288 | /** |
289 | * Given the bounds of the basement node to which we will apply messages, |
290 | * find the indexes within message_tree which contain the range of |
291 | * relevant messages. |
292 | * |
293 | * The message tree contains offsets into the buffer, where messages are |
294 | * found. The pivot_bounds are the lower bound exclusive and upper bound |
295 | * inclusive, because they come from pivot keys in the tree. We want OMT |
296 | * indices, which must have the lower bound be inclusive and the upper |
297 | * bound exclusive. We will get these by telling omt::find to look |
298 | * for something strictly bigger than each of our pivot bounds. |
299 | * |
300 | * Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper |
301 | * bound exclusive). |
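 *
 * A worked example with hypothetical keys: suppose the pivot bounds are
 * (lbe = "b", ubi = "d"] and the message tree's offsets point at messages
 * with keys {a, b, c, d, e}. The first message strictly greater than
 * ("b", MAX_MSN) has key "c", so *lbi is c's index; the first message
 * strictly greater than ("d", MAX_MSN) has key "e", so *ube is e's index,
 * and iterating over [*lbi, *ube) visits exactly the messages for c and d.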
302 | */ |
303 | template<typename find_bounds_omt_t> |
304 | static void |
305 | find_bounds_within_message_tree( |
306 | const toku::comparator &cmp, |
307 | const find_bounds_omt_t &message_tree, /// tree holding message buffer offsets, in which we want to look for indices |
308 | message_buffer *msg_buffer, /// message buffer in which messages are found |
309 | const pivot_bounds &bounds, /// key bounds within the basement node we're applying messages to |
310 | uint32_t *lbi, /// (output) "lower bound inclusive" (index into message_tree) |
311 | uint32_t *ube /// (output) "upper bound exclusive" (index into message_tree) |
312 | ) |
313 | { |
314 | int r = 0; |
315 | |
316 | if (!toku_dbt_is_empty(bounds.lbe())) { |
317 | // By setting msn to MAX_MSN and by using direction of +1, we will |
318 | // get the first message greater than (in (key, msn) order) any |
319 | // message (with any msn) with the key lower_bound_exclusive. |
320 | // This will be a message we want to try applying, so it is the |
321 | // "lower bound inclusive" within the message_tree. |
        struct toku_msg_buffer_key_msn_heaviside_extra lbi_extra(cmp, msg_buffer, bounds.lbe(), MAX_MSN);
323 | int32_t found_lb; |
324 | r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi); |
325 | if (r == DB_NOTFOUND) { |
326 | // There is no relevant data (the lower bound is bigger than |
327 | // any message in this tree), so we have no range and we're |
328 | // done. |
329 | *lbi = 0; |
330 | *ube = 0; |
331 | return; |
332 | } |
333 | if (!toku_dbt_is_empty(bounds.ubi())) { |
334 | // Check if what we found for lbi is greater than the upper |
335 | // bound inclusive that we have. If so, there are no relevant |
336 | // messages between these bounds. |
337 | const DBT *ubi = bounds.ubi(); |
338 | const int32_t offset = found_lb; |
339 | DBT found_lbidbt; |
340 | msg_buffer->get_message_key_msn(offset, &found_lbidbt, nullptr); |
341 | int c = cmp(&found_lbidbt, ubi); |
342 | // These DBTs really are both inclusive bounds, so we need |
343 | // strict inequality in order to determine that there's |
344 | // nothing between them. If they're equal, then we actually |
345 | // need to apply the message pointed to by lbi, and also |
346 | // anything with the same key but a bigger msn. |
347 | if (c > 0) { |
348 | *lbi = 0; |
349 | *ube = 0; |
350 | return; |
351 | } |
352 | } |
353 | } else { |
354 | // No lower bound given, it's negative infinity, so we start at |
355 | // the first message in the OMT. |
356 | *lbi = 0; |
357 | } |
358 | if (!toku_dbt_is_empty(bounds.ubi())) { |
359 | // Again, we use an msn of MAX_MSN and a direction of +1 to get |
360 | // the first thing bigger than the upper_bound_inclusive key. |
361 | // This is therefore the smallest thing we don't want to apply, |
362 | // and omt::iterate_on_range will not examine it. |
        struct toku_msg_buffer_key_msn_heaviside_extra ube_extra(cmp, msg_buffer, bounds.ubi(), MAX_MSN);
364 | r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(ube_extra, +1, nullptr, ube); |
365 | if (r == DB_NOTFOUND) { |
366 | // Couldn't find anything in the buffer bigger than our key, |
367 | // so we need to look at everything up to the end of |
368 | // message_tree. |
369 | *ube = message_tree.size(); |
370 | } |
371 | } else { |
372 | // No upper bound given, it's positive infinity, so we need to go |
373 | // through the end of the OMT. |
374 | *ube = message_tree.size(); |
375 | } |
376 | } |
377 | |
378 | // For each message in the ancestor's buffer (determined by childnum) that |
379 | // is key-wise between lower_bound_exclusive and upper_bound_inclusive, |
380 | // apply the message to the basement node. We treat the bounds as minus |
381 | // or plus infinity respectively if they are NULL. Do not mark the node |
382 | // as dirty (preserve previous state of 'dirty' bit). |
383 | static void bnc_apply_messages_to_basement_node( |
384 | FT_HANDLE t, // used for comparison function |
385 | BASEMENTNODE bn, // where to apply messages |
386 | FTNODE ancestor, // the ancestor node where we can find messages to apply |
387 | int childnum, // which child buffer of ancestor contains messages we want |
    const pivot_bounds &bounds,  // contains pivot key bounds of this basement node
390 | txn_gc_info *gc_info, |
391 | bool *msgs_applied) { |
392 | int r; |
393 | NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum); |
394 | |
395 | // Determine the offsets in the message trees between which we need to |
396 | // apply messages from this buffer |
397 | STAT64INFO_S stats_delta = {0, 0}; |
398 | uint64_t workdone_this_ancestor = 0; |
399 | int64_t logical_rows_delta = 0; |
400 | |
401 | uint32_t stale_lbi, stale_ube; |
402 | if (!bn->stale_ancestor_messages_applied) { |
403 | find_bounds_within_message_tree(t->ft->cmp, |
404 | bnc->stale_message_tree, |
405 | &bnc->msg_buffer, |
406 | bounds, |
407 | &stale_lbi, |
408 | &stale_ube); |
409 | } else { |
410 | stale_lbi = 0; |
411 | stale_ube = 0; |
412 | } |
413 | uint32_t fresh_lbi, fresh_ube; |
414 | find_bounds_within_message_tree(t->ft->cmp, |
415 | bnc->fresh_message_tree, |
416 | &bnc->msg_buffer, |
417 | bounds, |
418 | &fresh_lbi, |
419 | &fresh_ube); |
420 | |
421 | // We now know where all the messages we must apply are, so one of the |
    // following 3 cases will do the application, depending on which of
423 | // the lists contains relevant messages: |
424 | // |
425 | // 1. broadcast messages and anything else, or a mix of fresh and stale |
426 | // 2. only fresh messages |
427 | // 3. only stale messages |
428 | if (bnc->broadcast_list.size() > 0 || |
429 | (stale_lbi != stale_ube && fresh_lbi != fresh_ube)) { |
430 | // We have messages in multiple trees, so we grab all |
431 | // the relevant messages' offsets and sort them by MSN, then apply |
432 | // them in MSN order. |
433 | const int buffer_size = |
434 | ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + |
435 | bnc->broadcast_list.size()); |
436 | toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t)); |
437 | int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get()); |
        struct store_msg_buffer_offset_extra sfo_extra = {.offsets = offsets,
                                                          .i = 0};
440 | |
441 | // Populate offsets array with offsets to stale messages |
442 | r = bnc->stale_message_tree |
443 | .iterate_on_range<struct store_msg_buffer_offset_extra, |
444 | store_msg_buffer_offset>( |
445 | stale_lbi, stale_ube, &sfo_extra); |
446 | assert_zero(r); |
447 | |
448 | // Then store fresh offsets, and mark them to be moved to stale later. |
449 | r = bnc->fresh_message_tree |
450 | .iterate_and_mark_range<struct store_msg_buffer_offset_extra, |
451 | store_msg_buffer_offset>( |
452 | fresh_lbi, fresh_ube, &sfo_extra); |
453 | assert_zero(r); |
454 | |
455 | // Store offsets of all broadcast messages. |
456 | r = bnc->broadcast_list.iterate<struct store_msg_buffer_offset_extra, |
457 | store_msg_buffer_offset>(&sfo_extra); |
458 | assert_zero(r); |
459 | invariant(sfo_extra.i == buffer_size); |
460 | |
461 | // Sort by MSN. |
462 | toku::sort<int32_t, message_buffer, msg_buffer_offset_msn_cmp>:: |
463 | mergesort_r(offsets, buffer_size, bnc->msg_buffer); |
464 | |
465 | // Apply the messages in MSN order. |
466 | for (int i = 0; i < buffer_size; ++i) { |
467 | *msgs_applied = true; |
468 | do_bn_apply_msg(t, |
469 | bn, |
470 | &bnc->msg_buffer, |
471 | offsets[i], |
472 | gc_info, |
473 | &workdone_this_ancestor, |
474 | &stats_delta, |
475 | &logical_rows_delta); |
476 | } |
477 | } else if (stale_lbi == stale_ube) { |
478 | // No stale messages to apply, we just apply fresh messages, and mark |
479 | // them to be moved to stale later. |
        struct iterate_do_bn_apply_msg_extra iter_extra = {
481 | .t = t, |
482 | .bn = bn, |
483 | .bnc = bnc, |
484 | .gc_info = gc_info, |
485 | .workdone = &workdone_this_ancestor, |
486 | .stats_to_update = &stats_delta, |
487 | .logical_rows_delta = &logical_rows_delta}; |
488 | if (fresh_ube - fresh_lbi > 0) |
489 | *msgs_applied = true; |
490 | r = bnc->fresh_message_tree |
491 | .iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra, |
492 | iterate_do_bn_apply_msg>( |
493 | fresh_lbi, fresh_ube, &iter_extra); |
494 | assert_zero(r); |
495 | } else { |
496 | invariant(fresh_lbi == fresh_ube); |
497 | // No fresh messages to apply, we just apply stale messages. |
498 | |
499 | if (stale_ube - stale_lbi > 0) |
500 | *msgs_applied = true; |
        struct iterate_do_bn_apply_msg_extra iter_extra = {
502 | .t = t, |
503 | .bn = bn, |
504 | .bnc = bnc, |
505 | .gc_info = gc_info, |
506 | .workdone = &workdone_this_ancestor, |
507 | .stats_to_update = &stats_delta, |
508 | .logical_rows_delta = &logical_rows_delta}; |
509 | |
510 | r = bnc->stale_message_tree |
511 | .iterate_on_range<struct iterate_do_bn_apply_msg_extra, |
512 | iterate_do_bn_apply_msg>( |
513 | stale_lbi, stale_ube, &iter_extra); |
514 | assert_zero(r); |
515 | } |
516 | // |
517 | // update stats |
518 | // |
519 | if (workdone_this_ancestor > 0) { |
520 | (void)toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum), |
521 | workdone_this_ancestor); |
522 | } |
523 | if (stats_delta.numbytes || stats_delta.numrows) { |
524 | toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta); |
525 | } |
526 | toku_ft_adjust_logical_row_count(t->ft, logical_rows_delta); |
527 | bn->logical_rows_delta += logical_rows_delta; |
528 | } |
529 | |
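// Apply all pending messages to the basement node at childnum, walking the
// ancestor list from the leaf's parent toward the root and skipping any
// ancestor whose buffered messages (by max msn) have already been applied.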
530 | static void |
531 | apply_ancestors_messages_to_bn( |
532 | FT_HANDLE t, |
533 | FTNODE node, |
534 | int childnum, |
535 | ANCESTORS ancestors, |
536 | const pivot_bounds &bounds, |
537 | txn_gc_info *gc_info, |
538 | bool* msgs_applied |
539 | ) |
540 | { |
541 | BASEMENTNODE curr_bn = BLB(node, childnum); |
542 | const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum); |
543 | for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) { |
544 | if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) { |
545 | paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL); |
546 | bnc_apply_messages_to_basement_node( |
547 | t, |
548 | curr_bn, |
549 | curr_ancestors->node, |
550 | curr_ancestors->childnum, |
551 | curr_bounds, |
552 | gc_info, |
553 | msgs_applied |
554 | ); |
555 | // We don't want to check this ancestor node again if the |
556 | // next time we query it, the msn hasn't changed. |
557 | curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk; |
558 | } |
559 | } |
560 | // At this point, we know all the stale messages above this |
561 | // basement node have been applied, and any new messages will be |
562 | // fresh, so we don't need to look at stale messages for this |
563 | // basement node, unless it gets evicted (and this field becomes |
564 | // false when it's read in again). |
565 | curr_bn->stale_ancestor_messages_applied = true; |
566 | } |
567 | |
568 | void |
569 | toku_apply_ancestors_messages_to_node ( |
570 | FT_HANDLE t, |
571 | FTNODE node, |
572 | ANCESTORS ancestors, |
573 | const pivot_bounds &bounds, |
574 | bool* msgs_applied, |
575 | int child_to_read |
576 | ) |
577 | // Effect: |
578 | // Bring a leaf node up-to-date according to all the messages in the ancestors. |
579 | // If the leaf node is already up-to-date then do nothing. |
580 | // If the leaf node is not already up-to-date, then record the work done |
581 | // for that leaf in each ancestor. |
582 | // Requires: |
583 | // This is being called when pinning a leaf node for the query path. |
584 | // The entire root-to-leaf path is pinned and appears in the ancestors list. |
585 | { |
586 | VERIFY_NODE(t, node); |
587 | paranoid_invariant(node->height == 0); |
588 | |
589 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(t); |
590 | txn_manager_state txn_state_for_gc(txn_manager); |
591 | |
592 | TXNID oldest_referenced_xid_for_simple_gc = toku_ft_get_oldest_referenced_xid_estimate(t); |
593 | txn_gc_info gc_info(&txn_state_for_gc, |
594 | oldest_referenced_xid_for_simple_gc, |
595 | node->oldest_referenced_xid_known, |
596 | true); |
597 | if (!node->dirty && child_to_read >= 0) { |
598 | paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL); |
599 | apply_ancestors_messages_to_bn( |
600 | t, |
601 | node, |
602 | child_to_read, |
603 | ancestors, |
604 | bounds, |
605 | &gc_info, |
606 | msgs_applied |
607 | ); |
608 | } |
609 | else { |
        // we know we are a leaf node
611 | // An important invariant: |
612 | // We MUST bring every available basement node for a dirty node up to date. |
613 | // flushing on the cleaner thread depends on this. This invariant |
614 | // allows the cleaner thread to just pick an internal node and flush it |
615 | // as opposed to being forced to start from the root. |
616 | for (int i = 0; i < node->n_children; i++) { |
617 | if (BP_STATE(node, i) != PT_AVAIL) { continue; } |
618 | apply_ancestors_messages_to_bn( |
619 | t, |
620 | node, |
621 | i, |
622 | ancestors, |
623 | bounds, |
624 | &gc_info, |
625 | msgs_applied |
626 | ); |
627 | } |
628 | } |
629 | VERIFY_NODE(t, node); |
630 | } |
631 | |
632 | static bool bn_needs_ancestors_messages( |
633 | FT ft, |
634 | FTNODE node, |
635 | int childnum, |
636 | const pivot_bounds &bounds, |
637 | ANCESTORS ancestors, |
638 | MSN* max_msn_applied |
639 | ) |
640 | { |
641 | BASEMENTNODE bn = BLB(node, childnum); |
642 | const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum); |
643 | bool needs_ancestors_messages = false; |
644 | for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) { |
645 | if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) { |
646 | paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL); |
647 | NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum); |
648 | if (bnc->broadcast_list.size() > 0) { |
649 | needs_ancestors_messages = true; |
650 | goto cleanup; |
651 | } |
652 | if (!bn->stale_ancestor_messages_applied) { |
653 | uint32_t stale_lbi, stale_ube; |
654 | find_bounds_within_message_tree(ft->cmp, |
655 | bnc->stale_message_tree, |
656 | &bnc->msg_buffer, |
657 | curr_bounds, |
658 | &stale_lbi, |
659 | &stale_ube); |
660 | if (stale_lbi < stale_ube) { |
661 | needs_ancestors_messages = true; |
662 | goto cleanup; |
663 | } |
664 | } |
665 | uint32_t fresh_lbi, fresh_ube; |
666 | find_bounds_within_message_tree(ft->cmp, |
667 | bnc->fresh_message_tree, |
668 | &bnc->msg_buffer, |
669 | curr_bounds, |
670 | &fresh_lbi, |
671 | &fresh_ube); |
672 | if (fresh_lbi < fresh_ube) { |
673 | needs_ancestors_messages = true; |
674 | goto cleanup; |
675 | } |
676 | if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied->msn) { |
677 | max_msn_applied->msn = curr_ancestors->node->max_msn_applied_to_node_on_disk.msn; |
678 | } |
679 | } |
680 | } |
681 | cleanup: |
682 | return needs_ancestors_messages; |
683 | } |
684 | |
685 | bool toku_ft_leaf_needs_ancestors_messages( |
686 | FT ft, |
687 | FTNODE node, |
688 | ANCESTORS ancestors, |
689 | const pivot_bounds &bounds, |
690 | MSN *const max_msn_in_path, |
691 | int child_to_read |
692 | ) |
693 | // Effect: Determine whether there are messages in a node's ancestors |
694 | // which must be applied to it. These messages are in the correct |
695 | // keyrange for any available basement nodes, and are in nodes with the |
696 | // correct max_msn_applied_to_node_on_disk. |
697 | // Notes: |
698 | // This is an approximate query. |
699 | // Output: |
700 | // max_msn_in_path: max of "max_msn_applied_to_node_on_disk" over |
701 | // ancestors. This is used later to update basement nodes' |
702 | // max_msn_applied values in case we don't do the full algorithm. |
703 | // Returns: |
704 | // true if there may be some such messages |
705 | // false only if there are definitely no such messages |
706 | // Rationale: |
707 | // When we pin a node with a read lock, we want to quickly determine if |
708 | // we should exchange it for a write lock in preparation for applying |
709 | // messages. If there are no messages, we don't need the write lock. |
710 | { |
711 | paranoid_invariant(node->height == 0); |
712 | bool needs_ancestors_messages = false; |
713 | // child_to_read may be -1 in test cases |
714 | if (!node->dirty && child_to_read >= 0) { |
715 | paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL); |
716 | needs_ancestors_messages = bn_needs_ancestors_messages( |
717 | ft, |
718 | node, |
719 | child_to_read, |
720 | bounds, |
721 | ancestors, |
722 | max_msn_in_path |
723 | ); |
724 | } |
725 | else { |
726 | for (int i = 0; i < node->n_children; ++i) { |
727 | if (BP_STATE(node, i) != PT_AVAIL) { continue; } |
728 | needs_ancestors_messages = bn_needs_ancestors_messages( |
729 | ft, |
730 | node, |
731 | i, |
732 | bounds, |
733 | ancestors, |
734 | max_msn_in_path |
735 | ); |
736 | if (needs_ancestors_messages) { |
737 | goto cleanup; |
738 | } |
739 | } |
740 | } |
741 | cleanup: |
742 | return needs_ancestors_messages; |
743 | } |
744 | |
745 | void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read) { |
746 | invariant(node->height == 0); |
747 | if (!node->dirty && child_to_read >= 0) { |
748 | paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL); |
749 | BASEMENTNODE bn = BLB(node, child_to_read); |
750 | if (max_msn_applied.msn > bn->max_msn_applied.msn) { |
751 | // see comment below |
752 | (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn); |
753 | } |
754 | } |
755 | else { |
756 | for (int i = 0; i < node->n_children; ++i) { |
757 | if (BP_STATE(node, i) != PT_AVAIL) { continue; } |
758 | BASEMENTNODE bn = BLB(node, i); |
759 | if (max_msn_applied.msn > bn->max_msn_applied.msn) { |
760 | // This function runs in a shared access context, so to silence tools |
761 | // like DRD, we use a CAS and ignore the result. |
762 | // Any threads trying to update these basement nodes should be |
763 | // updating them to the same thing (since they all have a read lock on |
764 | // the same root-to-leaf path) so this is safe. |
765 | (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn); |
766 | } |
767 | } |
768 | } |
769 | } |
770 | |
struct copy_to_stale_extra {
    FT ft;
    NONLEAF_CHILDINFO bnc;
};
775 | |
int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra) __attribute__((nonnull(3)));
int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
778 | { |
779 | MSN msn; |
780 | DBT key; |
781 | extra->bnc->msg_buffer.get_message_key_msn(offset, &key, &msn); |
    struct toku_msg_buffer_key_msn_heaviside_extra heaviside_extra(extra->ft->cmp, &extra->bnc->msg_buffer, &key, msn);
783 | int r = extra->bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, heaviside_extra, nullptr); |
784 | invariant_zero(r); |
785 | return 0; |
786 | } |
787 | |
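// Move the offsets of all marked (already-applied) messages from the fresh
// message tree into the stale tree, then delete them from the fresh tree.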
788 | void toku_ft_bnc_move_messages_to_stale(FT ft, NONLEAF_CHILDINFO bnc) { |
    struct copy_to_stale_extra cts_extra = { .ft = ft, .bnc = bnc };
790 | int r = bnc->fresh_message_tree.iterate_over_marked<struct copy_to_stale_extra, copy_to_stale>(&cts_extra); |
791 | invariant_zero(r); |
792 | bnc->fresh_message_tree.delete_all_marked(); |
793 | } |
794 | |
795 | void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) { |
796 | invariant(node->height > 0); |
797 | for (int i = 0; i < node->n_children; ++i) { |
798 | if (BP_STATE(node, i) != PT_AVAIL) { |
799 | continue; |
800 | } |
801 | NONLEAF_CHILDINFO bnc = BNC(node, i); |
802 | // We can't delete things out of the fresh tree inside the above |
803 | // procedures because we're still looking at the fresh tree. Instead |
804 | // we have to move messages after we're done looking at it. |
805 | toku_ft_bnc_move_messages_to_stale(ft, bnc); |
806 | } |
807 | } |
808 | |
809 | // |
// Balance // Availability // Size
811 | |
812 | struct rebalance_array_info { |
813 | uint32_t offset; |
814 | LEAFENTRY *le_array; |
815 | uint32_t *key_sizes_array; |
816 | const void **key_ptr_array; |
817 | static int fn(const void* key, const uint32_t keylen, const LEAFENTRY &le, |
818 | const uint32_t idx, struct rebalance_array_info *const ai) { |
819 | ai->le_array[idx+ai->offset] = le; |
820 | ai->key_sizes_array[idx+ai->offset] = keylen; |
821 | ai->key_ptr_array[idx+ai->offset] = key; |
822 | return 0; |
823 | } |
824 | }; |
825 | |
826 | // There must still be at least one child |
827 | // Requires that all messages in buffers above have been applied. |
828 | // Because all messages above have been applied, setting msn of all new basements |
829 | // to max msn of existing basements is correct. (There cannot be any messages in |
830 | // buffers above that still need to be applied.) |
831 | void toku_ftnode_leaf_rebalance(FTNODE node, unsigned int basementnodesize) { |
832 | |
833 | assert(node->height == 0); |
834 | assert(node->dirty); |
835 | |
836 | uint32_t num_orig_basements = node->n_children; |
837 | // Count number of leaf entries in this leaf (num_le). |
838 | uint32_t num_le = 0; |
839 | for (uint32_t i = 0; i < num_orig_basements; i++) { |
840 | num_le += BLB_DATA(node, i)->num_klpairs(); |
841 | } |
842 | |
843 | uint32_t num_alloc = num_le ? num_le : 1; // simplify logic below by always having at least one entry per array |
844 | |
845 | // Create an array of OMTVALUE's that store all the pointers to all the data. |
846 | // Each element in leafpointers is a pointer to a leaf. |
847 | toku::scoped_malloc leafpointers_buf(sizeof(LEAFENTRY) * num_alloc); |
848 | LEAFENTRY *leafpointers = reinterpret_cast<LEAFENTRY *>(leafpointers_buf.get()); |
849 | leafpointers[0] = NULL; |
850 | |
851 | toku::scoped_malloc key_pointers_buf(sizeof(void *) * num_alloc); |
852 | const void **key_pointers = reinterpret_cast<const void **>(key_pointers_buf.get()); |
853 | key_pointers[0] = NULL; |
854 | |
855 | toku::scoped_malloc key_sizes_buf(sizeof(uint32_t) * num_alloc); |
856 | uint32_t *key_sizes = reinterpret_cast<uint32_t *>(key_sizes_buf.get()); |
857 | |
858 | // Capture pointers to old mempools' buffers (so they can be destroyed) |
859 | toku::scoped_malloc old_bns_buf(sizeof(BASEMENTNODE) * num_orig_basements); |
860 | BASEMENTNODE *old_bns = reinterpret_cast<BASEMENTNODE *>(old_bns_buf.get()); |
861 | old_bns[0] = NULL; |
862 | |
863 | uint32_t curr_le = 0; |
864 | for (uint32_t i = 0; i < num_orig_basements; i++) { |
865 | bn_data* bd = BLB_DATA(node, i); |
866 | struct rebalance_array_info ai {.offset = curr_le, .le_array = leafpointers, .key_sizes_array = key_sizes, .key_ptr_array = key_pointers }; |
867 | bd->iterate<rebalance_array_info, rebalance_array_info::fn>(&ai); |
868 | curr_le += bd->num_klpairs(); |
869 | } |
870 | |
871 | // Create an array that will store indexes of new pivots. |
872 | // Each element in new_pivots is the index of a pivot key. |
873 | // (Allocating num_le of them is overkill, but num_le is an upper bound.) |
874 | toku::scoped_malloc new_pivots_buf(sizeof(uint32_t) * num_alloc); |
875 | uint32_t *new_pivots = reinterpret_cast<uint32_t *>(new_pivots_buf.get()); |
876 | new_pivots[0] = 0; |
877 | |
878 | // Each element in le_sizes is the size of the leafentry pointed to by leafpointers. |
879 | toku::scoped_malloc le_sizes_buf(sizeof(size_t) * num_alloc); |
880 | size_t *le_sizes = reinterpret_cast<size_t *>(le_sizes_buf.get()); |
881 | le_sizes[0] = 0; |
882 | |
883 | // Create an array that will store the size of each basement. |
884 | // This is the sum of the leaf sizes of all the leaves in that basement. |
885 | // We don't know how many basements there will be, so we use num_le as the upper bound. |
886 | |
887 | // Sum of all le sizes in a single basement |
888 | toku::scoped_calloc bn_le_sizes_buf(sizeof(size_t) * num_alloc); |
889 | size_t *bn_le_sizes = reinterpret_cast<size_t *>(bn_le_sizes_buf.get()); |
890 | |
891 | // Sum of all key sizes in a single basement |
892 | toku::scoped_calloc bn_key_sizes_buf(sizeof(size_t) * num_alloc); |
893 | size_t *bn_key_sizes = reinterpret_cast<size_t *>(bn_key_sizes_buf.get()); |
894 | |
895 | // TODO 4050: All these arrays should be combined into a single array of some bn_info struct (pivot, msize, num_les). |
// Each entry is the number of leafentries in this basement. (Again, num_le is an overkill upper bound.)
897 | toku::scoped_malloc num_les_this_bn_buf(sizeof(uint32_t) * num_alloc); |
898 | uint32_t *num_les_this_bn = reinterpret_cast<uint32_t *>(num_les_this_bn_buf.get()); |
899 | num_les_this_bn[0] = 0; |
900 | |
901 | // Figure out the new pivots. |
902 | // We need the index of each pivot, and for each basement we need |
903 | // the number of leaves and the sum of the sizes of the leaves (memory requirement for basement). |
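    // Packing is greedy. For example (hypothetical sizes): with
    // basementnodesize = 128 and three entries each costing 60 bytes
    // (le size + key size + sizeof(uint32_t)), the third entry would push
    // the running total past 128, so new_pivots[0] = 1 and the third entry
    // starts the next basement node.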
904 | uint32_t curr_pivot = 0; |
905 | uint32_t num_le_in_curr_bn = 0; |
906 | uint32_t bn_size_so_far = 0; |
907 | for (uint32_t i = 0; i < num_le; i++) { |
908 | uint32_t curr_le_size = leafentry_disksize((LEAFENTRY) leafpointers[i]); |
909 | le_sizes[i] = curr_le_size; |
910 | if ((bn_size_so_far + curr_le_size + sizeof(uint32_t) + key_sizes[i] > basementnodesize) && (num_le_in_curr_bn != 0)) { |
911 | // cap off the current basement node to end with the element before i |
912 | new_pivots[curr_pivot] = i-1; |
913 | curr_pivot++; |
914 | num_le_in_curr_bn = 0; |
915 | bn_size_so_far = 0; |
916 | } |
917 | num_le_in_curr_bn++; |
918 | num_les_this_bn[curr_pivot] = num_le_in_curr_bn; |
919 | bn_le_sizes[curr_pivot] += curr_le_size; |
920 | bn_key_sizes[curr_pivot] += sizeof(uint32_t) + key_sizes[i]; // uint32_t le_offset |
921 | bn_size_so_far += curr_le_size + sizeof(uint32_t) + key_sizes[i]; |
922 | } |
923 | // curr_pivot is now the total number of pivot keys in the leaf node |
924 | int num_pivots = curr_pivot; |
925 | int num_children = num_pivots + 1; |
926 | |
927 | // now we need to fill in the new basement nodes and pivots |
928 | |
929 | // TODO: (Zardosht) this is an ugly thing right now |
930 | // Need to figure out how to properly deal with seqinsert. |
931 | // I am not happy with how this is being |
932 | // handled with basement nodes |
933 | uint32_t tmp_seqinsert = BLB_SEQINSERT(node, num_orig_basements - 1); |
934 | |
935 | // choose the max msn applied to any basement as the max msn applied to all new basements |
936 | MSN max_msn = ZERO_MSN; |
937 | for (uint32_t i = 0; i < num_orig_basements; i++) { |
938 | MSN curr_msn = BLB_MAX_MSN_APPLIED(node,i); |
939 | max_msn = (curr_msn.msn > max_msn.msn) ? curr_msn : max_msn; |
940 | } |
941 | // remove the basement node in the node, we've saved a copy |
942 | for (uint32_t i = 0; i < num_orig_basements; i++) { |
943 | // save a reference to the old basement nodes |
944 | // we will need them to ensure that the memory |
945 | // stays intact |
946 | old_bns[i] = toku_detach_bn(node, i); |
947 | } |
948 | // Now destroy the old basements, but do not destroy leaves |
949 | toku_destroy_ftnode_internals(node); |
950 | |
951 | // now reallocate pieces and start filling them in |
952 | invariant(num_children > 0); |
953 | |
954 | node->n_children = num_children; |
955 | XCALLOC_N(num_children, node->bp); // allocate pointers to basements (bp) |
956 | for (int i = 0; i < num_children; i++) { |
957 | set_BLB(node, i, toku_create_empty_bn()); // allocate empty basements and set bp pointers |
958 | } |
959 | |
960 | // now we start to fill in the data |
961 | |
962 | // first the pivots |
963 | toku::scoped_malloc pivotkeys_buf(num_pivots * sizeof(DBT)); |
964 | DBT *pivotkeys = reinterpret_cast<DBT *>(pivotkeys_buf.get()); |
965 | for (int i = 0; i < num_pivots; i++) { |
966 | uint32_t size = key_sizes[new_pivots[i]]; |
967 | const void *key = key_pointers[new_pivots[i]]; |
968 | toku_fill_dbt(&pivotkeys[i], key, size); |
969 | } |
970 | node->pivotkeys.create_from_dbts(pivotkeys, num_pivots); |
971 | |
972 | uint32_t baseindex_this_bn = 0; |
973 | // now the basement nodes |
974 | for (int i = 0; i < num_children; i++) { |
975 | // put back seqinsert |
976 | BLB_SEQINSERT(node, i) = tmp_seqinsert; |
977 | |
978 | // create start (inclusive) and end (exclusive) boundaries for data of basement node |
979 | uint32_t curr_start = (i==0) ? 0 : new_pivots[i-1]+1; // index of first leaf in basement |
980 | uint32_t curr_end = (i==num_pivots) ? num_le : new_pivots[i]+1; // index of first leaf in next basement |
981 | uint32_t num_in_bn = curr_end - curr_start; // number of leaves in this basement |
982 | |
983 | // create indexes for new basement |
984 | invariant(baseindex_this_bn == curr_start); |
985 | uint32_t num_les_to_copy = num_les_this_bn[i]; |
986 | invariant(num_les_to_copy == num_in_bn); |
987 | |
988 | bn_data* bd = BLB_DATA(node, i); |
989 | bd->set_contents_as_clone_of_sorted_array( |
990 | num_les_to_copy, |
991 | &key_pointers[baseindex_this_bn], |
992 | &key_sizes[baseindex_this_bn], |
993 | &leafpointers[baseindex_this_bn], |
994 | &le_sizes[baseindex_this_bn], |
995 | bn_key_sizes[i], // Total key sizes |
996 | bn_le_sizes[i] // total le sizes |
997 | ); |
998 | |
999 | BP_STATE(node,i) = PT_AVAIL; |
1000 | BP_TOUCH_CLOCK(node,i); |
1001 | BLB_MAX_MSN_APPLIED(node,i) = max_msn; |
1002 | baseindex_this_bn += num_les_to_copy; // set to index of next bn |
1003 | } |
1004 | node->max_msn_applied_to_node_on_disk = max_msn; |
1005 | |
1006 | // destroy buffers of old mempools |
1007 | for (uint32_t i = 0; i < num_orig_basements; i++) { |
1008 | destroy_basement_node(old_bns[i]); |
1009 | } |
1010 | } |
1011 | |
1012 | bool toku_ftnode_fully_in_memory(FTNODE node) { |
1013 | for (int i = 0; i < node->n_children; i++) { |
1014 | if (BP_STATE(node,i) != PT_AVAIL) { |
1015 | return false; |
1016 | } |
1017 | } |
1018 | return true; |
1019 | } |
1020 | |
1021 | void toku_ftnode_assert_fully_in_memory(FTNODE UU(node)) { |
1022 | paranoid_invariant(toku_ftnode_fully_in_memory(node)); |
1023 | } |
1024 | |
1025 | uint32_t toku_ftnode_leaf_num_entries(FTNODE node) { |
1026 | toku_ftnode_assert_fully_in_memory(node); |
1027 | uint32_t num_entries = 0; |
1028 | for (int i = 0; i < node->n_children; i++) { |
1029 | num_entries += BLB_DATA(node, i)->num_klpairs(); |
1030 | } |
1031 | return num_entries; |
1032 | } |
1033 | |
1034 | enum reactivity toku_ftnode_get_leaf_reactivity(FTNODE node, uint32_t nodesize) { |
1035 | enum reactivity re = RE_STABLE; |
1036 | toku_ftnode_assert_fully_in_memory(node); |
1037 | paranoid_invariant(node->height==0); |
1038 | unsigned int size = toku_serialize_ftnode_size(node); |
1039 | if (size > nodesize && toku_ftnode_leaf_num_entries(node) > 1) { |
1040 | re = RE_FISSIBLE; |
1041 | } else if ((size*4) < nodesize && !BLB_SEQINSERT(node, node->n_children-1)) { |
1042 | re = RE_FUSIBLE; |
1043 | } |
1044 | return re; |
1045 | } |
1046 | |
1047 | enum reactivity toku_ftnode_get_nonleaf_reactivity(FTNODE node, unsigned int fanout) { |
1048 | paranoid_invariant(node->height > 0); |
1049 | int n_children = node->n_children; |
1050 | if (n_children > (int) fanout) { |
1051 | return RE_FISSIBLE; |
1052 | } |
1053 | if (n_children * 4 < (int) fanout) { |
1054 | return RE_FUSIBLE; |
1055 | } |
1056 | return RE_STABLE; |
1057 | } |
1058 | |
1059 | enum reactivity toku_ftnode_get_reactivity(FT ft, FTNODE node) { |
1060 | toku_ftnode_assert_fully_in_memory(node); |
1061 | if (node->height == 0) { |
1062 | return toku_ftnode_get_leaf_reactivity(node, ft->h->nodesize); |
1063 | } else { |
1064 | return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout); |
1065 | } |
1066 | } |
1067 | |
1068 | unsigned int toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc) { |
1069 | return bnc->msg_buffer.buffer_size_in_use(); |
1070 | } |
1071 | |
1072 | // Return true if the size of the buffers plus the amount of work done is large enough. |
// Return false if there is nothing to be flushed (the buffers are empty).
1074 | bool toku_ftnode_nonleaf_is_gorged(FTNODE node, uint32_t nodesize) { |
1075 | uint64_t size = toku_serialize_ftnode_size(node); |
1076 | |
1077 | bool buffers_are_empty = true; |
1078 | toku_ftnode_assert_fully_in_memory(node); |
1079 | // |
1080 | // the nonleaf node is gorged if the following holds true: |
1081 | // - the buffers are non-empty |
1082 | // - the total workdone by the buffers PLUS the size of the buffers |
1083 | // is greater than nodesize (which as of Maxwell should be |
1084 | // 4MB) |
1085 | // |
1086 | paranoid_invariant(node->height > 0); |
1087 | for (int child = 0; child < node->n_children; ++child) { |
1088 | size += BP_WORKDONE(node, child); |
1089 | } |
1090 | for (int child = 0; child < node->n_children; ++child) { |
1091 | if (toku_bnc_nbytesinbuf(BNC(node, child)) > 0) { |
1092 | buffers_are_empty = false; |
1093 | break; |
1094 | } |
1095 | } |
1096 | return ((size > nodesize) |
1097 | && |
1098 | (!buffers_are_empty)); |
1099 | } |
1100 | |
1101 | int toku_bnc_n_entries(NONLEAF_CHILDINFO bnc) { |
1102 | return bnc->msg_buffer.num_entries(); |
1103 | } |
1104 | |
1105 | // how much memory does this child buffer consume? |
1106 | long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc) { |
1107 | return (sizeof(*bnc) + |
1108 | bnc->msg_buffer.memory_footprint() + |
1109 | bnc->fresh_message_tree.memory_size() + |
1110 | bnc->stale_message_tree.memory_size() + |
1111 | bnc->broadcast_list.memory_size()); |
1112 | } |
1113 | |
1114 | // how much memory in this child buffer holds useful data? |
1115 | // originally created solely for use by test program(s). |
1116 | long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc) { |
1117 | return (sizeof(*bnc) + |
1118 | bnc->msg_buffer.memory_size_in_use() + |
1119 | bnc->fresh_message_tree.memory_size() + |
1120 | bnc->stale_message_tree.memory_size() + |
1121 | bnc->broadcast_list.memory_size()); |
1122 | } |
1123 | |
1124 | // |
1125 | // Garbage collection |
1126 | // Message injection |
1127 | // Message application |
1128 | // |
1129 | |
1130 | // Used only by test programs: append a child node to a parent node |
1131 | void toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey) { |
1132 | int childnum = node->n_children; |
1133 | node->n_children++; |
1134 | REALLOC_N(node->n_children, node->bp); |
1135 | BP_BLOCKNUM(node,childnum) = child->blocknum; |
1136 | BP_STATE(node,childnum) = PT_AVAIL; |
1137 | BP_WORKDONE(node, childnum) = 0; |
1138 | set_BNC(node, childnum, toku_create_empty_nl()); |
1139 | if (pivotkey) { |
1140 | invariant(childnum > 0); |
1141 | node->pivotkeys.insert_at(pivotkey, childnum - 1); |
1142 | } |
1143 | node->dirty = 1; |
1144 | } |
1145 | |
1146 | void |
1147 | toku_ft_bn_apply_msg_once ( |
1148 | BASEMENTNODE bn, |
1149 | const ft_msg &msg, |
1150 | uint32_t idx, |
1151 | uint32_t le_keylen, |
1152 | LEAFENTRY le, |
1153 | txn_gc_info *gc_info, |
1154 | uint64_t *workdone, |
1155 | STAT64INFO stats_to_update, |
1156 | int64_t *logical_rows_delta |
1157 | ) |
1158 | // Effect: Apply msg to leafentry (msn is ignored) |
1159 | // Calculate work done by message on leafentry and add it to caller's workdone counter. |
1160 | // idx is the location where it goes |
1161 | // le is old leafentry |
1162 | { |
1163 | size_t newsize=0, oldsize=0, workdone_this_le=0; |
1164 | LEAFENTRY new_le=0; |
1165 | // how many bytes of user data (not including overhead) were added or |
1166 | // deleted from this row |
1167 | int64_t numbytes_delta = 0; |
1168 | // will be +1 or -1 or 0 (if row was added or deleted or not) |
1169 | int64_t numrows_delta = 0; |
1170 | // will be +1, -1 or 0 if a message that was accounted for logically has |
1171 | // changed in meaning such as an insert changed to an update or a delete |
1172 | // changed to a noop |
1173 | int64_t logical_rows_delta_le = 0; |
1174 | uint32_t key_storage_size = msg.kdbt()->size + sizeof(uint32_t); |
1175 | if (le) { |
1176 | oldsize = leafentry_memsize(le) + key_storage_size; |
1177 | } |
1178 | |
1179 | // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt() |
1180 | // to allocate more space. That means le is guaranteed to not cause a |
1181 | // sigsegv but it may point to a mempool that is no longer in use. |
1182 | // We'll have to release the old mempool later. |
1183 | logical_rows_delta_le = toku_le_apply_msg( |
1184 | msg, |
1185 | le, |
1186 | &bn->data_buffer, |
1187 | idx, |
1188 | le_keylen, |
1189 | gc_info, |
1190 | &new_le, |
1191 | &numbytes_delta); |
1192 | |
1193 | // at this point, we cannot trust cmd->u.id.key to be valid. |
1194 | // The dmt may have realloced its mempool and freed the one containing key. |
1195 | |
    newsize = new_le ? (leafentry_memsize(new_le) + key_storage_size) : 0;
1197 | if (le && new_le) { |
1198 | workdone_this_le = (oldsize > newsize ? oldsize : newsize); // work done is max of le size before and after message application |
1199 | |
1200 | } else { // we did not just replace a row, so ... |
1201 | if (le) { |
1202 | // ... we just deleted a row ... |
1203 | workdone_this_le = oldsize; |
1204 | numrows_delta = -1; |
1205 | } |
1206 | if (new_le) { |
1207 | // ... or we just added a row |
1208 | workdone_this_le = newsize; |
1209 | numrows_delta = 1; |
1210 | } |
1211 | } |
1212 | if (FT_LIKELY(workdone != NULL)) { // test programs may call with NULL |
1213 | *workdone += workdone_this_le; |
1214 | } |
1215 | |
1216 | if (FT_LIKELY(logical_rows_delta != NULL)) { |
1217 | *logical_rows_delta += logical_rows_delta_le; |
1218 | } |
1219 | // now update stat64 statistics |
1220 | bn->stat64_delta.numrows += numrows_delta; |
1221 | bn->stat64_delta.numbytes += numbytes_delta; |
1222 | // the only reason stats_to_update may be null is for tests |
1223 | if (FT_LIKELY(stats_to_update != NULL)) { |
1224 | stats_to_update->numrows += numrows_delta; |
1225 | stats_to_update->numbytes += numbytes_delta; |
1226 | } |
1227 | } |
1228 | |
1229 | static const uint32_t setval_tag = 0xee0ccb99; // this was gotten by doing "cat /dev/random|head -c4|od -x" to get a random number. We want to make sure that the user actually passes us the setval_extra_s that we passed in. |
struct setval_extra_s {
    uint32_t tag;
    bool did_set_val;
    // any error code that setval_fun wants to return goes here.
    int setval_r;
    // need arguments for toku_ft_bn_apply_msg_once
    BASEMENTNODE bn;
    // captured from original message, not currently used
    MSN msn;
    XIDS xids;
    const DBT *key;
    uint32_t idx;
    uint32_t le_keylen;
    LEAFENTRY le;
    txn_gc_info *gc_info;
    uint64_t *workdone;  // set by toku_ft_bn_apply_msg_once()
    STAT64INFO stats_to_update;
    int64_t *logical_rows_delta;
};
1249 | |
1250 | /* |
1251 | * If new_val == NULL, we send a delete message instead of an insert. |
1252 | * This happens here instead of in do_delete() for consistency. |
1253 | * setval_fun() is called from handlerton, passing in svextra_v |
1254 | * from setval_extra_s input arg to ft->update_fun(). |
1255 | */ |
static void setval_fun (const DBT *new_val, void *svextra_v) {
    struct setval_extra_s *CAST_FROM_VOIDP(svextra, svextra_v);
1258 | paranoid_invariant(svextra->tag==setval_tag); |
1259 | paranoid_invariant(!svextra->did_set_val); |
1260 | svextra->did_set_val = true; |
1261 | |
1262 | { |
1263 | // can't leave scope until toku_ft_bn_apply_msg_once if |
1264 | // this is a delete |
1265 | DBT val; |
1266 | ft_msg msg( |
1267 | svextra->key, |
1268 | new_val ? new_val : toku_init_dbt(&val), |
1269 | new_val ? FT_INSERT : FT_DELETE_ANY, |
1270 | svextra->msn, |
1271 | svextra->xids); |
1272 | toku_ft_bn_apply_msg_once( |
1273 | svextra->bn, |
1274 | msg, |
1275 | svextra->idx, |
1276 | svextra->le_keylen, |
1277 | svextra->le, |
1278 | svextra->gc_info, |
1279 | svextra->workdone, |
1280 | svextra->stats_to_update, |
1281 | svextra->logical_rows_delta); |
1282 | svextra->setval_r = 0; |
1283 | } |
1284 | } |
1285 | |
1286 | // We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls |
1287 | // do_update()), so capturing the msn in the setval_extra_s is not strictly |
1288 | // required. The alternative would be to put a dummy msn in the messages |
1289 | // created by setval_fun(), but preserving the original msn seems cleaner and |
1290 | // it preserves accountability at a lower layer. |
1291 | static int do_update( |
1292 | ft_update_func update_fun, |
1293 | const DESCRIPTOR_S* desc, |
1294 | BASEMENTNODE bn, |
1295 | const ft_msg &msg, |
1296 | uint32_t idx, |
1297 | LEAFENTRY le, |
1298 | void* keydata, |
1299 | uint32_t keylen, |
1300 | txn_gc_info* gc_info, |
1301 | uint64_t* workdone, |
1302 | STAT64INFO stats_to_update, |
1303 | int64_t* logical_rows_delta) { |
1304 | |
1305 | LEAFENTRY le_for_update; |
1306 | DBT key; |
1307 | const DBT *keyp; |
    const DBT *update_function_extra;
1309 | DBT vdbt; |
1310 | const DBT *vdbtp; |
1311 | |
1312 | // the location of data depends whether this is a regular or |
1313 | // broadcast update |
1314 | if (msg.type() == FT_UPDATE) { |
1315 | // key is passed in with command (should be same as from le) |
1316 | // update function extra is passed in with command |
1317 | keyp = msg.kdbt(); |
1318 | update_function_extra = msg.vdbt(); |
1319 | } else { |
1320 | invariant(msg.type() == FT_UPDATE_BROADCAST_ALL); |
1321 | // key is not passed in with broadcast, it comes from le |
1322 | // update function extra is passed in with command |
1323 | paranoid_invariant(le); // for broadcast updates, we just hit all leafentries |
1324 | // so this cannot be null |
1325 | paranoid_invariant(keydata); |
1326 | paranoid_invariant(keylen); |
1327 | paranoid_invariant(msg.kdbt()->size == 0); |
1328 | keyp = toku_fill_dbt(&key, keydata, keylen); |
1329 | update_function_extra = msg.vdbt(); |
1330 | } |
1331 | toku_ft_status_note_update(msg.type() == FT_UPDATE_BROADCAST_ALL); |
1332 | |
1333 | if (le && !le_latest_is_del(le)) { |
1334 | // if the latest val exists, use it, and we'll use the leafentry later |
1335 | uint32_t vallen; |
1336 | void *valp = le_latest_val_and_len(le, &vallen); |
1337 | vdbtp = toku_fill_dbt(&vdbt, valp, vallen); |
1338 | } else { |
1339 | // otherwise, the val and leafentry are both going to be null |
1340 | vdbtp = NULL; |
1341 | } |
1342 | le_for_update = le; |
1343 | |
    struct setval_extra_s setval_extra = {
1345 | setval_tag, |
1346 | false, |
1347 | 0, |
1348 | bn, |
1349 | msg.msn(), |
1350 | msg.xids(), |
1351 | keyp, |
1352 | idx, |
1353 | keylen, |
1354 | le_for_update, |
1355 | gc_info, |
1356 | workdone, |
1357 | stats_to_update, |
1358 | logical_rows_delta |
1359 | }; |
1360 | // call handlerton's ft->update_fun(), which passes setval_extra |
1361 | // to setval_fun() |
1362 | FAKE_DB(db, desc); |
1363 | int r = update_fun( |
1364 | &db, |
1365 | keyp, |
1366 | vdbtp, |
1367 | update_function_extra, |
1368 | setval_fun, |
1369 | &setval_extra); |
1370 | |
1371 | if (r == 0) { r = setval_extra.setval_r; } |
1372 | return r; |
1373 | } |
1374 | |
1375 | // Should be renamed as something like "apply_msg_to_basement()." |
1376 | void toku_ft_bn_apply_msg( |
1377 | const toku::comparator& cmp, |
1378 | ft_update_func update_fun, |
1379 | BASEMENTNODE bn, |
1380 | const ft_msg& msg, |
1381 | txn_gc_info* gc_info, |
1382 | uint64_t* workdone, |
1383 | STAT64INFO stats_to_update, |
1384 | int64_t* logical_rows_delta) { |
1385 | // Effect: |
1386 | // Put a msg into a leaf. |
1387 | // Calculate work done by message on leafnode and add it to caller's |
1388 | // workdone counter. |
1389 | // The leaf could end up "too big" or "too small". The caller must fix that up. |
1390 | LEAFENTRY storeddata; |
1391 | void* key = NULL; |
1392 | uint32_t keylen = 0; |
1393 | |
1394 | uint32_t num_klpairs; |
1395 | int r; |
1396 | struct toku_msg_leafval_heaviside_extra be(cmp, msg.kdbt()); |
1397 | |
1398 | unsigned int doing_seqinsert = bn->seqinsert; |
1399 | bn->seqinsert = 0; |
1400 | |
1401 | switch (msg.type()) { |
1402 | case FT_INSERT_NO_OVERWRITE: |
1403 | case FT_INSERT: { |
1404 | uint32_t idx; |
1405 | if (doing_seqinsert) { |
1406 | idx = bn->data_buffer.num_klpairs(); |
1407 | DBT kdbt; |
1408 | r = bn->data_buffer.fetch_key_and_len(idx-1, &kdbt.size, &kdbt.data); |
1409 | if (r != 0) goto fz; |
1410 | int c = toku_msg_leafval_heaviside(kdbt, be); |
1411 | if (c >= 0) goto fz; |
1412 | r = DB_NOTFOUND; |
1413 | } else { |
1414 | fz: |
1415 | r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>( |
1416 | be, |
1417 | &storeddata, |
1418 | &key, |
1419 | &keylen, |
1420 | &idx |
1421 | ); |
1422 | } |
1423 | if (r==DB_NOTFOUND) { |
1424 | storeddata = 0; |
1425 | } else { |
1426 | assert_zero(r); |
1427 | } |
1428 | toku_ft_bn_apply_msg_once( |
1429 | bn, |
1430 | msg, |
1431 | idx, |
1432 | keylen, |
1433 | storeddata, |
1434 | gc_info, |
1435 | workdone, |
1436 | stats_to_update, |
1437 | logical_rows_delta); |
1438 | |
1439 | // if the insertion point is within a window of the right edge of |
1440 | // the leaf then it is sequential |
1441 | // window = min(32, number of leaf entries/16) |
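        // Worked example (hypothetical sizes): with s = 100 klpairs after the
        // insert, w = min(32, 100/16) = 6, so an insert landing at idx >= 94
        // counts as sequential.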
1442 | { |
1443 | uint32_t s = bn->data_buffer.num_klpairs(); |
1444 | uint32_t w = s / 16; |
1445 | if (w == 0) w = 1; |
1446 | if (w > 32) w = 32; |
1447 | |
1448 | // within the window? |
1449 | if (s - idx <= w) |
1450 | bn->seqinsert = doing_seqinsert + 1; |
1451 | } |
1452 | break; |
1453 | } |
1454 | case FT_DELETE_ANY: |
1455 | case FT_ABORT_ANY: |
1456 | case FT_COMMIT_ANY: { |
1457 | uint32_t idx; |
1458 | // Apply to all the matches |
1459 | |
1460 | r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>( |
1461 | be, |
1462 | &storeddata, |
1463 | &key, |
1464 | &keylen, |
1465 | &idx); |
1466 | if (r == DB_NOTFOUND) break; |
1467 | assert_zero(r); |
1468 | toku_ft_bn_apply_msg_once( |
1469 | bn, |
1470 | msg, |
1471 | idx, |
1472 | keylen, |
1473 | storeddata, |
1474 | gc_info, |
1475 | workdone, |
1476 | stats_to_update, |
1477 | logical_rows_delta); |
1478 | break; |
1479 | } |
1480 | case FT_OPTIMIZE_FOR_UPGRADE: |
1481 | // fall through so that optimize_for_upgrade performs rest of the optimize logic |
1482 | case FT_COMMIT_BROADCAST_ALL: |
1483 | case FT_OPTIMIZE: |
1484 | // Apply to all leafentries |
1485 | num_klpairs = bn->data_buffer.num_klpairs(); |
1486 | for (uint32_t idx = 0; idx < num_klpairs; ) { |
1487 | void* curr_keyp = NULL; |
1488 | uint32_t curr_keylen = 0; |
1489 | r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp); |
1490 | assert_zero(r); |
1491 | int deleted = 0; |
1492 | if (!le_is_clean(storeddata)) { //If already clean, nothing to do. |
1493 | // message application code needs a key in order to determine |
1494 | // how much work was done by this message. since this is a |
1495 | // broadcast message, we have to create a new message whose |
1496 | // key is the current le's key. |
1497 | DBT curr_keydbt; |
1498 | ft_msg curr_msg( |
1499 | toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen), |
1500 | msg.vdbt(), |
1501 | msg.type(), |
1502 | msg.msn(), |
1503 | msg.xids()); |
1504 | toku_ft_bn_apply_msg_once( |
1505 | bn, |
1506 | curr_msg, |
1507 | idx, |
1508 | curr_keylen, |
1509 | storeddata, |
1510 | gc_info, |
1511 | workdone, |
1512 | stats_to_update, |
1513 | logical_rows_delta); |
1514 | // at this point, we cannot trust msg.kdbt to be valid. |
1515 | uint32_t new_dmt_size = bn->data_buffer.num_klpairs(); |
1516 | if (new_dmt_size != num_klpairs) { |
1517 | paranoid_invariant(new_dmt_size + 1 == num_klpairs); |
1518 | //Item was deleted. |
1519 | deleted = 1; |
1520 | } |
1521 | } |
1522 | if (deleted) |
1523 | num_klpairs--; |
1524 | else |
1525 | idx++; |
1526 | } |
1527 | paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs); |
1528 | |
1529 | break; |
1530 | case FT_COMMIT_BROADCAST_TXN: |
1531 | case FT_ABORT_BROADCAST_TXN: |
1532 | // Apply to all leafentries if txn is represented |
1533 | num_klpairs = bn->data_buffer.num_klpairs(); |
1534 | for (uint32_t idx = 0; idx < num_klpairs; ) { |
1535 | void* curr_keyp = NULL; |
1536 | uint32_t curr_keylen = 0; |
1537 | r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp); |
1538 | assert_zero(r); |
1539 | int deleted = 0; |
1540 | if (le_has_xids(storeddata, msg.xids())) { |
1541 | // message application code needs a key in order to determine |
1542 | // how much work was done by this message. since this is a |
1543 | // broadcast message, we have to create a new message whose key |
1544 | // is the current le's key. |
1545 | DBT curr_keydbt; |
1546 | ft_msg curr_msg( |
1547 | toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen), |
1548 | msg.vdbt(), |
1549 | msg.type(), |
1550 | msg.msn(), |
1551 | msg.xids()); |
1552 | toku_ft_bn_apply_msg_once( |
1553 | bn, |
1554 | curr_msg, |
1555 | idx, |
1556 | curr_keylen, |
1557 | storeddata, |
1558 | gc_info, |
1559 | workdone, |
1560 | stats_to_update, |
1561 | logical_rows_delta); |
1562 | uint32_t new_dmt_size = bn->data_buffer.num_klpairs(); |
1563 | if (new_dmt_size != num_klpairs) { |
1564 | paranoid_invariant(new_dmt_size + 1 == num_klpairs); |
1565 | //Item was deleted. |
1566 | deleted = 1; |
1567 | } |
1568 | } |
1569 | if (deleted) |
1570 | num_klpairs--; |
1571 | else |
1572 | idx++; |
1573 | } |
1574 | paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs); |
1575 | |
1576 | break; |
1577 | case FT_UPDATE: { |
1578 | uint32_t idx; |
1579 | r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>( |
1580 | be, |
1581 | &storeddata, |
1582 | &key, |
1583 | &keylen, |
1584 | &idx |
1585 | ); |
1586 | if (r==DB_NOTFOUND) { |
1587 | { |
1588 | //Point to msg's copy of the key so we don't worry about le being freed |
1589 | //TODO: 46 MAYBE Get rid of this when le_apply message memory is better handled |
1590 | key = msg.kdbt()->data; |
1591 | keylen = msg.kdbt()->size; |
1592 | } |
1593 | r = do_update( |
1594 | update_fun, |
1595 | cmp.get_descriptor(), |
1596 | bn, |
1597 | msg, |
1598 | idx, |
1599 | NULL, |
1600 | NULL, |
1601 | 0, |
1602 | gc_info, |
1603 | workdone, |
1604 | stats_to_update, |
1605 | logical_rows_delta); |
1606 | } else if (r==0) { |
1607 | r = do_update( |
1608 | update_fun, |
1609 | cmp.get_descriptor(), |
1610 | bn, |
1611 | msg, |
1612 | idx, |
1613 | storeddata, |
1614 | key, |
1615 | keylen, |
1616 | gc_info, |
1617 | workdone, |
1618 | stats_to_update, |
1619 | logical_rows_delta); |
1620 | } // otherwise, a worse error, just return it |
1621 | break; |
1622 | } |
1623 | case FT_UPDATE_BROADCAST_ALL: { |
1624 | // apply to all leafentries. |
1625 | uint32_t idx = 0; |
1626 | uint32_t num_leafentries_before; |
        // This throwaway delta keeps the apply from changing the logical row
        // count: each leaf entry visited would otherwise contribute a
        // negative delta and drive the ft header's row count toward 0. A
        // broadcast update does not change the number of rows, so we
        // accumulate into this temporary and discard it.
1632 | int64_t temp_logical_rows_delta = 0; |
1633 | while (idx < (num_leafentries_before = bn->data_buffer.num_klpairs())) { |
1634 | void* curr_key = nullptr; |
1635 | uint32_t curr_keylen = 0; |
1636 | r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_key); |
1637 | assert_zero(r); |
1638 | |
1639 | //TODO: 46 replace this with something better than cloning key |
1640 | // TODO: (Zardosht) This may be unnecessary now, due to how the key |
1641 | // is handled in the bndata. Investigate and determine |
            char clone_mem[curr_keylen]; // VLA: freed each iteration; alloca would persist to function exit and could overflow the stack
1643 | memcpy((void*)clone_mem, curr_key, curr_keylen); |
1644 | curr_key = (void*)clone_mem; |
1645 | |
1648 | r = do_update( |
1649 | update_fun, |
1650 | cmp.get_descriptor(), |
1651 | bn, |
1652 | msg, |
1653 | idx, |
1654 | storeddata, |
1655 | curr_key, |
1656 | curr_keylen, |
1657 | gc_info, |
1658 | workdone, |
1659 | stats_to_update, |
1660 | &temp_logical_rows_delta); |
1661 | assert_zero(r); |
1662 | |
1663 | if (num_leafentries_before == bn->data_buffer.num_klpairs()) { |
1664 | // we didn't delete something, so increment the index. |
1665 | idx++; |
1666 | } |
1667 | } |
1668 | break; |
1669 | } |
1670 | case FT_NONE: break; // don't do anything |
1671 | } |
1672 | |
1673 | return; |
1674 | } |
1675 | |
1676 | static inline int |
1677 | key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn, const toku::comparator &cmp) { |
1678 | int r = cmp(a, b); |
1679 | if (r == 0) { |
1680 | if (amsn.msn > bmsn.msn) { |
1681 | r = +1; |
1682 | } else if (amsn.msn < bmsn.msn) { |
1683 | r = -1; |
1684 | } else { |
1685 | r = 0; |
1686 | } |
1687 | } |
1688 | return r; |
1689 | } |
1690 | |
int toku_msg_buffer_key_msn_heaviside(const int32_t &offset, const struct toku_msg_buffer_key_msn_heaviside_extra &extra) {
1692 | MSN query_msn; |
1693 | DBT query_key; |
1694 | extra.msg_buffer->get_message_key_msn(offset, &query_key, &query_msn); |
1695 | return key_msn_cmp(&query_key, extra.key, query_msn, extra.msn, extra.cmp); |
1696 | } |
1697 | |
int toku_msg_buffer_key_msn_cmp(const struct toku_msg_buffer_key_msn_cmp_extra &extra, const int32_t &ao, const int32_t &bo) {
1699 | MSN amsn, bmsn; |
1700 | DBT akey, bkey; |
1701 | extra.msg_buffer->get_message_key_msn(ao, &akey, &amsn); |
1702 | extra.msg_buffer->get_message_key_msn(bo, &bkey, &bmsn); |
1703 | return key_msn_cmp(&akey, &bkey, amsn, bmsn, extra.cmp); |
1704 | } |
1705 | |
1706 | // Effect: Enqueue the message represented by the parameters into the |
1707 | // bnc's buffer, and put it in either the fresh or stale message tree, |
1708 | // or the broadcast list. |
1709 | static void bnc_insert_msg(NONLEAF_CHILDINFO bnc, const ft_msg &msg, bool is_fresh, const toku::comparator &cmp) { |
1710 | int r = 0; |
1711 | int32_t offset; |
1712 | bnc->msg_buffer.enqueue(msg, is_fresh, &offset); |
1713 | enum ft_msg_type type = msg.type(); |
1714 | if (ft_msg_type_applies_once(type)) { |
1715 | DBT key; |
1716 | toku_fill_dbt(&key, msg.kdbt()->data, msg.kdbt()->size); |
        struct toku_msg_buffer_key_msn_heaviside_extra extra(cmp, &bnc->msg_buffer, &key, msg.msn());
1718 | if (is_fresh) { |
1719 | r = bnc->fresh_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr); |
1720 | assert_zero(r); |
1721 | } else { |
1722 | r = bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr); |
1723 | assert_zero(r); |
1724 | } |
1725 | } else { |
1726 | invariant(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)); |
1727 | const uint32_t idx = bnc->broadcast_list.size(); |
1728 | r = bnc->broadcast_list.insert_at(offset, idx); |
1729 | assert_zero(r); |
1730 | } |
1731 | } |
1732 | |
1733 | // This is only exported for tests. |
1734 | void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, uint32_t keylen, const void *data, uint32_t datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const toku::comparator &cmp) |
1735 | { |
1736 | DBT k, v; |
1737 | ft_msg msg(toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), type, msn, xids); |
1738 | bnc_insert_msg(bnc, msg, is_fresh, cmp); |
1739 | } |
1740 | |
1741 | // append a msg to a nonleaf node's child buffer |
1742 | static void ft_append_msg_to_child_buffer(const toku::comparator &cmp, FTNODE node, |
1743 | int childnum, const ft_msg &msg, bool is_fresh) { |
1744 | paranoid_invariant(BP_STATE(node,childnum) == PT_AVAIL); |
1745 | bnc_insert_msg(BNC(node, childnum), msg, is_fresh, cmp); |
1746 | node->dirty = 1; |
1747 | } |
1748 | |
1749 | // This is only exported for tests. |
1750 | void toku_ft_append_to_child_buffer(const toku::comparator &cmp, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) { |
1751 | ft_msg msg(key, val, type, msn, xids); |
1752 | ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh); |
1753 | } |
1754 | |
1755 | static void ft_nonleaf_msg_once_to_child(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[]) |
// Previously we had passive-aggressive promotion, but that caused a lot of I/O at checkpoint time. So now we just put the message in the buffer here.
1757 | // Also we don't worry about the node getting overfull here. It's the caller's problem. |
1758 | { |
1759 | unsigned int childnum = (target_childnum >= 0 |
1760 | ? target_childnum |
1761 | : toku_ftnode_which_child(node, msg.kdbt(), cmp)); |
1762 | ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh); |
1763 | NONLEAF_CHILDINFO bnc = BNC(node, childnum); |
1764 | bnc->flow[0] += flow_deltas[0]; |
1765 | bnc->flow[1] += flow_deltas[1]; |
1766 | } |
1767 | |
1768 | // TODO: Remove me, I'm boring. |
1769 | static int ft_compare_pivot(const toku::comparator &cmp, const DBT *key, const DBT *pivot) { |
1770 | return cmp(key, pivot); |
1771 | } |
1772 | |
1773 | /* Find the leftmost child that may contain the key. |
1774 | * If the key exists it will be in the child whose number |
1775 | * is the return value of this function. |
1776 | */ |
1777 | int toku_ftnode_which_child(FTNODE node, const DBT *k, const toku::comparator &cmp) { |
1778 | // a funny case of no pivots |
1779 | if (node->n_children <= 1) return 0; |
1780 | |
1781 | DBT pivot; |
1782 | |
1783 | // check the last key to optimize seq insertions |
1784 | int n = node->n_children-1; |
1785 | int c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(n - 1, &pivot)); |
1786 | if (c > 0) return n; |
1787 | |
1788 | // binary search the pivots |
1789 | int lo = 0; |
1790 | int hi = n-1; // skip the last one, we checked it above |
1791 | int mi; |
1792 | while (lo < hi) { |
1793 | mi = (lo + hi) / 2; |
1794 | c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot)); |
1795 | if (c > 0) { |
1796 | lo = mi+1; |
1797 | continue; |
1798 | } |
1799 | if (c < 0) { |
1800 | hi = mi; |
1801 | continue; |
1802 | } |
1803 | return mi; |
1804 | } |
1805 | return lo; |
1806 | } |
1807 | |
1808 | // Used for HOT. |
1809 | int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator &cmp) { |
1810 | DBT pivot; |
1811 | int low = 0; |
1812 | int hi = node->n_children - 1; |
1813 | int mi; |
1814 | while (low < hi) { |
1815 | mi = (low + hi) / 2; |
1816 | int r = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot)); |
1817 | if (r > 0) { |
1818 | low = mi + 1; |
1819 | } else if (r < 0) { |
1820 | hi = mi; |
1821 | } else { |
1822 | // if they were exactly equal, then we want the sub-tree under |
1823 | // the next pivot. |
1824 | return mi + 1; |
1825 | } |
1826 | } |
1827 | invariant(low == hi); |
1828 | return low; |
1829 | } |
1830 | |
1831 | void toku_ftnode_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) { |
1832 | FTNODE CAST_FROM_VOIDP(node, value_data); |
1833 | node->ct_pair = p; |
1834 | } |
1835 | |
1836 | static void |
1837 | ft_nonleaf_msg_all(const toku::comparator &cmp, FTNODE node, const ft_msg &msg, bool is_fresh, size_t flow_deltas[]) |
1838 | // Effect: Put the message into a nonleaf node. We put it into all children, possibly causing the children to become reactive. |
1839 | // We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do. |
// Several children may be modified and become reactive as a result.
1841 | { |
1842 | for (int i = 0; i < node->n_children; i++) { |
1843 | ft_nonleaf_msg_once_to_child(cmp, node, i, msg, is_fresh, flow_deltas); |
1844 | } |
1845 | } |
1846 | |
1847 | static void |
1848 | ft_nonleaf_put_msg(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[]) |
1849 | // Effect: Put the message into a nonleaf node. We may put it into a child, possibly causing the child to become reactive. |
1850 | // We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do. |
// Several children may be modified and become reactive as a result.
1852 | // |
1853 | { |
1854 | |
1855 | // |
1856 | // see comments in toku_ft_leaf_apply_msg |
1857 | // to understand why we handle setting |
1858 | // node->max_msn_applied_to_node_on_disk here, |
1859 | // and don't do it in toku_ftnode_put_msg |
1860 | // |
1861 | MSN msg_msn = msg.msn(); |
1862 | invariant(msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn); |
1863 | node->max_msn_applied_to_node_on_disk = msg_msn; |
1864 | |
1865 | if (ft_msg_type_applies_once(msg.type())) { |
1866 | ft_nonleaf_msg_once_to_child(cmp, node, target_childnum, msg, is_fresh, flow_deltas); |
1867 | } else if (ft_msg_type_applies_all(msg.type())) { |
1868 | ft_nonleaf_msg_all(cmp, node, msg, is_fresh, flow_deltas); |
1869 | } else { |
1870 | paranoid_invariant(ft_msg_type_does_nothing(msg.type())); |
1871 | } |
1872 | } |
1873 | |
1874 | // Garbage collect one leaf entry. |
1875 | static void |
1876 | ft_basement_node_gc_once(BASEMENTNODE bn, |
1877 | uint32_t index, |
1878 | void* keyp, |
1879 | uint32_t keylen, |
1880 | LEAFENTRY leaf_entry, |
1881 | txn_gc_info *gc_info, |
1882 | STAT64INFO_S * delta) |
1883 | { |
1884 | paranoid_invariant(leaf_entry); |
1885 | |
1886 | // Don't run garbage collection on non-mvcc leaf entries. |
1887 | if (leaf_entry->type != LE_MVCC) { |
1888 | goto exit; |
1889 | } |
1890 | |
1891 | // Don't run garbage collection if this leafentry decides it's not worth it. |
1892 | if (!toku_le_worth_running_garbage_collection(leaf_entry, gc_info)) { |
1893 | goto exit; |
1894 | } |
1895 | |
1896 | LEAFENTRY new_leaf_entry; |
1897 | new_leaf_entry = NULL; |
1898 | |
1899 | // The mempool doesn't free itself. When it allocates new memory, |
1900 | // this pointer will be set to the older memory that must now be |
1901 | // freed. |
1902 | void * maybe_free; |
1903 | maybe_free = NULL; |
1904 | |
1905 | // These will represent the number of bytes and rows changed as |
1906 | // part of the garbage collection. |
1907 | int64_t numbytes_delta; |
1908 | int64_t numrows_delta; |
1909 | toku_le_garbage_collect(leaf_entry, |
1910 | &bn->data_buffer, |
1911 | index, |
1912 | keyp, |
1913 | keylen, |
1914 | gc_info, |
1915 | &new_leaf_entry, |
1916 | &numbytes_delta); |
1917 | |
    // If garbage collection removed the leafentry entirely, we lost a row.
    numrows_delta = new_leaf_entry ? 0 : -1;
1924 | |
1925 | // If we created a new mempool buffer we must free the |
1926 | // old/original buffer. |
1927 | if (maybe_free) { |
1928 | toku_free(maybe_free); |
1929 | } |
1930 | |
1931 | // Update stats. |
1932 | bn->stat64_delta.numrows += numrows_delta; |
1933 | bn->stat64_delta.numbytes += numbytes_delta; |
1934 | delta->numrows += numrows_delta; |
1935 | delta->numbytes += numbytes_delta; |
1936 | |
1937 | exit: |
1938 | return; |
1939 | } |
1940 | |
1941 | // Garbage collect all leaf entries for a given basement node. |
1942 | static void |
1943 | basement_node_gc_all_les(BASEMENTNODE bn, |
1944 | txn_gc_info *gc_info, |
1945 | STAT64INFO_S * delta) |
1946 | { |
1947 | int r = 0; |
1948 | uint32_t index = 0; |
1949 | uint32_t num_leafentries_before; |
1950 | while (index < (num_leafentries_before = bn->data_buffer.num_klpairs())) { |
1951 | void* keyp = NULL; |
1952 | uint32_t keylen = 0; |
1953 | LEAFENTRY leaf_entry; |
1954 | r = bn->data_buffer.fetch_klpair(index, &leaf_entry, &keylen, &keyp); |
1955 | assert_zero(r); |
1956 | ft_basement_node_gc_once( |
1957 | bn, |
1958 | index, |
1959 | keyp, |
1960 | keylen, |
1961 | leaf_entry, |
1962 | gc_info, |
1963 | delta |
1964 | ); |
1965 | // Check if the leaf entry was deleted or not. |
1966 | if (num_leafentries_before == bn->data_buffer.num_klpairs()) { |
1967 | ++index; |
1968 | } |
1969 | } |
1970 | } |
1971 | |
// Garbage collect all leaf entries in all basement nodes.
1973 | static void |
1974 | ft_leaf_gc_all_les(FT ft, FTNODE node, txn_gc_info *gc_info) |
1975 | { |
1976 | toku_ftnode_assert_fully_in_memory(node); |
1977 | paranoid_invariant_zero(node->height); |
1978 | // Loop through each leaf entry, garbage collecting as we go. |
1979 | for (int i = 0; i < node->n_children; ++i) { |
1980 | // Perform the garbage collection. |
1981 | BASEMENTNODE bn = BLB(node, i); |
1982 | STAT64INFO_S delta; |
1983 | delta.numrows = 0; |
1984 | delta.numbytes = 0; |
1985 | basement_node_gc_all_les(bn, gc_info, &delta); |
1986 | toku_ft_update_stats(&ft->in_memory_stats, delta); |
1987 | } |
1988 | } |
1989 | |
1990 | void toku_ftnode_leaf_run_gc(FT ft, FTNODE node) { |
1991 | TOKULOGGER logger = toku_cachefile_logger(ft->cf); |
1992 | if (logger) { |
1993 | TXN_MANAGER txn_manager = toku_logger_get_txn_manager(logger); |
1994 | txn_manager_state txn_state_for_gc(txn_manager); |
1995 | txn_state_for_gc.init(); |
1996 | TXNID oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager); |
1997 | |
1998 | // Perform full garbage collection. |
1999 | // |
2000 | // - txn_state_for_gc |
2001 | // a fresh snapshot of the transaction system. |
2002 | // - oldest_referenced_xid_for_simple_gc |
        //     the oldest xid in any live list as of right now - suitable for simple gc
2004 | // - node->oldest_referenced_xid_known |
2005 | // the last known oldest referenced xid for this node and any unapplied messages. |
        //     it is a lower bound on the actual oldest referenced xid - but because there
2007 | // may be abort messages above us, we need to be careful to only use this value |
2008 | // for implicit promotion (as opposed to the oldest referenced xid for simple gc) |
2009 | // |
2010 | // The node has its own oldest referenced xid because it must be careful not to implicitly promote |
2011 | // provisional entries for transactions that are no longer live, but may have abort messages |
2012 | // somewhere above us in the tree. |
2013 | txn_gc_info gc_info(&txn_state_for_gc, |
2014 | oldest_referenced_xid_for_simple_gc, |
2015 | node->oldest_referenced_xid_known, |
2016 | true); |
2017 | ft_leaf_gc_all_les(ft, node, &gc_info); |
2018 | } |
2019 | } |
2020 | |
2021 | void toku_ftnode_put_msg( |
2022 | const toku::comparator &cmp, |
2023 | ft_update_func update_fun, |
2024 | FTNODE node, |
2025 | int target_childnum, |
2026 | const ft_msg &msg, |
2027 | bool is_fresh, |
2028 | txn_gc_info* gc_info, |
2029 | size_t flow_deltas[], |
2030 | STAT64INFO stats_to_update, |
2031 | int64_t* logical_rows_delta) { |
2032 | // Effect: Push message into the subtree rooted at NODE. |
2033 | // If NODE is a leaf, then |
2034 | // put message into leaf, applying it to the leafentries |
    //   If NODE is a nonleaf, then push the message into the message buffer(s) of the relevant child(ren).
2036 | // The node may become overfull. That's not our problem. |
2037 | toku_ftnode_assert_fully_in_memory(node); |
2038 | // |
2039 | // see comments in toku_ft_leaf_apply_msg |
2040 | // to understand why we don't handle setting |
2041 | // node->max_msn_applied_to_node_on_disk here, |
2042 | // and instead defer to these functions |
2043 | // |
2044 | if (node->height==0) { |
2045 | toku_ft_leaf_apply_msg( |
2046 | cmp, |
2047 | update_fun, |
2048 | node, |
2049 | target_childnum, msg, |
2050 | gc_info, |
2051 | nullptr, |
2052 | stats_to_update, |
2053 | logical_rows_delta); |
2054 | } else { |
2055 | ft_nonleaf_put_msg( |
2056 | cmp, |
2057 | node, |
2058 | target_childnum, |
2059 | msg, |
2060 | is_fresh, |
2061 | flow_deltas); |
2062 | } |
2063 | } |
2064 | |
2065 | // Effect: applies the message to the leaf if the appropriate basement node is |
2066 | // in memory. This function is called during message injection and/or |
2067 | // flushing, so the entire node MUST be in memory. |
2068 | void toku_ft_leaf_apply_msg( |
2069 | const toku::comparator& cmp, |
2070 | ft_update_func update_fun, |
2071 | FTNODE node, |
2072 | int target_childnum, // which child to inject to, or -1 if unknown |
2073 | const ft_msg& msg, |
2074 | txn_gc_info* gc_info, |
2075 | uint64_t* workdone, |
2076 | STAT64INFO stats_to_update, |
2077 | int64_t* logical_rows_delta) { |
2078 | |
2079 | VERIFY_NODE(t, node); |
2080 | toku_ftnode_assert_fully_in_memory(node); |
2081 | |
2082 | // |
2083 | // Because toku_ft_leaf_apply_msg is called with the intent of permanently |
2084 | // applying a message to a leaf node (meaning the message is permanently applied |
2085 | // and will be purged from the system after this call, as opposed to |
2086 | // toku_apply_ancestors_messages_to_node, which applies a message |
2087 | // for a query, but the message may still reside in the system and |
2088 | // be reapplied later), we mark the node as dirty and |
2089 | // take the opportunity to update node->max_msn_applied_to_node_on_disk. |
2090 | // |
2091 | node->dirty = 1; |
2092 | |
2093 | // |
2094 | // we cannot blindly update node->max_msn_applied_to_node_on_disk, |
2095 | // we must check to see if the msn is greater that the one already stored, |
2096 | // because the message may have already been applied earlier (via |
2097 | // toku_apply_ancestors_messages_to_node) to answer a query |
2098 | // |
2099 | // This is why we handle node->max_msn_applied_to_node_on_disk both here |
2100 | // and in ft_nonleaf_put_msg, as opposed to in one location, toku_ftnode_put_msg. |
2101 | // |
2102 | MSN msg_msn = msg.msn(); |
2103 | if (msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn) { |
2104 | node->max_msn_applied_to_node_on_disk = msg_msn; |
2105 | } |
2106 | |
2107 | if (ft_msg_type_applies_once(msg.type())) { |
2108 | unsigned int childnum = (target_childnum >= 0 |
2109 | ? target_childnum |
2110 | : toku_ftnode_which_child(node, msg.kdbt(), cmp)); |
2111 | BASEMENTNODE bn = BLB(node, childnum); |
2112 | if (msg.msn().msn > bn->max_msn_applied.msn) { |
2113 | bn->max_msn_applied = msg.msn(); |
2114 | toku_ft_bn_apply_msg( |
2115 | cmp, |
2116 | update_fun, |
2117 | bn, |
2118 | msg, |
2119 | gc_info, |
2120 | workdone, |
2121 | stats_to_update, |
2122 | logical_rows_delta); |
2123 | } else { |
2124 | toku_ft_status_note_msn_discard(); |
2125 | } |
2126 | } else if (ft_msg_type_applies_all(msg.type())) { |
2127 | for (int childnum=0; childnum<node->n_children; childnum++) { |
2128 | if (msg.msn().msn > BLB(node, childnum)->max_msn_applied.msn) { |
2129 | BLB(node, childnum)->max_msn_applied = msg.msn(); |
2130 | toku_ft_bn_apply_msg( |
2131 | cmp, |
2132 | update_fun, |
2133 | BLB(node, childnum), |
2134 | msg, |
2135 | gc_info, |
2136 | workdone, |
2137 | stats_to_update, |
2138 | logical_rows_delta); |
2139 | } else { |
2140 | toku_ft_status_note_msn_discard(); |
2141 | } |
2142 | } |
2143 | } else if (!ft_msg_type_does_nothing(msg.type())) { |
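        // Unknown message type: this invariant is intentionally
        // unsatisfiable, so we abort instead of silently dropping the message.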
2144 | invariant(ft_msg_type_does_nothing(msg.type())); |
2145 | } |
2146 | VERIFY_NODE(t, node); |
2147 | } |
2148 | |
2149 | |