1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <my_global.h>
40#include "ft/serialize/block_table.h"
41#include "ft/ft.h"
42#include "ft/ft-cachetable-wrappers.h"
43#include "ft/ft-internal.h"
44#include "ft/logger/log-internal.h"
45#include "ft/log_header.h"
46#include "ft/node.h"
47#include "ft/serialize/ft-serialize.h"
48#include "ft/serialize/ft_node-serialize.h"
49
50#include <memory.h>
51#include <toku_assert.h>
52#include <portability/toku_atomic.h>
53
// Instrumentation key handed to toku_mutex_init() when an FT's ft_ref_lock
// is created (see toku_ft_init_reflock).
toku_instr_key *ft_ref_lock_mutex_key;
55
56void toku_reset_root_xid_that_created(FT ft, TXNID new_root_xid_that_created) {
57 // Reset the root_xid_that_created field to the given value.
58 // This redefines which xid created the dictionary.
59
60 // hold lock around setting and clearing of dirty bit
61 // (see cooperative use of dirty bit in ft_begin_checkpoint())
62 toku_ft_lock(ft);
63 ft->h->root_xid_that_created = new_root_xid_that_created;
64 ft->h->dirty = 1;
65 toku_ft_unlock(ft);
66}
67
68static void
69ft_destroy(FT ft) {
70 //header and checkpoint_header have same Blocktable pointer
71 //cannot destroy since it is still in use by CURRENT
72 assert(ft->h->type == FT_CURRENT);
73 ft->blocktable.destroy();
74 ft->cmp.destroy();
75 toku_destroy_dbt(&ft->descriptor.dbt);
76 toku_destroy_dbt(&ft->cmp_descriptor.dbt);
77 toku_ft_destroy_reflock(ft);
78 toku_free(ft->h);
79}
80
81// Make a copy of the header for the purpose of a checkpoint
82// Not reentrant for a single FT.
83// See ft_checkpoint for explanation of why
84// FT lock must be held.
85static void
86ft_copy_for_checkpoint_unlocked(FT ft, LSN checkpoint_lsn) {
87 assert(ft->h->type == FT_CURRENT);
88 assert(ft->checkpoint_header == NULL);
89
90 FT_HEADER XMEMDUP(ch, ft->h);
91 ch->type = FT_CHECKPOINT_INPROGRESS; //Different type
92 //printf("checkpoint_lsn=%" PRIu64 "\n", checkpoint_lsn.lsn);
93 ch->checkpoint_lsn = checkpoint_lsn;
94
95 //ch->blocktable is SHARED between the two headers
96 ft->checkpoint_header = ch;
97}
98
99void
100toku_ft_free (FT ft) {
101 ft_destroy(ft);
102 toku_free(ft);
103}
104
105void toku_ft_init_reflock(FT ft) {
106 toku_mutex_init(*ft_ref_lock_mutex_key, &ft->ft_ref_lock, nullptr);
107}
108
109void toku_ft_destroy_reflock(FT ft) { toku_mutex_destroy(&ft->ft_ref_lock); }
110
111void
112toku_ft_grab_reflock(FT ft) {
113 toku_mutex_lock(&ft->ft_ref_lock);
114}
115
116void
117toku_ft_release_reflock(FT ft) {
118 toku_mutex_unlock(&ft->ft_ref_lock);
119}
120
121/////////////////////////////////////////////////////////////////////////
// Start of Functions that are callbacks to the cachefile
123//
124
125// maps to cf->log_fassociate_during_checkpoint
126static void
127ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
128 FT ft = (FT) header_v;
129 char* fname_in_env = toku_cachefile_fname_in_env(cf);
130 BYTESTRING bs = { .len = (uint32_t) strlen(fname_in_env), // don't include the NUL
131 .data = fname_in_env };
132 TOKULOGGER logger = toku_cachefile_logger(cf);
133 FILENUM filenum = toku_cachefile_filenum(cf);
134 bool unlink_on_close = toku_cachefile_is_unlink_on_close(cf);
135 toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs, unlink_on_close);
136}
137
138// Maps to cf->begin_checkpoint_userdata
139// Create checkpoint-in-progress versions of header and translation (btt)
140// Has access to fd (it is protected).
141//
142// Not reentrant for a single FT (see ft_checkpoint)
143static void ft_begin_checkpoint (LSN checkpoint_lsn, void *header_v) {
144 FT ft = (FT) header_v;
145 // hold lock around copying and clearing of dirty bit
146 toku_ft_lock (ft);
147 assert(ft->h->type == FT_CURRENT);
148 assert(ft->checkpoint_header == NULL);
149 ft_copy_for_checkpoint_unlocked(ft, checkpoint_lsn);
150 ft->h->dirty = 0; // this is only place this bit is cleared (in currentheader)
151 ft->blocktable.note_start_checkpoint_unlocked();
152 toku_ft_unlock (ft);
153}
154
155// #4922: Hack to remove data corruption race condition.
156// Reading (and upgrading) a node up to version 19 causes this.
157// We COULD skip this if we know that no nodes remained (as of last checkpoint)
158// that are below version 19.
159// If there are no nodes < version 19 this is harmless (field is unused).
160// If there are, this will make certain the value is at least as low as necessary,
161// and not much lower. (Too low is good, too high can cause data corruption).
162// TODO(yoni): If we ever stop supporting upgrades of nodes < version 19 we can delete this.
163// TODO(yoni): If we know no nodes are left to upgrade, we can skip this. (Probably not worth doing).
164static void
165ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(FT ft) {
166 if (ft->h->layout_version_original < FT_LAYOUT_VERSION_19) {
167 ft->checkpoint_header->highest_unused_msn_for_upgrade = ft->h->highest_unused_msn_for_upgrade;
168 }
169}
170
171// maps to cf->checkpoint_userdata
172// Write checkpoint-in-progress versions of header and translation to disk (really to OS internal buffer).
173// Copy current header's version of checkpoint_staging stat64info to checkpoint header.
174// Must have access to fd (protected).
175// Requires: all pending bits are clear. This implies that no thread will modify the checkpoint_staging
176// version of the stat64info.
177//
178// No locks are taken for checkpoint_count/lsn because this is single threaded. Can be called by:
179// - ft_close
180// - end_checkpoint
181// checkpoints hold references to FTs and so they cannot be closed during a checkpoint.
182// ft_close is not reentrant for a single FT
183// end_checkpoint is not reentrant period
184static void ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
185 FT ft = (FT) header_v;
186 FT_HEADER ch = ft->checkpoint_header;
187 assert(ch);
188 assert(ch->type == FT_CHECKPOINT_INPROGRESS);
189 if (ch->dirty) { // this is only place this bit is tested (in checkpoint_header)
190 TOKULOGGER logger = toku_cachefile_logger(cf);
191 if (logger) {
192 toku_logger_fsync_if_lsn_not_fsynced(logger, ch->checkpoint_lsn);
193 }
194 uint64_t now = (uint64_t) time(NULL);
195 ft->h->time_of_last_modification = now;
196 ch->time_of_last_modification = now;
197 ch->checkpoint_count++;
198 ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(ft);
199 ch->on_disk_logical_rows =
200 ft->h->on_disk_logical_rows = ft->in_memory_logical_rows;
201
202 // write translation and header to disk (or at least to OS internal buffer)
203 toku_serialize_ft_to(fd, ch, &ft->blocktable, ft->cf);
204 ch->dirty = 0; // this is only place this bit is cleared (in checkpoint_header)
205
206 // fsync the cachefile
207 toku_cachefile_fsync(cf);
208 ft->h->checkpoint_count++; // checkpoint succeeded, next checkpoint will save to alternate header location
209 ft->h->checkpoint_lsn = ch->checkpoint_lsn; //Header updated.
210 } else {
211 ft->blocktable.note_skipped_checkpoint();
212 }
213}
214
215// maps to cf->end_checkpoint_userdata
216// free unused disk space
217// (i.e. tell BlockAllocator to liberate blocks used by previous checkpoint).
218// Must have access to fd (protected)
219static void ft_end_checkpoint(CACHEFILE UU(cf), int fd, void *header_v) {
220 FT ft = (FT) header_v;
221 assert(ft->h->type == FT_CURRENT);
222 ft->blocktable.note_end_checkpoint(fd);
223 toku_free(ft->checkpoint_header);
224 ft->checkpoint_header = nullptr;
225}
226
227// maps to cf->close_userdata
228// Has access to fd (it is protected).
229static void ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN oplsn) {
230 FT ft = (FT) header_v;
231 assert(ft->h->type == FT_CURRENT);
232 // We already have exclusive access to this field already, so skip the locking.
233 // This should already never fail.
234 invariant(!toku_ft_needed_unlocked(ft));
235 assert(ft->cf == cachefile);
236 TOKULOGGER logger = toku_cachefile_logger(cachefile);
237 LSN lsn = ZERO_LSN;
238 //Get LSN
239 if (oplsn_valid) {
240 //Use recovery-specified lsn
241 lsn = oplsn;
242 //Recovery cannot reduce lsn of a header.
243 if (lsn.lsn < ft->h->checkpoint_lsn.lsn) {
244 lsn = ft->h->checkpoint_lsn;
245 }
246 }
247 else {
248 //Get LSN from logger
249 lsn = ZERO_LSN; // if there is no logger, we use zero for the lsn
250 if (logger) {
251 char* fname_in_env = toku_cachefile_fname_in_env(cachefile);
252 assert(fname_in_env);
253 BYTESTRING bs = {.len=(uint32_t) strlen(fname_in_env), .data=fname_in_env};
254 if (!toku_cachefile_is_skip_log_recover_on_close(cachefile)) {
255 toku_log_fclose(
256 logger,
257 &lsn,
258 ft->h->dirty,
259 bs,
260 toku_cachefile_filenum(cachefile)); // flush the log on
261 // close (if new header
262 // is being written),
263 // otherwise it might
264 // not make it out.
265 toku_cachefile_do_log_recover_on_close(cachefile);
266 }
267 }
268 }
269 if (ft->h->dirty) { // this is the only place this bit is tested (in currentheader)
270 bool do_checkpoint = true;
271 if (logger && logger->rollback_cachefile == cachefile) {
272 do_checkpoint = false;
273 }
274 if (do_checkpoint) {
275 ft_begin_checkpoint(lsn, header_v);
276 ft_checkpoint(cachefile, fd, ft);
277 ft_end_checkpoint(cachefile, fd, header_v);
278 assert(!ft->h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
279 }
280 }
281}
282
283// maps to cf->free_userdata
284static void ft_free(CACHEFILE cachefile UU(), void *header_v) {
285 FT ft = (FT) header_v;
286 toku_ft_free(ft);
287}
288
289// maps to cf->note_pin_by_checkpoint
290//Must be protected by ydb lock.
291//Is only called by checkpoint begin, which holds it
292static void ft_note_pin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) {
293 // Note: open_close lock is held by checkpoint begin
294 FT ft = (FT) header_v;
295 toku_ft_grab_reflock(ft);
296 assert(!ft->pinned_by_checkpoint);
297 assert(toku_ft_needed_unlocked(ft));
298 ft->pinned_by_checkpoint = true;
299 toku_ft_release_reflock(ft);
300}
301
302// Requires: the reflock is held.
303static void unpin_by_checkpoint_callback(FT ft, void *extra) {
304 invariant(extra == NULL);
305 invariant(ft->pinned_by_checkpoint);
306 ft->pinned_by_checkpoint = false;
307}
308
309// maps to cf->note_unpin_by_checkpoint
310//Must be protected by ydb lock.
311//Called by end_checkpoint, which grabs ydb lock around note_unpin
312static void ft_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) {
313 FT ft = (FT) header_v;
314 toku_ft_remove_reference(ft, false, ZERO_LSN, unpin_by_checkpoint_callback, NULL);
315}
316
317//
318// End of Functions that are callbacks to the cachefile
319/////////////////////////////////////////////////////////////////////////
320
321static void setup_initial_ft_root_node(FT ft, BLOCKNUM blocknum) {
322 FTNODE XCALLOC(node);
323 toku_initialize_empty_ftnode(node, blocknum, 0, 1, ft->h->layout_version, ft->h->flags);
324 BP_STATE(node,0) = PT_AVAIL;
325
326 uint32_t fullhash = toku_cachetable_hash(ft->cf, blocknum);
327 node->fullhash = fullhash;
328 toku_cachetable_put(ft->cf, blocknum, fullhash,
329 node, make_ftnode_pair_attr(node),
330 get_write_callbacks_for_node(ft),
331 toku_ftnode_save_ct_pair);
332 toku_unpin_ftnode(ft, node);
333}
334
335static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
336 // fake, prevent unnecessary upgrade logic
337 ft->layout_version_read_from_disk = FT_LAYOUT_VERSION;
338 ft->checkpoint_header = NULL;
339
340 toku_list_init(&ft->live_ft_handles);
341
342 // intuitively, the comparator points to the FT's cmp descriptor
343 ft->cmp.create(options->compare_fun, &ft->cmp_descriptor, options->memcmp_magic);
344 ft->update_fun = options->update_fun;
345
346 if (ft->cf != NULL) {
347 assert(ft->cf == cf);
348 }
349 ft->cf = cf;
350 ft->in_memory_stats = ZEROSTATS;
351
352 setup_initial_ft_root_node(ft, ft->h->root_blocknum);
353 toku_cachefile_set_userdata(ft->cf,
354 ft,
355 ft_log_fassociate_during_checkpoint,
356 ft_close,
357 ft_free,
358 ft_checkpoint,
359 ft_begin_checkpoint,
360 ft_end_checkpoint,
361 ft_note_pin_by_checkpoint,
362 ft_note_unpin_by_checkpoint);
363
364 ft->blocktable.verify_no_free_blocknums();
365}
366
367
368static FT_HEADER
369ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_created)
370{
371 uint64_t now = (uint64_t) time(NULL);
372 struct ft_header h = {
373 .type = FT_CURRENT,
374 .dirty = 0,
375 .checkpoint_count = 0,
376 .checkpoint_lsn = ZERO_LSN,
377 .layout_version = FT_LAYOUT_VERSION,
378 .layout_version_original = FT_LAYOUT_VERSION,
379 .build_id = BUILD_ID,
380 .build_id_original = BUILD_ID,
381 .time_of_creation = now,
382 .root_xid_that_created = root_xid_that_created,
383 .time_of_last_modification = now,
384 .time_of_last_verification = 0,
385 .root_blocknum = root_blocknum,
386 .flags = options->flags,
387 .nodesize = options->nodesize,
388 .basementnodesize = options->basementnodesize,
389 .compression_method = options->compression_method,
390 .fanout = options->fanout,
391 .highest_unused_msn_for_upgrade = { .msn = (MIN_MSN.msn - 1) },
392 .max_msn_in_ft = ZERO_MSN,
393 .time_of_last_optimize_begin = 0,
394 .time_of_last_optimize_end = 0,
395 .count_of_optimize_in_progress = 0,
396 .count_of_optimize_in_progress_read_from_disk = 0,
397 .msn_at_start_of_last_completed_optimize = ZERO_MSN,
398 .on_disk_stats = ZEROSTATS,
399 .on_disk_logical_rows = 0
400 };
401 return (FT_HEADER) toku_xmemdup(&h, sizeof h);
402}
403
404// allocate and initialize a fractal tree.
405void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
406 invariant(ftp);
407
408 FT XCALLOC(ft);
409 ft->h = ft_header_create(options, make_blocknum(0), (txn ? txn->txnid.parent_id64: TXNID_NONE));
410
411 toku_ft_init_reflock(ft);
412
413 // Assign blocknum for root block, also dirty the header
414 ft->blocktable.create();
415 ft->blocktable.allocate_blocknum(&ft->h->root_blocknum, ft);
416
417 ft_init(ft, options, cf);
418
419 *ftp = ft;
420}
421
422// TODO: (Zardosht) get rid of ft parameter
423int toku_read_ft_and_store_in_cachefile (FT_HANDLE ft_handle, CACHEFILE cf, LSN max_acceptable_lsn, FT *header)
424// If the cachefile already has the header, then just get it.
425// If the cachefile has not been initialized, then don't modify anything.
426// max_acceptable_lsn is the latest acceptable checkpointed version of the file.
427{
428 FT ft = nullptr;
429 if ((ft = (FT) toku_cachefile_get_userdata(cf)) != nullptr) {
430 *header = ft;
431 assert(ft_handle->options.update_fun == ft->update_fun);
432 return 0;
433 }
434
435 int fd = toku_cachefile_get_fd(cf);
436 const char *fn = toku_cachefile_fname_in_env(cf);
437 int r = toku_deserialize_ft_from(fd, fn, max_acceptable_lsn, &ft);
438 if (r == TOKUDB_BAD_CHECKSUM) {
439 fprintf(stderr, "Checksum failure while reading header in file %s.\n", toku_cachefile_fname_in_env(cf));
440 assert(false); // make absolutely sure we crash before doing anything else
441 } else if (r != 0) {
442 return r;
443 }
444
445 invariant_notnull(ft);
446 // intuitively, the comparator points to the FT's cmp descriptor
447 ft->cmp.create(ft_handle->options.compare_fun, &ft->cmp_descriptor, ft_handle->options.memcmp_magic);
448 ft->update_fun = ft_handle->options.update_fun;
449 ft->cf = cf;
450 toku_cachefile_set_userdata(cf,
451 reinterpret_cast<void *>(ft),
452 ft_log_fassociate_during_checkpoint,
453 ft_close,
454 ft_free,
455 ft_checkpoint,
456 ft_begin_checkpoint,
457 ft_end_checkpoint,
458 ft_note_pin_by_checkpoint,
459 ft_note_unpin_by_checkpoint);
460 *header = ft;
461 return 0;
462}
463
464void
465toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live) {
466 toku_ft_grab_reflock(ft);
467 live->ft = ft;
468 toku_list_push(&ft->live_ft_handles, &live->live_ft_handle_link);
469 toku_ft_release_reflock(ft);
470}
471
472// the reference count for a ft is the number of txn's that
473// touched it plus the number of open handles plus one if
474// pinned by a checkpoint.
475static int
476ft_get_reference_count(FT ft) {
477 uint32_t pinned_by_checkpoint = ft->pinned_by_checkpoint ? 1 : 0;
478 int num_handles = toku_list_num_elements_est(&ft->live_ft_handles);
479 return pinned_by_checkpoint + ft->num_txns + num_handles;
480}
481
482// a ft is needed in memory iff its reference count is non-zero
483bool
484toku_ft_needed_unlocked(FT ft) {
485 return ft_get_reference_count(ft) != 0;
486}
487
488// get the reference count and return true if it was 1
489bool
490toku_ft_has_one_reference_unlocked(FT ft) {
491 return ft_get_reference_count(ft) == 1;
492}
493
494// evict a ft from memory by closing its cachefile. any future work
495// will have to read in the ft in a new cachefile and new FT object.
496void toku_ft_evict_from_memory(FT ft, bool oplsn_valid, LSN oplsn) {
497 assert(ft->cf);
498 toku_cachefile_close(&ft->cf, oplsn_valid, oplsn);
499}
500
501// Verifies there exists exactly one ft handle and returns it.
502FT_HANDLE toku_ft_get_only_existing_ft_handle(FT ft) {
503 FT_HANDLE ft_handle_ret = NULL;
504 toku_ft_grab_reflock(ft);
505 assert(toku_list_num_elements_est(&ft->live_ft_handles) == 1);
506 ft_handle_ret = toku_list_struct(toku_list_head(&ft->live_ft_handles), struct ft_handle, live_ft_handle_link);
507 toku_ft_release_reflock(ft);
508 return ft_handle_ret;
509}
510
511// Purpose: set fields in ft_header to capture accountability info for start of HOT optimize.
512// Note: HOT accountability variables in header are modified only while holding header lock.
513// (Header lock is really needed for touching the dirty bit, but it's useful and
514// convenient here for keeping the HOT variables threadsafe.)
515void
516toku_ft_note_hot_begin(FT_HANDLE ft_handle) {
517 FT ft = ft_handle->ft;
518 time_t now = time(NULL);
519
520 // hold lock around setting and clearing of dirty bit
521 // (see cooperative use of dirty bit in ft_begin_checkpoint())
522 toku_ft_lock(ft);
523 ft->h->time_of_last_optimize_begin = now;
524 ft->h->count_of_optimize_in_progress++;
525 ft->h->dirty = 1;
526 toku_ft_unlock(ft);
527}
528
529
530// Purpose: set fields in ft_header to capture accountability info for end of HOT optimize.
531// Note: See note for toku_ft_note_hot_begin().
532void
533toku_ft_note_hot_complete(FT_HANDLE ft_handle, bool success, MSN msn_at_start_of_hot) {
534 FT ft = ft_handle->ft;
535 time_t now = time(NULL);
536
537 toku_ft_lock(ft);
538 ft->h->count_of_optimize_in_progress--;
539 if (success) {
540 ft->h->time_of_last_optimize_end = now;
541 ft->h->msn_at_start_of_last_completed_optimize = msn_at_start_of_hot;
542 // If we just successfully completed an optimization and no other thread is performing
543 // an optimization, then the number of optimizations in progress is zero.
544 // If there was a crash during a HOT optimization, this is how count_of_optimize_in_progress
545 // would be reset to zero on the disk after recovery from that crash.
546 if (ft->h->count_of_optimize_in_progress == ft->h->count_of_optimize_in_progress_read_from_disk)
547 ft->h->count_of_optimize_in_progress = 0;
548 }
549 ft->h->dirty = 1;
550 toku_ft_unlock(ft);
551}
552
553
554void
555toku_ft_init(FT ft,
556 BLOCKNUM root_blocknum_on_disk,
557 LSN checkpoint_lsn,
558 TXNID root_xid_that_created,
559 uint32_t target_nodesize,
560 uint32_t target_basementnodesize,
561 enum toku_compression_method compression_method,
562 uint32_t fanout
563 )
564{
565 memset(ft, 0, sizeof *ft);
566 struct ft_options options = {
567 .nodesize = target_nodesize,
568 .basementnodesize = target_basementnodesize,
569 .compression_method = compression_method,
570 .fanout = fanout,
571 .flags = 0,
572 .memcmp_magic = 0,
573 .compare_fun = NULL,
574 .update_fun = NULL
575 };
576 ft->h = ft_header_create(&options, root_blocknum_on_disk, root_xid_that_created);
577 ft->h->checkpoint_count = 1;
578 ft->h->checkpoint_lsn = checkpoint_lsn;
579}
580
581// Open an ft for use by redirect. The new ft must have the same dict_id as the old_ft passed in. (FILENUM is assigned by the ft_handle_open() function.)
582static int
583ft_handle_open_for_redirect(FT_HANDLE *new_ftp, const char *fname_in_env, TOKUTXN txn, FT old_ft) {
584 FT_HANDLE ft_handle;
585 assert(old_ft->dict_id.dictid != DICTIONARY_ID_NONE.dictid);
586 toku_ft_handle_create(&ft_handle);
587 toku_ft_set_bt_compare(ft_handle, old_ft->cmp.get_compare_func());
588 toku_ft_set_update(ft_handle, old_ft->update_fun);
589 toku_ft_handle_set_nodesize(ft_handle, old_ft->h->nodesize);
590 toku_ft_handle_set_basementnodesize(ft_handle, old_ft->h->basementnodesize);
591 toku_ft_handle_set_compression_method(ft_handle, old_ft->h->compression_method);
592 toku_ft_handle_set_fanout(ft_handle, old_ft->h->fanout);
593 CACHETABLE ct = toku_cachefile_get_cachetable(old_ft->cf);
594 int r = toku_ft_handle_open_with_dict_id(ft_handle, fname_in_env, 0, 0, ct, txn, old_ft->dict_id);
595 if (r != 0) {
596 goto cleanup;
597 }
598 assert(ft_handle->ft->dict_id.dictid == old_ft->dict_id.dictid);
599 *new_ftp = ft_handle;
600
601 cleanup:
602 if (r != 0) {
603 toku_ft_handle_close(ft_handle);
604 }
605 return r;
606}
607
608// This function performs most of the work to redirect a dictionary to different file.
609// It is called for redirect and to abort a redirect. (This function is almost its own inverse.)
610static int
611dictionary_redirect_internal(const char *dst_fname_in_env, FT src_ft, TOKUTXN txn, FT *dst_ftp) {
612 int r;
613
614 FILENUM src_filenum = toku_cachefile_filenum(src_ft->cf);
615 FILENUM dst_filenum = FILENUM_NONE;
616
617 FT dst_ft = NULL;
618 struct toku_list *list;
619 // open a dummy ft based off of
620 // dst_fname_in_env to get the header
621 // then we will change all the ft's to have
622 // their headers point to dst_ft instead of src_ft
623 FT_HANDLE tmp_dst_ft = NULL;
624 r = ft_handle_open_for_redirect(&tmp_dst_ft, dst_fname_in_env, txn, src_ft);
625 if (r != 0) {
626 goto cleanup;
627 }
628 dst_ft = tmp_dst_ft->ft;
629
630 // some sanity checks on dst_filenum
631 dst_filenum = toku_cachefile_filenum(dst_ft->cf);
632 assert(dst_filenum.fileid!=FILENUM_NONE.fileid);
633 assert(dst_filenum.fileid!=src_filenum.fileid); //Cannot be same file.
634
635 // for each live ft_handle, ft_handle->ft is currently src_ft
636 // we want to change it to dummy_dst
637 toku_ft_grab_reflock(src_ft);
638 while (!toku_list_empty(&src_ft->live_ft_handles)) {
639 list = src_ft->live_ft_handles.next;
640 FT_HANDLE src_handle = NULL;
641 src_handle = toku_list_struct(list, struct ft_handle, live_ft_handle_link);
642
643 toku_list_remove(&src_handle->live_ft_handle_link);
644
645 toku_ft_note_ft_handle_open(dst_ft, src_handle);
646 if (src_handle->redirect_callback) {
647 src_handle->redirect_callback(src_handle, src_handle->redirect_callback_extra);
648 }
649 }
650 assert(dst_ft);
651 // making sure that we are not leaking src_ft
652 assert(toku_ft_needed_unlocked(src_ft));
653 toku_ft_release_reflock(src_ft);
654
655 toku_ft_handle_close(tmp_dst_ft);
656
657 *dst_ftp = dst_ft;
658cleanup:
659 return r;
660}
661
662
663
664//This is the 'abort redirect' function. The redirect of old_ft to new_ft was done
665//and now must be undone, so here we redirect new_ft back to old_ft.
666int
667toku_dictionary_redirect_abort(FT old_ft, FT new_ft, TOKUTXN txn) {
668 char *old_fname_in_env = toku_cachefile_fname_in_env(old_ft->cf);
669 int r;
670 {
671 FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
672 FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
673 assert(old_filenum.fileid!=new_filenum.fileid); //Cannot be same file.
674
675 //No living fts in old header.
676 toku_ft_grab_reflock(old_ft);
677 assert(toku_list_empty(&old_ft->live_ft_handles));
678 toku_ft_release_reflock(old_ft);
679 }
680
681 FT dst_ft;
682 // redirect back from new_ft to old_ft
683 r = dictionary_redirect_internal(old_fname_in_env, new_ft, txn, &dst_ft);
684 if (r == 0) {
685 assert(dst_ft == old_ft);
686 }
687 return r;
688}
689
690/****
691 * on redirect or abort:
692 * if redirect txn_note_doing_work(txn)
693 * if redirect connect src ft to txn (txn modified this ft)
694 * for each src ft
695 * open ft to dst file (create new ft struct)
696 * if redirect connect dst ft to txn
697 * redirect db to new ft
698 * redirect cursors to new ft
699 * close all src fts
700 * if redirect make rollback log entry
701 *
702 * on commit:
703 * nothing to do
704 *
705 *****/
706
707int
708toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKUTXN txn) {
709// Input args:
710// new file name for dictionary (relative to env)
711// old_ft_h is a live ft of open handle ({DB, FT_HANDLE} pair) that currently refers to old dictionary file.
712// (old_ft_h may be one of many handles to the dictionary.)
713// txn that created the loader
714// Requires:
715// multi operation lock is held.
716// The ft is open. (which implies there can be no zombies.)
717// The new file must be a valid dictionary.
718// The block size and flags in the new file must match the existing FT.
719// The new file must already have its descriptor in it (and it must match the existing descriptor).
720// Effect:
721// Open new FTs (and related header and cachefile) to the new dictionary file with a new FILENUM.
722// Redirect all DBs that point to fts that point to the old file to point to fts that point to the new file.
723// Copy the dictionary id (dict_id) from the header of the original file to the header of the new file.
724// Create a rollback log entry.
725// The original FT, header, cachefile and file remain unchanged. They will be cleaned up on commmit.
726// If the txn aborts, then this operation will be undone
727 int r;
728
729 FT old_ft = old_ft_h->ft;
730
731 // dst file should not be open. (implies that dst and src are different because src must be open.)
732 {
733 CACHETABLE ct = toku_cachefile_get_cachetable(old_ft->cf);
734 CACHEFILE cf;
735 r = toku_cachefile_of_iname_in_env(ct, dst_fname_in_env, &cf);
736 if (r==0) {
737 r = EINVAL;
738 goto cleanup;
739 }
740 assert(r==ENOENT);
741 r = 0;
742 }
743
744 if (txn) {
745 toku_txn_maybe_note_ft(txn, old_ft); // mark old ft as touched by this txn
746 }
747
748 FT new_ft;
749 r = dictionary_redirect_internal(dst_fname_in_env, old_ft, txn, &new_ft);
750 if (r != 0) {
751 goto cleanup;
752 }
753
754 // make rollback log entry
755 if (txn) {
756 toku_txn_maybe_note_ft(txn, new_ft); // mark new ft as touched by this txn
757
758 // There is no recovery log entry for redirect,
759 // and rollback log entries are not allowed for read-only transactions.
760 // Normally the recovery log entry would ensure the begin was logged.
761 if (!txn->begin_was_logged) {
762 toku_maybe_log_begin_txn_for_write_operation(txn);
763 }
764 FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
765 FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
766 toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum);
767 }
768
769cleanup:
770 return r;
771}
772
773// Insert reference to transaction into ft
774void
775toku_ft_add_txn_ref(FT ft) {
776 toku_ft_grab_reflock(ft);
777 ++ft->num_txns;
778 toku_ft_release_reflock(ft);
779}
780
781static void
782remove_txn_ref_callback(FT ft, void *UU(context)) {
783 invariant(ft->num_txns > 0);
784 --ft->num_txns;
785}
786
787void
788toku_ft_remove_txn_ref(FT ft) {
789 toku_ft_remove_reference(ft, false, ZERO_LSN, remove_txn_ref_callback, NULL);
790}
791
792void toku_calculate_root_offset_pointer (
793 FT ft,
794 CACHEKEY* root_key,
795 uint32_t *roothash
796 )
797{
798 *roothash = toku_cachetable_hash(ft->cf, ft->h->root_blocknum);
799 *root_key = ft->h->root_blocknum;
800}
801
802void toku_ft_set_new_root_blocknum(
803 FT ft,
804 CACHEKEY new_root_key
805 )
806{
807 ft->h->root_blocknum = new_root_key;
808}
809
810LSN toku_ft_checkpoint_lsn(FT ft) {
811 return ft->h->checkpoint_lsn;
812}
813
814void
815toku_ft_stat64 (FT ft, struct ftstat64_s *s) {
816 s->fsize = toku_cachefile_size(ft->cf);
817 // just use the in memory stats from the header
818 // prevent appearance of negative numbers for numrows, numbytes
819 // if the logical count was never properly re-counted on an upgrade,
820 // return the existing physical count instead.
821 int64_t n;
822 if (ft->in_memory_logical_rows == (uint64_t)-1) {
823 n = ft->in_memory_stats.numrows;
824 } else {
825 n = ft->in_memory_logical_rows;
826 }
827 if (n < 0) {
828 n = 0;
829 }
830 s->nkeys = s->ndata = n;
831 n = ft->in_memory_stats.numbytes;
832 if (n < 0) {
833 n = 0;
834 }
835 s->dsize = n;
836 s->create_time_sec = ft->h->time_of_creation;
837 s->modify_time_sec = ft->h->time_of_last_modification;
838 s->verify_time_sec = ft->h->time_of_last_verification;
839}
840
841void toku_ft_get_fractal_tree_info64(FT ft, struct ftinfo64 *info) {
842 ft->blocktable.get_info64(info);
843}
844
845int toku_ft_iterate_fractal_tree_block_map(FT ft, int (*iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*), void *iter_extra) {
846 uint64_t this_checkpoint_count = ft->h->checkpoint_count;
847 return ft->blocktable.iterate_translation_tables(this_checkpoint_count, iter, iter_extra);
848}
849
850void
851toku_ft_update_descriptor(FT ft, DESCRIPTOR desc)
852// Effect: Changes the descriptor in a tree (log the change, make sure it makes it to disk eventually).
853// requires: the ft is fully user-opened with a valid cachefile.
854// descriptor updates cannot happen in parallel for an FT
855// (ydb layer uses a row lock to enforce this)
856{
857 assert(ft->cf);
858 int fd = toku_cachefile_get_fd(ft->cf);
859 toku_ft_update_descriptor_with_fd(ft, desc, fd);
860}
861
862// upadate the descriptor for an ft and serialize it using
863// the given descriptor instead of reading the descriptor
864// from the ft's cachefile. we do this so serialize code can
865// update a descriptor before the ft is fully opened and has
866// a valid cachefile.
867void
868toku_ft_update_descriptor_with_fd(FT ft, DESCRIPTOR desc, int fd) {
869 // the checksum is four bytes, so that's where the magic number comes from
870 // make space for the new descriptor and write it out to disk
871 DISKOFF offset, size;
872 size = toku_serialize_descriptor_size(desc) + 4;
873 ft->blocktable.realloc_descriptor_on_disk(size, &offset, ft, fd);
874 toku_serialize_descriptor_contents_to_fd(fd, desc, offset);
875
876 // cleanup the old descriptor and set the in-memory descriptor to the new one
877 toku_destroy_dbt(&ft->descriptor.dbt);
878 toku_clone_dbt(&ft->descriptor.dbt, desc->dbt);
879}
880
881void toku_ft_update_cmp_descriptor(FT ft) {
882 // cleanup the old cmp descriptor and clone it as the in-memory descriptor
883 toku_destroy_dbt(&ft->cmp_descriptor.dbt);
884 toku_clone_dbt(&ft->cmp_descriptor.dbt, ft->descriptor.dbt);
885}
886
887DESCRIPTOR toku_ft_get_descriptor(FT_HANDLE ft_handle) {
888 return &ft_handle->ft->descriptor;
889}
890
891DESCRIPTOR toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle) {
892 return &ft_handle->ft->cmp_descriptor;
893}
894
895void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
896 (void) toku_sync_fetch_and_add(&(headerstats->numrows), delta.numrows);
897 (void) toku_sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes);
898}
899
900void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
901 (void) toku_sync_fetch_and_sub(&(headerstats->numrows), delta.numrows);
902 (void) toku_sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes);
903}
904
905void toku_ft_adjust_logical_row_count(FT ft, int64_t delta) {
906 // In order to make sure that the correct count is returned from
907 // toku_ft_stat64, the ft->(in_memory|on_disk)_logical_rows _MUST_NOT_ be
908 // modified from anywhere else from here with the exceptions of
909 // serializing in a header, initializing a new header and analyzing
910 // an index for a logical_row count.
911 // The gist is that on an index upgrade, all logical_rows values
912 // in the ft header are set to -1 until an analyze can reset it to an
913 // accurate value. Until then, the physical count from in_memory_stats
914 // must be returned in toku_ft_stat64.
915 if (delta != 0 && ft->in_memory_logical_rows != (uint64_t)-1) {
916 toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), delta);
917 if (ft->in_memory_logical_rows == (uint64_t)-1) {
918 toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), 1);
919 }
920 }
921}
922
923void toku_ft_remove_reference(
924 FT ft,
925 bool oplsn_valid,
926 LSN oplsn,
927 remove_ft_ref_callback remove_ref,
928 void *extra) {
929
930 toku_ft_grab_reflock(ft);
931 if (toku_ft_has_one_reference_unlocked(ft)) {
932 toku_ft_release_reflock(ft);
933
934 toku_ft_open_close_lock();
935 toku_ft_grab_reflock(ft);
936
937 remove_ref(ft, extra);
938 bool needed = toku_ft_needed_unlocked(ft);
939 toku_ft_release_reflock(ft);
940
941 // if we're running during recovery, we must close the underlying ft.
942 // we know we're running in recovery if we were passed a valid lsn.
943 if (oplsn_valid) {
944 assert(!needed);
945 }
946 if (!needed) {
947 // close header
948 toku_ft_evict_from_memory(ft, oplsn_valid, oplsn);
949 }
950
951 toku_ft_open_close_unlock();
952 }
953 else {
954 remove_ref(ft, extra);
955 toku_ft_release_reflock(ft);
956 }
957}
958
959void toku_ft_set_nodesize(FT ft, unsigned int nodesize) {
960 toku_ft_lock(ft);
961 ft->h->nodesize = nodesize;
962 ft->h->dirty = 1;
963 toku_ft_unlock(ft);
964}
965
966void toku_ft_get_nodesize(FT ft, unsigned int *nodesize) {
967 toku_ft_lock(ft);
968 *nodesize = ft->h->nodesize;
969 toku_ft_unlock(ft);
970}
971
972void toku_ft_set_basementnodesize(FT ft, unsigned int basementnodesize) {
973 toku_ft_lock(ft);
974 ft->h->basementnodesize = basementnodesize;
975 ft->h->dirty = 1;
976 toku_ft_unlock(ft);
977}
978
979void toku_ft_get_basementnodesize(FT ft, unsigned int *basementnodesize) {
980 toku_ft_lock(ft);
981 *basementnodesize = ft->h->basementnodesize;
982 toku_ft_unlock(ft);
983}
984
985void toku_ft_set_compression_method(FT ft, enum toku_compression_method method) {
986 toku_ft_lock(ft);
987 ft->h->compression_method = method;
988 ft->h->dirty = 1;
989 toku_ft_unlock(ft);
990}
991
992void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp) {
993 toku_ft_lock(ft);
994 *methodp = ft->h->compression_method;
995 toku_ft_unlock(ft);
996}
997
998void toku_ft_set_fanout(FT ft, unsigned int fanout) {
999 toku_ft_lock(ft);
1000 ft->h->fanout = fanout;
1001 ft->h->dirty = 1;
1002 toku_ft_unlock(ft);
1003}
1004
1005void toku_ft_get_fanout(FT ft, unsigned int *fanout) {
1006 toku_ft_lock(ft);
1007 *fanout = ft->h->fanout;
1008 toku_ft_unlock(ft);
1009}
1010
1011// mark the ft as a blackhole. any message injections will be a no op.
1012void toku_ft_set_blackhole(FT_HANDLE ft_handle) {
1013 ft_handle->ft->blackhole = true;
1014}
1015
// Accumulator threaded through the blocktable iteration in
// toku_ft_get_garbage and updated by garbage_leafentry_helper.
struct garbage_helper_extra {
    FT ft;               // the ft whose blocks are being scanned
    size_t total_space;  // running sum: leafentry disk size + key bytes + key length field
    size_t used_space;   // running sum of the same, counting only non-deleted latest values
};
1021
1022static int
1023garbage_leafentry_helper(const void* key UU(), const uint32_t keylen, const LEAFENTRY & le, uint32_t UU(idx), struct garbage_helper_extra * const info) {
1024 //TODO #warning need to reanalyze for split
1025 info->total_space += leafentry_disksize(le) + keylen + sizeof(keylen);
1026 if (!le_latest_is_del(le)) {
1027 info->used_space += LE_CLEAN_MEMSIZE(le_latest_vallen(le)) + keylen + sizeof(keylen);
1028 }
1029 return 0;
1030}
1031
1032static int
1033garbage_helper(BLOCKNUM blocknum, int64_t UU(size), int64_t UU(address), void *extra) {
1034 struct garbage_helper_extra *CAST_FROM_VOIDP(info, extra);
1035 FTNODE node;
1036 FTNODE_DISK_DATA ndd;
1037 ftnode_fetch_extra bfe;
1038 bfe.create_for_full_read(info->ft);
1039 int fd = toku_cachefile_get_fd(info->ft->cf);
1040 int r = toku_deserialize_ftnode_from(fd, blocknum, 0, &node, &ndd, &bfe);
1041 if (r != 0) {
1042 goto no_node;
1043 }
1044 if (node->height > 0) {
1045 goto exit;
1046 }
1047 for (int i = 0; i < node->n_children; ++i) {
1048 bn_data* bd = BLB_DATA(node, i);
1049 r = bd->iterate<struct garbage_helper_extra, garbage_leafentry_helper>(info);
1050 if (r != 0) {
1051 goto exit;
1052 }
1053 }
1054 {
1055 float a = info->used_space, b=info->total_space;
1056 float percentage = (1 - (a / b)) * 100;
1057 printf("LeafNode# %d has %d BasementNodes and %2.1f%% of the allocated space is garbage\n", (int)blocknum.b, node->n_children, percentage);
1058 }
1059exit:
1060 toku_ftnode_free(&node);
1061 toku_free(ndd);
1062no_node:
1063 return r;
1064}
1065
1066void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space) {
1067// Effect: Iterates the FT's blocktable and calculates the total and used space for leaf blocks.
1068// Note: It is ok to call this function concurrently with reads/writes to the table since
1069// the blocktable lock is held, which means no new allocations or file writes can occur.
1070 invariant_notnull(total_space);
1071 invariant_notnull(used_space);
1072 struct garbage_helper_extra info = {
1073 .ft = ft,
1074 .total_space = 0,
1075 .used_space = 0
1076 };
1077 ft->blocktable.iterate(block_table::TRANSLATION_CHECKPOINTED, garbage_helper, &info, true, true);
1078 *total_space = info.total_space;
1079 *used_space = info.used_space;
1080}
1081
1082
1083#if !defined(TOKUDB_REVISION)
1084#error
1085#endif
1086
1087#define xstr(X) str(X)
1088#define str(X) #X
1089#define static_version_string xstr(DB_VERSION_MAJOR) "." \
1090 xstr(DB_VERSION_MINOR) "." \
1091 xstr(DB_VERSION_PATCH) " build " \
1092 xstr(TOKUDB_REVISION)
1093struct toku_product_name_strings_struct toku_product_name_strings;
1094
1095char toku_product_name[TOKU_MAX_PRODUCT_NAME_LENGTH];
1096void tokuft_update_product_name_strings(void) {
1097 // DO ALL STRINGS HERE.. maybe have a separate FT layer version as well
1098 {
1099 int n = snprintf(toku_product_name_strings.db_version,
1100 sizeof(toku_product_name_strings.db_version),
1101 "%s %s", toku_product_name, static_version_string);
1102 assert(n >= 0);
1103 assert((unsigned)n < sizeof(toku_product_name_strings.db_version));
1104 }
1105 {
1106 int n = snprintf(toku_product_name_strings.fileopsdirectory,
1107 sizeof(toku_product_name_strings.fileopsdirectory),
1108 "%s.directory", toku_product_name);
1109 assert(n >= 0);
1110 assert((unsigned)n < sizeof(toku_product_name_strings.fileopsdirectory));
1111 }
1112 {
1113 int n = snprintf(toku_product_name_strings.environmentdictionary,
1114 sizeof(toku_product_name_strings.environmentdictionary),
1115 "%s.environment", toku_product_name);
1116 assert(n >= 0);
1117 assert((unsigned)n < sizeof(toku_product_name_strings.environmentdictionary));
1118 }
1119 {
1120 int n = snprintf(toku_product_name_strings.rollback_cachefile,
1121 sizeof(toku_product_name_strings.rollback_cachefile),
1122 "%s.rollback", toku_product_name);
1123 assert(n >= 0);
1124 assert((unsigned)n < sizeof(toku_product_name_strings.rollback_cachefile));
1125 }
1126 {
1127 int n = snprintf(toku_product_name_strings.single_process_lock,
1128 sizeof(toku_product_name_strings.single_process_lock),
1129 "__%s_lock_dont_delete_me", toku_product_name);
1130 assert(n >= 0);
1131 assert((unsigned)n < sizeof(toku_product_name_strings.single_process_lock));
1132 }
1133}
1134#undef xstr
1135#undef str
1136
1137int
1138toku_single_process_lock(const char *lock_dir, const char *which, int *lockfd) {
1139 if (!lock_dir)
1140 return ENOENT;
1141 int namelen=strlen(lock_dir)+strlen(which);
1142 char lockfname[namelen+sizeof("/_") + strlen(toku_product_name_strings.single_process_lock)];
1143
1144 int l = snprintf(lockfname, sizeof(lockfname), "%s/%s_%s",
1145 lock_dir, toku_product_name_strings.single_process_lock, which);
1146 assert(l+1 == (signed)(sizeof(lockfname)));
1147 *lockfd = toku_os_lock_file(lockfname);
1148 if (*lockfd < 0) {
1149 int e = get_error_errno();
1150 fprintf(stderr, "Couldn't start tokuft because some other tokuft process is using the same directory [%s] for [%s]\n", lock_dir, which);
1151 return e;
1152 }
1153 return 0;
1154}
1155
1156int
1157toku_single_process_unlock(int *lockfd) {
1158 int fd = *lockfd;
1159 *lockfd = -1;
1160 if (fd>=0) {
1161 int r = toku_os_unlock_file(fd);
1162 if (r != 0)
1163 return get_error_errno();
1164 }
1165 return 0;
1166}
1167
1168int tokuft_num_envs = 0;
1169int
1170db_env_set_toku_product_name(const char *name) {
1171 if (tokuft_num_envs > 0) {
1172 return EINVAL;
1173 }
1174 if (!name || strlen(name) < 1) {
1175 return EINVAL;
1176 }
1177 if (strlen(name) >= sizeof(toku_product_name)) {
1178 return ENAMETOOLONG;
1179 }
1180 if (strncmp(toku_product_name, name, sizeof(toku_product_name))) {
1181 strcpy(toku_product_name, name);
1182 tokuft_update_product_name_strings();
1183 }
1184 return 0;
1185}
1186
1187