1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39/* Verify an FT. */
40/* Check:
41 * The tree is of uniform depth (and the height is correct at every node)
42 * For each pivot key: the max of the stuff to the left is <= the pivot key < the min of the stuff to the right.
43 * For each leaf node: All the keys are in strictly increasing order.
44 * For each nonleaf node: All the messages have keys that are between the associated pivot keys ( left_pivot_key < message <= right_pivot_key)
45 */
46
47#include <my_global.h>
48#include "ft/serialize/block_table.h"
49#include "ft/ft.h"
50#include "ft/ft-cachetable-wrappers.h"
51#include "ft/ft-internal.h"
52#include "ft/node.h"
53
54static int
55compare_pairs (FT_HANDLE ft_handle, const DBT *a, const DBT *b) {
56 return ft_handle->ft->cmp(a, b);
57}
58
59static int
60compare_pair_to_key (FT_HANDLE ft_handle, const DBT *a, const void *key, uint32_t keylen) {
61 DBT y;
62 return ft_handle->ft->cmp(a, toku_fill_dbt(&y, key, keylen));
63}
64
65static int
66verify_msg_in_child_buffer(FT_HANDLE ft_handle, enum ft_msg_type type, MSN msn, const void *key, uint32_t keylen, const void *UU(data), uint32_t UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot)
67 __attribute__((warn_unused_result));
68
69UU()
70static int
71verify_msg_in_child_buffer(FT_HANDLE ft_handle, enum ft_msg_type type, MSN msn, const void *key, uint32_t keylen, const void *UU(data), uint32_t UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot) {
72 int result = 0;
73 if (msn.msn == ZERO_MSN.msn)
74 result = EINVAL;
75 switch (type) {
76 default:
77 break;
78 case FT_INSERT:
79 case FT_INSERT_NO_OVERWRITE:
80 case FT_DELETE_ANY:
81 case FT_ABORT_ANY:
82 case FT_COMMIT_ANY:
83 // verify key in bounds
84 if (lesser_pivot) {
85 int compare = compare_pair_to_key(ft_handle, lesser_pivot, key, keylen);
86 if (compare >= 0)
87 result = EINVAL;
88 }
89 if (result == 0 && greatereq_pivot) {
90 int compare = compare_pair_to_key(ft_handle, greatereq_pivot, key, keylen);
91 if (compare < 0)
92 result = EINVAL;
93 }
94 break;
95 }
96 return result;
97}
98
99static DBT
100get_ith_key_dbt (BASEMENTNODE bn, int i) {
101 DBT kdbt;
102 int r = bn->data_buffer.fetch_key_and_len(i, &kdbt.size, &kdbt.data);
103 invariant_zero(r); // this is a bad failure if it happens.
104 return kdbt;
105}
106
107#define VERIFY_ASSERTION(predicate, i, string) ({ \
108 if(!(predicate)) { \
109 fprintf(stderr, "%s:%d: Looking at child %d of block %" PRId64 ": %s\n", __FILE__, __LINE__, i, blocknum.b, string); \
110 result = TOKUDB_NEEDS_REPAIR; \
111 if (!keep_going_on_failure) goto done; \
112 }})
113
114#define VERIFY_ASSERTION_BASEMENT(predicate, bn, entry, string) ({ \
115 if(!(predicate)) { \
116 fprintf(stderr, "%s:%d: Looking at block %" PRId64 " bn %d entry %d: %s\n", __FILE__, __LINE__, blocknum.b, bn, entry, string); \
117 result = TOKUDB_NEEDS_REPAIR; \
118 if (!keep_going_on_failure) goto done; \
119 }})
120
121struct count_msgs_extra {
122 int count;
123 MSN msn;
124 message_buffer *msg_buffer;
125};
126
127// template-only function, but must be extern
128int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
129 __attribute__((nonnull(3)));
130int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
131{
132 MSN msn;
133 e->msg_buffer->get_message_key_msn(offset, nullptr, &msn);
134 if (msn.msn == e->msn.msn) {
135 e->count++;
136 }
137 return 0;
138}
139
140struct verify_message_tree_extra {
141 message_buffer *msg_buffer;
142 bool broadcast;
143 bool is_fresh;
144 int i;
145 int verbose;
146 BLOCKNUM blocknum;
147 int keep_going_on_failure;
148 bool messages_have_been_moved;
149};
150
151int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e) __attribute__((nonnull(3)));
152int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e)
153{
154 BLOCKNUM blocknum = e->blocknum;
155 int keep_going_on_failure = e->keep_going_on_failure;
156 int result = 0;
157 DBT k, v;
158 ft_msg msg = e->msg_buffer->get_message(offset, &k, &v);
159 bool is_fresh = e->msg_buffer->get_freshness(offset);
160 if (e->broadcast) {
161 VERIFY_ASSERTION(ft_msg_type_applies_all((enum ft_msg_type) msg.type()) || ft_msg_type_does_nothing((enum ft_msg_type) msg.type()),
162 e->i, "message found in broadcast list that is not a broadcast");
163 } else {
164 VERIFY_ASSERTION(ft_msg_type_applies_once((enum ft_msg_type) msg.type()),
165 e->i, "message found in fresh or stale message tree that does not apply once");
166 if (e->is_fresh) {
167 if (e->messages_have_been_moved) {
168 VERIFY_ASSERTION(is_fresh,
169 e->i, "message found in fresh message tree that is not fresh");
170 }
171 } else {
172 VERIFY_ASSERTION(!is_fresh,
173 e->i, "message found in stale message tree that is fresh");
174 }
175 }
176done:
177 return result;
178}
179
180int error_on_iter(const int32_t &UU(offset), const uint32_t UU(idx), void *UU(e));
181int error_on_iter(const int32_t &UU(offset), const uint32_t UU(idx), void *UU(e)) {
182 return TOKUDB_NEEDS_REPAIR;
183}
184
185int verify_marked_messages(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e) __attribute__((nonnull(3)));
186int verify_marked_messages(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e)
187{
188 BLOCKNUM blocknum = e->blocknum;
189 int keep_going_on_failure = e->keep_going_on_failure;
190 int result = 0;
191 bool is_fresh = e->msg_buffer->get_freshness(offset);
192 VERIFY_ASSERTION(!is_fresh, e->i, "marked message found in the fresh message tree that is fresh");
193 done:
194 return result;
195}
196
197template<typename verify_omt_t>
198static int
199verify_sorted_by_key_msn(FT_HANDLE ft_handle, message_buffer *msg_buffer, const verify_omt_t &mt) {
200 int result = 0;
201 size_t last_offset = 0;
202 for (uint32_t i = 0; i < mt.size(); i++) {
203 int32_t offset;
204 int r = mt.fetch(i, &offset);
205 assert_zero(r);
206 if (i > 0) {
207 struct toku_msg_buffer_key_msn_cmp_extra extra(ft_handle->ft->cmp, msg_buffer);
208 if (toku_msg_buffer_key_msn_cmp(extra, last_offset, offset) >= 0) {
209 result = TOKUDB_NEEDS_REPAIR;
210 break;
211 }
212 }
213 last_offset = offset;
214 }
215 return result;
216}
217
218template<typename count_omt_t>
219static int
220count_eq_key_msn(FT_HANDLE ft_handle, message_buffer *msg_buffer, const count_omt_t &mt, const DBT *key, MSN msn) {
221 struct toku_msg_buffer_key_msn_heaviside_extra extra(ft_handle->ft->cmp, msg_buffer, key, msn);
222 int r = mt.template find_zero<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(extra, nullptr, nullptr);
223 int count;
224 if (r == 0) {
225 count = 1;
226 } else {
227 assert(r == DB_NOTFOUND);
228 count = 0;
229 }
230 return count;
231}
232
233void
234toku_get_node_for_verify(
235 BLOCKNUM blocknum,
236 FT_HANDLE ft_handle,
237 FTNODE* nodep
238 )
239{
240 uint32_t fullhash = toku_cachetable_hash(ft_handle->ft->cf, blocknum);
241 ftnode_fetch_extra bfe;
242 bfe.create_for_full_read(ft_handle->ft);
243 toku_pin_ftnode(
244 ft_handle->ft,
245 blocknum,
246 fullhash,
247 &bfe,
248 PL_WRITE_EXPENSIVE, // may_modify_node
249 nodep,
250 false
251 );
252}
253
254struct verify_msg_fn {
255 FT_HANDLE ft_handle;
256 NONLEAF_CHILDINFO bnc;
257 const DBT *curr_less_pivot;
258 const DBT *curr_geq_pivot;
259 BLOCKNUM blocknum;
260 MSN this_msn;
261 int verbose;
262 int keep_going_on_failure;
263 bool messages_have_been_moved;
264
265 MSN last_msn;
266 int msg_i;
267 int result = 0; // needed by VERIFY_ASSERTION
268
269 verify_msg_fn(FT_HANDLE handle, NONLEAF_CHILDINFO nl, const DBT *less, const DBT *geq,
270 BLOCKNUM b, MSN tmsn, int v, int k, bool m) :
271 ft_handle(handle), bnc(nl), curr_less_pivot(less), curr_geq_pivot(geq),
272 blocknum(b), this_msn(tmsn), verbose(v), keep_going_on_failure(k), messages_have_been_moved(m), last_msn(ZERO_MSN), msg_i(0) {
273 }
274
275 int operator()(const ft_msg &msg, bool is_fresh) {
276 enum ft_msg_type type = (enum ft_msg_type) msg.type();
277 MSN msn = msg.msn();
278 XIDS xid = msg.xids();
279 const void *key = msg.kdbt()->data;
280 const void *data = msg.vdbt()->data;
281 uint32_t keylen = msg.kdbt()->size;
282 uint32_t datalen = msg.vdbt()->size;
283
284 int r = verify_msg_in_child_buffer(ft_handle, type, msn, key, keylen, data, datalen, xid,
285 curr_less_pivot,
286 curr_geq_pivot);
287 VERIFY_ASSERTION(r == 0, msg_i, "A message in the buffer is out of place");
288 VERIFY_ASSERTION((msn.msn > last_msn.msn), msg_i, "msn per msg must be monotonically increasing toward newer messages in buffer");
289 VERIFY_ASSERTION((msn.msn <= this_msn.msn), msg_i, "all messages must have msn within limit of this node's max_msn_applied_to_node_in_memory");
290 if (ft_msg_type_applies_once(type)) {
291 int count;
292 DBT keydbt;
293 toku_fill_dbt(&keydbt, key, keylen);
294 int total_count = 0;
295 count = count_eq_key_msn(ft_handle, &bnc->msg_buffer, bnc->fresh_message_tree, toku_fill_dbt(&keydbt, key, keylen), msn);
296 total_count += count;
297 if (is_fresh) {
298 VERIFY_ASSERTION(count == 1, msg_i, "a fresh message was not found in the fresh message tree");
299 } else if (messages_have_been_moved) {
300 VERIFY_ASSERTION(count == 0, msg_i, "a stale message was found in the fresh message tree");
301 }
302 VERIFY_ASSERTION(count <= 1, msg_i, "a message was found multiple times in the fresh message tree");
303 count = count_eq_key_msn(ft_handle, &bnc->msg_buffer, bnc->stale_message_tree, &keydbt, msn);
304
305 total_count += count;
306 if (is_fresh) {
307 VERIFY_ASSERTION(count == 0, msg_i, "a fresh message was found in the stale message tree");
308 } else if (messages_have_been_moved) {
309 VERIFY_ASSERTION(count == 1, msg_i, "a stale message was not found in the stale message tree");
310 }
311 VERIFY_ASSERTION(count <= 1, msg_i, "a message was found multiple times in the stale message tree");
312
313 VERIFY_ASSERTION(total_count <= 1, msg_i, "a message was found in both message trees (or more than once in a single tree)");
314 VERIFY_ASSERTION(total_count >= 1, msg_i, "a message was not found in either message tree");
315 } else {
316 VERIFY_ASSERTION(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type), msg_i, "a message was found that does not apply either to all or to only one key");
317 struct count_msgs_extra extra = { .count = 0, .msn = msn, .msg_buffer = &bnc->msg_buffer };
318 bnc->broadcast_list.iterate<struct count_msgs_extra, count_msgs>(&extra);
319 VERIFY_ASSERTION(extra.count == 1, msg_i, "a broadcast message was not found in the broadcast list");
320 }
321 last_msn = msn;
322 msg_i++;
323done:
324 return result;
325 }
326};
327
328static int
329toku_verify_ftnode_internal(FT_HANDLE ft_handle,
330 MSN rootmsn, MSN parentmsn_with_messages, bool messages_exist_above,
331 FTNODE node, int height,
332 const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
333 const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
334 int verbose, int keep_going_on_failure, bool messages_have_been_moved)
335{
336 int result=0;
337 MSN this_msn;
338 BLOCKNUM blocknum = node->blocknum;
339
340 //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
341 toku_ftnode_assert_fully_in_memory(node);
342 this_msn = node->max_msn_applied_to_node_on_disk;
343
344 if (height >= 0) {
345 invariant(height == node->height); // this is a bad failure if wrong
346 }
347 if (node->height > 0 && messages_exist_above) {
348 VERIFY_ASSERTION((parentmsn_with_messages.msn >= this_msn.msn), 0, "node msn must be descending down tree, newest messages at top");
349 }
350 // Verify that all the pivot keys are in order.
351 for (int i = 0; i < node->n_children-2; i++) {
352 DBT x, y;
353 int compare = compare_pairs(ft_handle, node->pivotkeys.fill_pivot(i, &x), node->pivotkeys.fill_pivot(i + 1, &y));
354 VERIFY_ASSERTION(compare < 0, i, "Value is >= the next value");
355 }
356 // Verify that all the pivot keys are lesser_pivot < pivot <= greatereq_pivot
357 for (int i = 0; i < node->n_children-1; i++) {
358 DBT x;
359 if (lesser_pivot) {
360 int compare = compare_pairs(ft_handle, lesser_pivot, node->pivotkeys.fill_pivot(i, &x));
361 VERIFY_ASSERTION(compare < 0, i, "Pivot is >= the lower-bound pivot");
362 }
363 if (greatereq_pivot) {
364 int compare = compare_pairs(ft_handle, greatereq_pivot, node->pivotkeys.fill_pivot(i, &x));
365 VERIFY_ASSERTION(compare >= 0, i, "Pivot is < the upper-bound pivot");
366 }
367 }
368
369 for (int i = 0; i < node->n_children; i++) {
370 DBT x, y;
371 const DBT *curr_less_pivot = (i==0) ? lesser_pivot : node->pivotkeys.fill_pivot(i - 1, &x);
372 const DBT *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : node->pivotkeys.fill_pivot(i, &y);
373 if (node->height > 0) {
374 NONLEAF_CHILDINFO bnc = BNC(node, i);
375 // Verify that messages in the buffers are in the right place.
376 VERIFY_ASSERTION(verify_sorted_by_key_msn(ft_handle, &bnc->msg_buffer, bnc->fresh_message_tree) == 0, i, "fresh_message_tree");
377 VERIFY_ASSERTION(verify_sorted_by_key_msn(ft_handle, &bnc->msg_buffer, bnc->stale_message_tree) == 0, i, "stale_message_tree");
378
379 verify_msg_fn verify_msg(ft_handle, bnc, curr_less_pivot, curr_geq_pivot,
380 blocknum, this_msn, verbose, keep_going_on_failure, messages_have_been_moved);
381 int r = bnc->msg_buffer.iterate(verify_msg);
382 if (r != 0) { result = r; goto done; }
383
384 struct verify_message_tree_extra extra = { .msg_buffer = &bnc->msg_buffer, .broadcast = false, .is_fresh = true, .i = i, .verbose = verbose, .blocknum = node->blocknum, .keep_going_on_failure = keep_going_on_failure, .messages_have_been_moved = messages_have_been_moved };
385 r = bnc->fresh_message_tree.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
386 if (r != 0) { result = r; goto done; }
387 extra.is_fresh = false;
388 r = bnc->stale_message_tree.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
389 if (r != 0) { result = r; goto done; }
390
391 bnc->fresh_message_tree.verify_marks_consistent();
392 if (messages_have_been_moved) {
393 VERIFY_ASSERTION(!bnc->fresh_message_tree.has_marks(), i, "fresh message tree still has marks after moving messages");
394 r = bnc->fresh_message_tree.iterate_over_marked<void, error_on_iter>(nullptr);
395 if (r != 0) { result = r; goto done; }
396 }
397 else {
398 r = bnc->fresh_message_tree.iterate_over_marked<struct verify_message_tree_extra, verify_marked_messages>(&extra);
399 if (r != 0) { result = r; goto done; }
400 }
401
402 extra.broadcast = true;
403 r = bnc->broadcast_list.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
404 if (r != 0) { result = r; goto done; }
405 }
406 else {
407 BASEMENTNODE bn = BLB(node, i);
408 for (uint32_t j = 0; j < bn->data_buffer.num_klpairs(); j++) {
409 VERIFY_ASSERTION((rootmsn.msn >= this_msn.msn), 0, "leaf may have latest msn, but cannot be greater than root msn");
410 DBT kdbt = get_ith_key_dbt(bn, j);
411 if (curr_less_pivot) {
412 int compare = compare_pairs(ft_handle, curr_less_pivot, &kdbt);
413 VERIFY_ASSERTION_BASEMENT(compare < 0, i, j, "The leafentry is >= the lower-bound pivot");
414 }
415 if (curr_geq_pivot) {
416 int compare = compare_pairs(ft_handle, curr_geq_pivot, &kdbt);
417 VERIFY_ASSERTION_BASEMENT(compare >= 0, i, j, "The leafentry is < the upper-bound pivot");
418 }
419 if (0 < j) {
420 DBT prev_key_dbt = get_ith_key_dbt(bn, j-1);
421 int compare = compare_pairs(ft_handle, &prev_key_dbt, &kdbt);
422 VERIFY_ASSERTION_BASEMENT(compare < 0, i, j, "Adjacent leafentries are out of order");
423 }
424 }
425 }
426 }
427
428done:
429 return result;
430}
431
432
433// input is a pinned node, on exit, node is unpinned
434int
435toku_verify_ftnode (FT_HANDLE ft_handle,
436 MSN rootmsn, MSN parentmsn_with_messages, bool messages_exist_above,
437 FTNODE node, int height,
438 const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
439 const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
440 int (*progress_callback)(void *extra, float progress), void *progress_extra,
441 int recurse, int verbose, int keep_going_on_failure)
442{
443 MSN this_msn;
444
445 //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
446 toku_ftnode_assert_fully_in_memory(node);
447 this_msn = node->max_msn_applied_to_node_on_disk;
448
449 int result = 0;
450 int result2 = 0;
451 if (node->height > 0) {
452 // Otherwise we'll just do the next call
453
454 result = toku_verify_ftnode_internal(
455 ft_handle, rootmsn, parentmsn_with_messages, messages_exist_above, node, height, lesser_pivot, greatereq_pivot,
456 verbose, keep_going_on_failure, false);
457 if (result != 0 && (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR)) goto done;
458 }
459 if (node->height > 0) {
460 toku_move_ftnode_messages_to_stale(ft_handle->ft, node);
461 }
462 result2 = toku_verify_ftnode_internal(
463 ft_handle, rootmsn, parentmsn_with_messages, messages_exist_above, node, height, lesser_pivot, greatereq_pivot,
464 verbose, keep_going_on_failure, true);
465 if (result == 0) {
466 result = result2;
467 if (result != 0 && (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR)) goto done;
468 }
469
470 // Verify that the subtrees have the right properties.
471 if (recurse && node->height > 0) {
472 for (int i = 0; i < node->n_children; i++) {
473 FTNODE child_node;
474 toku_get_node_for_verify(BP_BLOCKNUM(node, i), ft_handle, &child_node);
475 DBT x, y;
476 int r = toku_verify_ftnode(ft_handle, rootmsn,
477 (toku_bnc_n_entries(BNC(node, i)) > 0
478 ? this_msn
479 : parentmsn_with_messages),
480 messages_exist_above || toku_bnc_n_entries(BNC(node, i)) > 0,
481 child_node, node->height-1,
482 (i==0) ? lesser_pivot : node->pivotkeys.fill_pivot(i - 1, &x),
483 (i==node->n_children-1) ? greatereq_pivot : node->pivotkeys.fill_pivot(i, &y),
484 progress_callback, progress_extra,
485 recurse, verbose, keep_going_on_failure);
486 if (r) {
487 result = r;
488 if (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR) goto done;
489 }
490 }
491 }
492done:
493 toku_unpin_ftnode(ft_handle->ft, node);
494
495 if (result == 0 && progress_callback)
496 result = progress_callback(progress_extra, 0.0);
497
498 return result;
499}
500
501int
502toku_verify_ft_with_progress (FT_HANDLE ft_handle, int (*progress_callback)(void *extra, float progress), void *progress_extra, int verbose, int keep_on_going) {
503 assert(ft_handle->ft);
504 FTNODE root_node = NULL;
505 {
506 uint32_t root_hash;
507 CACHEKEY root_key;
508 toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &root_hash);
509 toku_get_node_for_verify(root_key, ft_handle, &root_node);
510 }
511 int r = toku_verify_ftnode(ft_handle, ft_handle->ft->h->max_msn_in_ft, ft_handle->ft->h->max_msn_in_ft, false, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going);
512 if (r == 0) {
513 toku_ft_lock(ft_handle->ft);
514 ft_handle->ft->h->time_of_last_verification = time(NULL);
515 ft_handle->ft->h->dirty = 1;
516 toku_ft_unlock(ft_handle->ft);
517 }
518 return r;
519}
520
521int
522toku_verify_ft (FT_HANDLE ft_handle) {
523 return toku_verify_ft_with_progress(ft_handle, NULL, NULL, 0, 0);
524}
525