1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <memory>
40#include "ft/cachetable/cachetable.h"
41#include "ft/cachetable/checkpoint.h"
42#include "ft/ft.h"
43#include "ft/log_header.h"
44#include "ft/logger/log-internal.h"
45#include "ft/logger/logcursor.h"
46#include "ft/txn/txn_manager.h"
47#include "util/omt.h"
48
// Non-zero enables the recovery trace lines printed to stderr in this file; off by default.
int tokuft_recovery_trace = 0;

// Uncomment the definition below to make VERIFY_COUNTS(n) call
// toku_verify_or_set_counts(n, false); otherwise it expands to a no-op.
//#define DO_VERIFY_COUNTS
#if defined(DO_VERIFY_COUNTS)
#define VERIFY_COUNTS(n) toku_verify_or_set_counts(n, false)
#else
#define VERIFY_COUNTS(n) ((void)0)
#endif

// Minimum number of seconds between two recovery progress reports.
#define TOKUFT_RECOVERY_PROGRESS_TIME 15
time_t tokuft_recovery_progress_time = TOKUFT_RECOVERY_PROGRESS_TIME;
61
// Phases of the log scan. The numeric values double as indexes into
// scan_state_strings, so they must stay contiguous starting at 1.
enum ss {
    BACKWARD_NEWER_CHECKPOINT_END = 1,
    BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END = 2,
    FORWARD_BETWEEN_CHECKPOINT_BEGIN_END = 3,
    FORWARD_NEWER_CHECKPOINT_END = 4,
};
68
69struct scan_state {
70 enum ss ss;
71 LSN checkpoint_begin_lsn;
72 LSN checkpoint_end_lsn;
73 uint64_t checkpoint_end_timestamp;
74 uint64_t checkpoint_begin_timestamp;
75 uint32_t checkpoint_num_fassociate;
76 uint32_t checkpoint_num_xstillopen;
77 TXNID last_xid;
78};
79
// Printable names for enum ss, indexed by enumerator value.
// Slot 0 is a placeholder because the enumerators start at 1.
static const char *scan_state_strings[] = {
    "?",
    "bw_newer",
    "bw_between",
    "fw_between",
    "fw_newer",
};
83
84static void scan_state_init(struct scan_state *ss) {
85 ss->ss = BACKWARD_NEWER_CHECKPOINT_END;
86 ss->checkpoint_begin_lsn = ZERO_LSN;
87 ss->checkpoint_end_lsn = ZERO_LSN;
88 ss->checkpoint_num_fassociate = 0;
89 ss->checkpoint_num_xstillopen = 0;
90 ss->last_xid = 0;
91}
92
93static const char *scan_state_string(struct scan_state *ss) {
94 assert(BACKWARD_NEWER_CHECKPOINT_END <= ss->ss && ss->ss <= FORWARD_NEWER_CHECKPOINT_END);
95 return scan_state_strings[ss->ss];
96}
97
98// File map tuple
99struct file_map_tuple {
100 FILENUM filenum;
101 FT_HANDLE ft_handle; // NULL ft_handle means it's a rollback file.
102 char *iname;
103 struct __toku_db fake_db;
104};
105
106static void file_map_tuple_init(struct file_map_tuple *tuple, FILENUM filenum, FT_HANDLE ft_handle, char *iname) {
107 tuple->filenum = filenum;
108 tuple->ft_handle = ft_handle;
109 tuple->iname = iname;
110 // use a fake DB for comparisons, using the ft's cmp descriptor
111 memset(&tuple->fake_db, 0, sizeof(tuple->fake_db));
112 tuple->fake_db.cmp_descriptor = &tuple->ft_handle->ft->cmp_descriptor;
113 tuple->fake_db.descriptor = &tuple->ft_handle->ft->descriptor;
114}
115
116static void file_map_tuple_destroy(struct file_map_tuple *tuple) {
117 if (tuple->iname) {
118 toku_free(tuple->iname);
119 tuple->iname = NULL;
120 }
121}
122
123// Map filenum to ft_handle
124struct file_map {
125 toku::omt<struct file_map_tuple *> *filenums;
126};
127
128// The recovery environment
129struct recover_env {
130 DB_ENV *env;
131 prepared_txn_callback_t prepared_txn_callback; // at the end of recovery, all the prepared txns are passed back to the ydb layer to make them into valid transactions.
132 keep_cachetable_callback_t keep_cachetable_callback; // after recovery, store the cachetable into the environment.
133 CACHETABLE ct;
134 TOKULOGGER logger;
135 CHECKPOINTER cp;
136 ft_compare_func bt_compare;
137 ft_update_func update_function;
138 generate_row_for_put_func generate_row_for_put;
139 generate_row_for_del_func generate_row_for_del;
140 DBT_ARRAY dest_keys;
141 DBT_ARRAY dest_vals;
142 struct scan_state ss;
143 struct file_map fmap;
144 bool goforward;
145 bool destroy_logger_at_end; // If true then destroy the logger when we are done. If false then set the logger into write-files mode when we are done with recovery.*/
146};
147typedef struct recover_env *RECOVER_ENV;
148
149
150static void file_map_init(struct file_map *fmap) {
151 XMALLOC(fmap->filenums);
152 fmap->filenums->create();
153}
154
155static void file_map_destroy(struct file_map *fmap) {
156 fmap->filenums->destroy();
157 toku_free(fmap->filenums);
158 fmap->filenums = nullptr;
159}
160
161static uint32_t file_map_get_num_dictionaries(struct file_map *fmap) {
162 return fmap->filenums->size();
163}
164
165static void file_map_close_dictionaries(struct file_map *fmap, LSN oplsn) {
166 int r;
167
168 while (1) {
169 uint32_t n = fmap->filenums->size();
170 if (n == 0) {
171 break;
172 }
173 struct file_map_tuple *tuple;
174 r = fmap->filenums->fetch(n - 1, &tuple);
175 assert(r == 0);
176 r = fmap->filenums->delete_at(n - 1);
177 assert(r == 0);
178 assert(tuple->ft_handle);
179 // Logging is on again, but we must pass the right LSN into close.
180 if (tuple->ft_handle) { // it's a DB, not a rollback file
181 toku_ft_handle_close_recovery(tuple->ft_handle, oplsn);
182 }
183 file_map_tuple_destroy(tuple);
184 toku_free(tuple);
185 }
186}
187
188static int file_map_h(struct file_map_tuple *const &a, const FILENUM &b) {
189 if (a->filenum.fileid < b.fileid) {
190 return -1;
191 } else if (a->filenum.fileid > b.fileid) {
192 return 1;
193 } else {
194 return 0;
195 }
196}
197
198static int file_map_insert (struct file_map *fmap, FILENUM fnum, FT_HANDLE ft_handle, char *iname) {
199 struct file_map_tuple *XMALLOC(tuple);
200 file_map_tuple_init(tuple, fnum, ft_handle, iname);
201 int r = fmap->filenums->insert<FILENUM, file_map_h>(tuple, fnum, nullptr);
202 return r;
203}
204
205static void file_map_remove(struct file_map *fmap, FILENUM fnum) {
206 uint32_t idx;
207 struct file_map_tuple *tuple;
208 int r = fmap->filenums->find_zero<FILENUM, file_map_h>(fnum, &tuple, &idx);
209 if (r == 0) {
210 r = fmap->filenums->delete_at(idx);
211 file_map_tuple_destroy(tuple);
212 toku_free(tuple);
213 }
214}
215
216// Look up file info: given FILENUM, return file_map_tuple (or DB_NOTFOUND)
217static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) {
218 uint32_t idx;
219 struct file_map_tuple *tuple;
220 int r = fmap->filenums->find_zero<FILENUM, file_map_h>(fnum, &tuple, &idx);
221 if (r == 0) {
222 assert(tuple->filenum.fileid == fnum.fileid);
223 *file_map_tuple = tuple;
224 } else {
225 assert(r == DB_NOTFOUND);
226 }
227 return r;
228}
229
230static int recover_env_init (RECOVER_ENV renv,
231 const char *env_dir,
232 DB_ENV *env,
233 prepared_txn_callback_t prepared_txn_callback,
234 keep_cachetable_callback_t keep_cachetable_callback,
235 TOKULOGGER logger,
236 ft_compare_func bt_compare,
237 ft_update_func update_function,
238 generate_row_for_put_func generate_row_for_put,
239 generate_row_for_del_func generate_row_for_del,
240 size_t cachetable_size) {
241 int r = 0;
242
243 // If we are passed a logger use it, otherwise create one.
244 renv->destroy_logger_at_end = logger==NULL;
245 if (logger) {
246 renv->logger = logger;
247 } else {
248 r = toku_logger_create(&renv->logger);
249 assert(r == 0);
250 }
251 toku_logger_write_log_files(renv->logger, false);
252 toku_cachetable_create(&renv->ct, cachetable_size ? cachetable_size : 1<<25, (LSN){0}, renv->logger);
253 toku_cachetable_set_env_dir(renv->ct, env_dir);
254 if (keep_cachetable_callback) keep_cachetable_callback(env, renv->ct);
255 toku_logger_set_cachetable(renv->logger, renv->ct);
256 renv->env = env;
257 renv->prepared_txn_callback = prepared_txn_callback;
258 renv->keep_cachetable_callback = keep_cachetable_callback;
259 renv->bt_compare = bt_compare;
260 renv->update_function = update_function;
261 renv->generate_row_for_put = generate_row_for_put;
262 renv->generate_row_for_del = generate_row_for_del;
263 file_map_init(&renv->fmap);
264 renv->goforward = false;
265 renv->cp = toku_cachetable_get_checkpointer(renv->ct);
266 toku_dbt_array_init(&renv->dest_keys, 1);
267 toku_dbt_array_init(&renv->dest_vals, 1);
268 if (tokuft_recovery_trace)
269 fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__);
270 return r;
271}
272
273static void recover_env_cleanup (RECOVER_ENV renv) {
274 invariant_zero(renv->fmap.filenums->size());
275 file_map_destroy(&renv->fmap);
276
277 if (renv->destroy_logger_at_end) {
278 toku_logger_close_rollback(renv->logger);
279 int r = toku_logger_close(&renv->logger);
280 assert(r == 0);
281 } else {
282 toku_logger_write_log_files(renv->logger, true);
283 }
284
285 if (renv->keep_cachetable_callback) {
286 renv->ct = NULL;
287 } else {
288 toku_cachetable_close(&renv->ct);
289 }
290 toku_dbt_array_destroy(&renv->dest_keys);
291 toku_dbt_array_destroy(&renv->dest_vals);
292
293 if (tokuft_recovery_trace)
294 fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__);
295}
296
297static const char *recover_state(RECOVER_ENV renv) {
298 return scan_state_string(&renv->ss);
299}
300
301// Open the file if it is not already open. If it is already open, then do nothing.
302static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, bool must_create, int UU(mode), BYTESTRING *bs_iname, FILENUM filenum, uint32_t treeflags,
303 TOKUTXN txn, uint32_t nodesize, uint32_t basementnodesize, enum toku_compression_method compression_method, LSN max_acceptable_lsn) {
304 int r = 0;
305 FT_HANDLE ft_handle = NULL;
306 char *iname = fixup_fname(bs_iname);
307
308 toku_ft_handle_create(&ft_handle);
309 toku_ft_set_flags(ft_handle, treeflags);
310
311 if (nodesize != 0) {
312 toku_ft_handle_set_nodesize(ft_handle, nodesize);
313 }
314
315 if (basementnodesize != 0) {
316 toku_ft_handle_set_basementnodesize(ft_handle, basementnodesize);
317 }
318
319 if (compression_method != TOKU_DEFAULT_COMPRESSION_METHOD) {
320 toku_ft_handle_set_compression_method(ft_handle, compression_method);
321 }
322
323 // set the key compare functions
324 if (!(treeflags & TOKU_DB_KEYCMP_BUILTIN) && renv->bt_compare) {
325 toku_ft_set_bt_compare(ft_handle, renv->bt_compare);
326 }
327
328 if (renv->update_function) {
329 toku_ft_set_update(ft_handle, renv->update_function);
330 }
331
332 // TODO mode (FUTURE FEATURE)
333 //mode = mode;
334
335 r = toku_ft_handle_open_recovery(ft_handle, iname, must_create, must_create, renv->ct, txn, filenum, max_acceptable_lsn);
336 if (r != 0) {
337 //Note: If ft_handle_open fails, then close_ft will NOT write a header to disk.
338 //No need to provide lsn, so use the regular toku_ft_handle_close function
339 toku_ft_handle_close(ft_handle);
340 toku_free(iname);
341 if (r == ENOENT) //Not an error to simply be missing.
342 r = 0;
343 return r;
344 }
345
346 file_map_insert(&renv->fmap, filenum, ft_handle, iname);
347 return 0;
348}
349
350static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
351 int r;
352 TXN_MANAGER mgr = toku_logger_get_txn_manager(renv->logger);
353 switch (renv->ss.ss) {
354 case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
355 assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn);
356 invariant(renv->ss.last_xid == TXNID_NONE);
357 renv->ss.last_xid = l->last_xid;
358 toku_txn_manager_set_last_xid_from_recovered_checkpoint(mgr, l->last_xid);
359
360 r = 0;
361 break;
362 case FORWARD_NEWER_CHECKPOINT_END:
363 assert(l->lsn.lsn > renv->ss.checkpoint_end_lsn.lsn);
364 // Verify last_xid is no older than the previous begin
365 invariant(l->last_xid >= renv->ss.last_xid);
366 // Verify last_xid is no older than the newest txn
367 invariant(l->last_xid >= toku_txn_manager_get_last_xid(mgr));
368
369 r = 0; // ignore it (log only has a begin checkpoint)
370 break;
371 default:
372 fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
373 abort();
374 break;
375 }
376 return r;
377}
378
379static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
380 int r;
381 time_t tnow = time(NULL);
382 fprintf(stderr, "%.24s PerconaFT recovery bw_begin_checkpoint at %" PRIu64 " timestamp %" PRIu64 " (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, recover_state(renv));
383 switch (renv->ss.ss) {
384 case BACKWARD_NEWER_CHECKPOINT_END:
385 // incomplete checkpoint, nothing to do
386 r = 0;
387 break;
388 case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
389 assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn);
390 renv->ss.ss = FORWARD_BETWEEN_CHECKPOINT_BEGIN_END;
391 renv->ss.checkpoint_begin_timestamp = l->timestamp;
392 renv->goforward = true;
393 tnow = time(NULL);
394 fprintf(stderr, "%.24s PerconaFT recovery turning around at begin checkpoint %" PRIu64 " time %" PRIu64 "\n",
395 ctime(&tnow), l->lsn.lsn,
396 renv->ss.checkpoint_end_timestamp - renv->ss.checkpoint_begin_timestamp);
397 r = 0;
398 break;
399 default:
400 fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
401 abort();
402 break;
403 }
404 return r;
405}
406
407static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
408 int r;
409 switch (renv->ss.ss) {
410 case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
411 assert(l->lsn_begin_checkpoint.lsn == renv->ss.checkpoint_begin_lsn.lsn);
412 assert(l->lsn.lsn == renv->ss.checkpoint_end_lsn.lsn);
413 assert(l->num_fassociate_entries == renv->ss.checkpoint_num_fassociate);
414 assert(l->num_xstillopen_entries == renv->ss.checkpoint_num_xstillopen);
415 renv->ss.ss = FORWARD_NEWER_CHECKPOINT_END;
416 r = 0;
417 break;
418 case FORWARD_NEWER_CHECKPOINT_END:
419 assert(0);
420 return 0;
421 default:
422 assert(0);
423 return 0;
424 }
425 return r;
426}
427
428static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
429 time_t tnow = time(NULL);
430 fprintf(stderr, "%.24s PerconaFT recovery bw_end_checkpoint at %" PRIu64 " timestamp %" PRIu64 " xid %" PRIu64 " (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, l->lsn_begin_checkpoint.lsn, recover_state(renv));
431 switch (renv->ss.ss) {
432 case BACKWARD_NEWER_CHECKPOINT_END:
433 renv->ss.ss = BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END;
434 renv->ss.checkpoint_begin_lsn.lsn = l->lsn_begin_checkpoint.lsn;
435 renv->ss.checkpoint_end_lsn.lsn = l->lsn.lsn;
436 renv->ss.checkpoint_end_timestamp = l->timestamp;
437 return 0;
438 case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
439 fprintf(stderr, "PerconaFT recovery %s:%d Should not see two end_checkpoint log entries without an intervening begin_checkpoint\n", __FILE__, __LINE__);
440 abort();
441 default:
442 break;
443 }
444 fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
445 abort();
446}
447
448static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) {
449 struct file_map_tuple *tuple = NULL;
450 int r = file_map_find(&renv->fmap, l->filenum, &tuple);
451 char *fname = fixup_fname(&l->iname);
452 switch (renv->ss.ss) {
453 case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
454 renv->ss.checkpoint_num_fassociate++;
455 assert(r==DB_NOTFOUND); //Not open
456 // Open it if it exists.
457 // If rollback file, specify which checkpointed version of file we need (not just the latest)
458 // because we cannot use a rollback log that is later than the last complete checkpoint. See #3113.
459 {
460 bool rollback_file = (0==strcmp(fname, toku_product_name_strings.rollback_cachefile));
461 LSN max_acceptable_lsn = MAX_LSN;
462 if (rollback_file) {
463 max_acceptable_lsn = renv->ss.checkpoint_begin_lsn;
464 FT_HANDLE t;
465 toku_ft_handle_create(&t);
466 r = toku_ft_handle_open_recovery(t, toku_product_name_strings.rollback_cachefile, false, false, renv->ct, (TOKUTXN)NULL, l->filenum, max_acceptable_lsn);
467 renv->logger->rollback_cachefile = t->ft->cf;
468 toku_logger_initialize_rollback_cache(renv->logger, t->ft);
469 } else {
470 r = internal_recover_fopen_or_fcreate(renv, false, 0, &l->iname, l->filenum, l->treeflags, NULL, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, max_acceptable_lsn);
471 assert(r==0);
472 }
473 }
474 // try to open the file again and if we get it, restore
475 // the unlink on close bit.
476 int ret;
477 ret = file_map_find(&renv->fmap, l->filenum, &tuple);
478 if (ret == 0 && l->unlink_on_close) {
479 toku_cachefile_unlink_on_close(tuple->ft_handle->ft->cf);
480 }
481 break;
482 case FORWARD_NEWER_CHECKPOINT_END:
483 if (r == 0) { //IF it is open
484 // assert that the filenum maps to the correct iname
485 assert(strcmp(fname, tuple->iname) == 0);
486 }
487 r = 0;
488 break;
489 default:
490 assert(0);
491 return 0;
492 }
493 toku_free(fname);
494
495 return r;
496}
497
498static int toku_recover_backward_fassociate (struct logtype_fassociate *UU(l), RECOVER_ENV UU(renv)) {
499 // nothing
500 return 0;
501}
502
503static int
504recover_transaction(TOKUTXN *txnp, TXNID_PAIR xid, TXNID_PAIR parentxid, TOKULOGGER logger) {
505 int r;
506
507 // lookup the parent
508 TOKUTXN parent = NULL;
509 if (!txn_pair_is_none(parentxid)) {
510 toku_txnid2txn(logger, parentxid, &parent);
511 assert(parent!=NULL);
512 }
513 else {
514 invariant(xid.child_id64 == TXNID_NONE);
515 }
516
517 // create a transaction and bind it to the transaction id
518 TOKUTXN txn = NULL;
519 {
520 //Verify it does not yet exist.
521 toku_txnid2txn(logger, xid, &txn);
522 assert(txn==NULL);
523 }
524 r = toku_txn_begin_with_xid(
525 parent,
526 &txn,
527 logger,
528 xid,
529 TXN_SNAPSHOT_NONE,
530 NULL,
531 true, // for_recovery
532 false // read_only
533 );
534 assert(r == 0);
535 // We only know about it because it was logged. Restore the log bit.
536 // Logging is 'off' but it will still set the bit.
537 toku_maybe_log_begin_txn_for_write_operation(txn);
538 if (txnp) *txnp = txn;
539 return 0;
540}
541
542static int recover_xstillopen_internal (TOKUTXN *txnp,
543 LSN UU(lsn),
544 TXNID_PAIR xid,
545 TXNID_PAIR parentxid,
546 uint64_t rollentry_raw_count,
547 FILENUMS open_filenums,
548 bool force_fsync_on_commit,
549 uint64_t num_rollback_nodes,
550 uint64_t num_rollentries,
551 BLOCKNUM spilled_rollback_head,
552 BLOCKNUM spilled_rollback_tail,
553 BLOCKNUM current_rollback,
554 uint32_t UU(crc),
555 uint32_t UU(len),
556 RECOVER_ENV renv) {
557 int r;
558 *txnp = NULL;
559 switch (renv->ss.ss) {
560 case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: {
561 renv->ss.checkpoint_num_xstillopen++;
562 invariant(renv->ss.last_xid != TXNID_NONE);
563 invariant(xid.parent_id64 <= renv->ss.last_xid);
564 TOKUTXN txn = NULL;
565 { //Create the transaction.
566 r = recover_transaction(&txn, xid, parentxid, renv->logger);
567 assert(r==0);
568 assert(txn!=NULL);
569 *txnp = txn;
570 }
571 { //Recover rest of transaction.
572#define COPY_TO_INFO(field) .field = field
573 struct txninfo info = {
574 COPY_TO_INFO(rollentry_raw_count),
575 .num_fts = 0, //Set afterwards
576 .open_fts = NULL, //Set afterwards
577 COPY_TO_INFO(force_fsync_on_commit),
578 COPY_TO_INFO(num_rollback_nodes),
579 COPY_TO_INFO(num_rollentries),
580 COPY_TO_INFO(spilled_rollback_head),
581 COPY_TO_INFO(spilled_rollback_tail),
582 COPY_TO_INFO(current_rollback)
583 };
584#undef COPY_TO_INFO
585 //Generate open_fts
586 FT array[open_filenums.num]; //Allocate maximum possible requirement
587 info.open_fts = array;
588 uint32_t i;
589 for (i = 0; i < open_filenums.num; i++) {
590 //open_filenums.filenums[]
591 struct file_map_tuple *tuple = NULL;
592 r = file_map_find(&renv->fmap, open_filenums.filenums[i], &tuple);
593 if (r==0) {
594 info.open_fts[info.num_fts++] = tuple->ft_handle->ft;
595 }
596 else {
597 assert(r==DB_NOTFOUND);
598 }
599 }
600 r = toku_txn_load_txninfo(txn, &info);
601 assert(r==0);
602 }
603 break;
604 }
605 case FORWARD_NEWER_CHECKPOINT_END: {
606 // assert that the transaction exists
607 TOKUTXN txn = NULL;
608 toku_txnid2txn(renv->logger, xid, &txn);
609 r = 0;
610 *txnp = txn;
611 break;
612 }
613 default:
614 assert(0);
615 return 0;
616 }
617 return r;
618}
619
620static int toku_recover_xstillopen (struct logtype_xstillopen *l, RECOVER_ENV renv) {
621 TOKUTXN txn;
622 return recover_xstillopen_internal (&txn,
623 l->lsn,
624 l->xid,
625 l->parentxid,
626 l->rollentry_raw_count,
627 l->open_filenums,
628 l->force_fsync_on_commit,
629 l->num_rollback_nodes,
630 l->num_rollentries,
631 l->spilled_rollback_head,
632 l->spilled_rollback_tail,
633 l->current_rollback,
634 l->crc,
635 l->len,
636 renv);
637}
638
639static int toku_recover_xstillopenprepared (struct logtype_xstillopenprepared *l, RECOVER_ENV renv) {
640 TOKUTXN txn;
641 int r = recover_xstillopen_internal (&txn,
642 l->lsn,
643 l->xid,
644 TXNID_PAIR_NONE,
645 l->rollentry_raw_count,
646 l->open_filenums,
647 l->force_fsync_on_commit,
648 l->num_rollback_nodes,
649 l->num_rollentries,
650 l->spilled_rollback_head,
651 l->spilled_rollback_tail,
652 l->current_rollback,
653 l->crc,
654 l->len,
655 renv);
656 if (r != 0) {
657 goto exit;
658 }
659 switch (renv->ss.ss) {
660 case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: {
661 toku_txn_prepare_txn(txn, l->xa_xid, 0);
662 break;
663 }
664 case FORWARD_NEWER_CHECKPOINT_END: {
665 assert(txn->state == TOKUTXN_PREPARING);
666 break;
667 }
668 default: {
669 assert(0);
670 }
671 }
672exit:
673 return r;
674}
675
676static int toku_recover_backward_xstillopen (struct logtype_xstillopen *UU(l), RECOVER_ENV UU(renv)) {
677 // nothing
678 return 0;
679}
680static int toku_recover_backward_xstillopenprepared (struct logtype_xstillopenprepared *UU(l), RECOVER_ENV UU(renv)) {
681 // nothing
682 return 0;
683}
684
685static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
686 int r;
687 r = recover_transaction(NULL, l->xid, l->parentxid, renv->logger);
688 return r;
689}
690
691static int toku_recover_backward_xbegin (struct logtype_xbegin *UU(l), RECOVER_ENV UU(renv)) {
692 // nothing
693 return 0;
694}
695
696struct toku_txn_progress_extra {
697 time_t tlast;
698 LSN lsn;
699 const char *type;
700 TXNID_PAIR xid;
701 uint64_t last_total;
702};
703
704static void toku_recover_txn_progress(TOKU_TXN_PROGRESS txn_progress, void *extra) {
705 toku_txn_progress_extra *txn_progress_extra = static_cast<toku_txn_progress_extra *>(extra);
706 if (txn_progress_extra->last_total == 0)
707 txn_progress_extra->last_total = txn_progress->entries_total;
708 else
709 assert(txn_progress_extra->last_total == txn_progress->entries_total);
710 time_t tnow = time(NULL);
711 if (tnow - txn_progress_extra->tlast >= tokuft_recovery_progress_time) {
712 txn_progress_extra->tlast = tnow;
713 fprintf(stderr, "%.24s PerconaFT ", ctime(&tnow));
714 if (txn_progress_extra->lsn.lsn != 0)
715 fprintf(stderr, "lsn %" PRIu64 " ", txn_progress_extra->lsn.lsn);
716 fprintf(stderr, "%s xid %" PRIu64 ":%" PRIu64 " ",
717 txn_progress_extra->type, txn_progress_extra->xid.parent_id64, txn_progress_extra->xid.child_id64);
718 fprintf(stderr, "%" PRIu64 "/%" PRIu64 " ",
719 txn_progress->entries_processed, txn_progress->entries_total);
720 if (txn_progress->entries_total > 0)
721 fprintf(stderr, "%.0f%% ", ((double) txn_progress->entries_processed / (double) txn_progress->entries_total) * 100.0);
722 fprintf(stderr, "\n");
723 }
724}
725
726static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) {
727 // find the transaction by transaction id
728 TOKUTXN txn = NULL;
729 toku_txnid2txn(renv->logger, l->xid, &txn);
730 assert(txn!=NULL);
731
732 // commit the transaction
733 toku_txn_progress_extra extra = { time(NULL), l->lsn, "commit", l->xid, 0 };
734 int r = toku_txn_commit_with_lsn(txn, true, l->lsn, toku_recover_txn_progress, &extra);
735 assert(r == 0);
736
737 // close the transaction
738 toku_txn_close_txn(txn);
739
740 return 0;
741}
742
743static int toku_recover_backward_xcommit (struct logtype_xcommit *UU(l), RECOVER_ENV UU(renv)) {
744 // nothing
745 return 0;
746}
747
748static int toku_recover_xprepare (struct logtype_xprepare *l, RECOVER_ENV renv) {
749 // find the transaction by transaction id
750 TOKUTXN txn = NULL;
751 toku_txnid2txn(renv->logger, l->xid, &txn);
752 assert(txn!=NULL);
753
754 // Save the transaction
755 toku_txn_prepare_txn(txn, l->xa_xid, 0);
756
757 return 0;
758}
759
760static int toku_recover_backward_xprepare (struct logtype_xprepare *UU(l), RECOVER_ENV UU(renv)) {
761 // nothing
762 return 0;
763}
764
765
766
767static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
768 int r;
769
770 // find the transaction by transaction id
771 TOKUTXN txn = NULL;
772 toku_txnid2txn(renv->logger, l->xid, &txn);
773 assert(txn!=NULL);
774
775 // abort the transaction
776 toku_txn_progress_extra extra = { time(NULL), l->lsn, "abort", l->xid, 0 };
777 r = toku_txn_abort_with_lsn(txn, l->lsn, toku_recover_txn_progress, &extra);
778 assert(r == 0);
779
780 // close the transaction
781 toku_txn_close_txn(txn);
782
783 return 0;
784}
785
786static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), RECOVER_ENV UU(renv)) {
787 // nothing
788 return 0;
789}
790
791// fcreate is like fopen except that the file must be created.
792static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) {
793 int r;
794
795 TOKUTXN txn = NULL;
796 toku_txnid2txn(renv->logger, l->xid, &txn);
797
798 // assert that filenum is closed
799 struct file_map_tuple *tuple = NULL;
800 r = file_map_find(&renv->fmap, l->filenum, &tuple);
801 assert(r==DB_NOTFOUND);
802
803 assert(txn!=NULL);
804
805 //unlink if it exists (recreate from scratch).
806 char *iname = fixup_fname(&l->iname);
807 char *iname_in_cwd = toku_cachetable_get_fname_in_cwd(renv->ct, iname);
808 r = unlink(iname_in_cwd);
809 if (r != 0) {
810 int er = get_error_errno();
811 if (er != ENOENT) {
812 fprintf(stderr, "PerconaFT recovery %s:%d unlink %s %d\n", __FUNCTION__, __LINE__, iname, er);
813 toku_free(iname);
814 return r;
815 }
816 }
817 assert(0!=strcmp(iname, toku_product_name_strings.rollback_cachefile)); //Creation of rollback cachefile never gets logged.
818 toku_free(iname_in_cwd);
819 toku_free(iname);
820
821 bool must_create = true;
822 r = internal_recover_fopen_or_fcreate(renv, must_create, l->mode, &l->iname, l->filenum, l->treeflags, txn, l->nodesize, l->basementnodesize, (enum toku_compression_method) l->compression_method, MAX_LSN);
823 return r;
824}
825
826static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) {
827 // nothing
828 return 0;
829}
830
831
832
833static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
834 int r;
835
836 // assert that filenum is closed
837 struct file_map_tuple *tuple = NULL;
838 r = file_map_find(&renv->fmap, l->filenum, &tuple);
839 assert(r==DB_NOTFOUND);
840
841 bool must_create = false;
842 TOKUTXN txn = NULL;
843 char *fname = fixup_fname(&l->iname);
844
845 assert(0!=strcmp(fname, toku_product_name_strings.rollback_cachefile)); //Rollback cachefile can be opened only via fassociate.
846 r = internal_recover_fopen_or_fcreate(renv, must_create, 0, &l->iname, l->filenum, l->treeflags, txn, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, MAX_LSN);
847
848 toku_free(fname);
849 return r;
850}
851
852static int toku_recover_backward_fopen (struct logtype_fopen *UU(l), RECOVER_ENV UU(renv)) {
853 // nothing
854 return 0;
855}
856
857static int toku_recover_change_fdescriptor (struct logtype_change_fdescriptor *l, RECOVER_ENV renv) {
858 int r;
859 struct file_map_tuple *tuple = NULL;
860 r = file_map_find(&renv->fmap, l->filenum, &tuple);
861 if (r==0) {
862 TOKUTXN txn = NULL;
863 //Maybe do the descriptor (lsn filter)
864 toku_txnid2txn(renv->logger, l->xid, &txn);
865 DBT old_descriptor, new_descriptor;
866 toku_fill_dbt(
867 &old_descriptor,
868 l->old_descriptor.data,
869 l->old_descriptor.len
870 );
871 toku_fill_dbt(
872 &new_descriptor,
873 l->new_descriptor.data,
874 l->new_descriptor.len
875 );
876 toku_ft_change_descriptor(
877 tuple->ft_handle,
878 &old_descriptor,
879 &new_descriptor,
880 false,
881 txn,
882 l->update_cmp_descriptor
883 );
884 }
885 return 0;
886}
887
888static int toku_recover_backward_change_fdescriptor (struct logtype_change_fdescriptor *UU(l), RECOVER_ENV UU(renv)) {
889 return 0;
890}
891
892
893// if file referred to in l is open, close it
894static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV renv) {
895 struct file_map_tuple *tuple = NULL;
896 int r = file_map_find(&renv->fmap, l->filenum, &tuple);
897 if (r == 0) { // if file is open
898 char *iname = fixup_fname(&l->iname);
899 assert(strcmp(tuple->iname, iname) == 0); // verify that file_map has same iname as log entry
900
901 if (0!=strcmp(iname, toku_product_name_strings.rollback_cachefile)) {
902 //Rollback cachefile is closed manually at end of recovery, not here
903 toku_ft_handle_close_recovery(tuple->ft_handle, l->lsn);
904 }
905 file_map_remove(&renv->fmap, l->filenum);
906 toku_free(iname);
907 }
908 return 0;
909}
910
911static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), RECOVER_ENV UU(renv)) {
912 // nothing
913 return 0;
914}
915
916// fdelete is a transactional file delete.
917static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) {
918 TOKUTXN txn = NULL;
919 toku_txnid2txn(renv->logger, l->xid, &txn);
920 assert(txn != NULL);
921
922 // if the forward scan in recovery found this file and opened it, we
923 // need to mark the txn to remove the ft on commit. if the file was
924 // not found and not opened, we don't need to do anything - the ft
925 // is already gone, so we're happy.
926 struct file_map_tuple *tuple;
927 int r = file_map_find(&renv->fmap, l->filenum, &tuple);
928 if (r == 0) {
929 toku_ft_unlink_on_commit(tuple->ft_handle, txn);
930 }
931 return 0;
932}
933
934static int toku_recover_backward_fdelete (struct logtype_fdelete *UU(l), RECOVER_ENV UU(renv)) {
935 // nothing
936 return 0;
937}
938
939static int toku_recover_frename(struct logtype_frename *l, RECOVER_ENV renv) {
940 assert(renv);
941 assert(renv->env);
942
943 toku_struct_stat stat;
944 const char *data_dir = renv->env->get_data_dir(renv->env);
945 bool old_exist = true;
946 bool new_exist = true;
947
948 assert(data_dir);
949
950 struct file_map_tuple *tuple;
951
952 std::unique_ptr<char[], decltype(&toku_free)> old_iname_full(
953 toku_construct_full_name(2, data_dir, l->old_iname.data), &toku_free);
954 std::unique_ptr<char[], decltype(&toku_free)> new_iname_full(
955 toku_construct_full_name(2, data_dir, l->new_iname.data), &toku_free);
956
957 if (toku_stat(old_iname_full.get(), &stat, toku_uninstrumented) == -1) {
958 if (ENOENT == errno)
959 old_exist = false;
960 else
961 return 1;
962 }
963
964 if (toku_stat(new_iname_full.get(), &stat, toku_uninstrumented) == -1) {
965 if (ENOENT == errno)
966 new_exist = false;
967 else
968 return 1;
969 }
970
971 // Both old and new files can exist if:
972 // - rename() is not completed
973 // - fcreate was replayed during recovery
974 // 'Stalled cachefiles' container cachefile_list::m_stale_fileid contains
975 // closed but not yet evicted cachefiles and the key of this container is
976 // fs-dependent file id - (device id, inode number) pair. As it is supposed
977 // new file have not yet created during recovery process the 'stalled
978 // cachefile' container can contain only cache file of old file.
979 // To preserve the old cachefile file's id and keep it in
980 // 'stalled cachefiles' container the new file is removed
981 // and the old file is renamed.
982 if (old_exist && new_exist &&
983 (toku_os_delete(new_iname_full.get()) == -1 ||
984 toku_os_rename(old_iname_full.get(), new_iname_full.get()) == -1 ||
985 toku_fsync_directory(old_iname_full.get()) == -1 ||
986 toku_fsync_directory(new_iname_full.get()) == -1))
987 return 1;
988
989 if (old_exist && !new_exist &&
990 (!toku_create_subdirs_if_needed(new_iname_full.get()) ||
991 toku_os_rename(old_iname_full.get(), new_iname_full.get()) == -1 ||
992 toku_fsync_directory(old_iname_full.get()) == -1 ||
993 toku_fsync_directory(new_iname_full.get()) == -1))
994 return 1;
995
996 if (file_map_find(&renv->fmap, l->old_filenum, &tuple) != DB_NOTFOUND) {
997 if (tuple->iname)
998 toku_free(tuple->iname);
999 tuple->iname = toku_xstrdup(l->new_iname.data);
1000 }
1001
1002 TOKUTXN txn = NULL;
1003 toku_txnid2txn(renv->logger, l->xid, &txn);
1004
1005 if (txn)
1006 toku_logger_save_rollback_frename(txn, &l->old_iname, &l->new_iname);
1007
1008 return 0;
1009}
1010
1011static int toku_recover_backward_frename(struct logtype_frename *UU(l),
1012 RECOVER_ENV UU(renv)) {
1013 // nothing
1014 return 0;
1015}
1016
1017static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV renv) {
1018 int r;
1019 TOKUTXN txn = NULL;
1020 toku_txnid2txn(renv->logger, l->xid, &txn);
1021 assert(txn!=NULL);
1022 struct file_map_tuple *tuple = NULL;
1023 r = file_map_find(&renv->fmap, l->filenum, &tuple);
1024 if (r==0) {
1025 //Maybe do the insertion if we found the cachefile.
1026 DBT keydbt, valdbt;
1027 toku_fill_dbt(&keydbt, l->key.data, l->key.len);
1028 toku_fill_dbt(&valdbt, l->value.data, l->value.len);
1029 toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, true, l->lsn, false, FT_INSERT);
1030 toku_txn_maybe_note_ft(txn, tuple->ft_handle->ft);
1031 }
1032 return 0;
1033}
1034
1035static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), RECOVER_ENV UU(renv)) {
1036 // nothing
1037 return 0;
1038}
1039
1040static int toku_recover_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *l, RECOVER_ENV renv) {
1041 int r;
1042 TOKUTXN txn = NULL;
1043 toku_txnid2txn(renv->logger, l->xid, &txn);
1044 assert(txn!=NULL);
1045 struct file_map_tuple *tuple = NULL;
1046 r = file_map_find(&renv->fmap, l->filenum, &tuple);
1047 if (r==0) {
1048 //Maybe do the insertion if we found the cachefile.
1049 DBT keydbt, valdbt;
1050 toku_fill_dbt(&keydbt, l->key.data, l->key.len);
1051 toku_fill_dbt(&valdbt, l->value.data, l->value.len);
1052 toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, true, l->lsn, false, FT_INSERT_NO_OVERWRITE);
1053 }
1054 return 0;
1055}
1056
1057static int toku_recover_backward_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *UU(l), RECOVER_ENV UU(renv)) {
1058 // nothing
1059 return 0;
1060}
1061
1062static int toku_recover_enq_delete_any (struct logtype_enq_delete_any *l, RECOVER_ENV renv) {
1063 int r;
1064 TOKUTXN txn = NULL;
1065 toku_txnid2txn(renv->logger, l->xid, &txn);
1066 assert(txn!=NULL);
1067 struct file_map_tuple *tuple = NULL;
1068 r = file_map_find(&renv->fmap, l->filenum, &tuple);
1069 if (r==0) {
1070 //Maybe do the deletion if we found the cachefile.
1071 DBT keydbt;
1072 toku_fill_dbt(&keydbt, l->key.data, l->key.len);
1073 toku_ft_maybe_delete(tuple->ft_handle, &keydbt, txn, true, l->lsn, false);
1074 }
1075 return 0;
1076}
1077
1078static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), RECOVER_ENV UU(renv)) {
1079 // nothing
1080 return 0;
1081}
1082
1083static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple *l, RECOVER_ENV renv) {
1084 int r;
1085 TOKUTXN txn = NULL;
1086 toku_txnid2txn(renv->logger, l->xid, &txn);
1087 assert(txn!=NULL);
1088 DB *src_db = NULL;
1089 bool do_inserts = true;
1090 {
1091 struct file_map_tuple *tuple = NULL;
1092 r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
1093 if (l->src_filenum.fileid == FILENUM_NONE.fileid)
1094 assert(r==DB_NOTFOUND);
1095 else {
1096 if (r == 0)
1097 src_db = &tuple->fake_db;
1098 else
1099 do_inserts = false; // src file was probably deleted, #3129
1100 }
1101 }
1102
1103 if (do_inserts) {
1104 DBT src_key, src_val;
1105
1106 toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
1107 toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
1108
1109 for (uint32_t file = 0; file < l->dest_filenums.num; file++) {
1110 struct file_map_tuple *tuple = NULL;
1111 r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
1112 if (r==0) {
1113 // We found the cachefile. (maybe) Do the insert.
1114 DB *db = &tuple->fake_db;
1115
1116 DBT_ARRAY key_array;
1117 DBT_ARRAY val_array;
1118 if (db != src_db) {
1119 r = renv->generate_row_for_put(db, src_db, &renv->dest_keys, &renv->dest_vals, &src_key, &src_val);
1120 assert(r==0);
1121 invariant(renv->dest_keys.size <= renv->dest_keys.capacity);
1122 invariant(renv->dest_vals.size <= renv->dest_vals.capacity);
1123 invariant(renv->dest_keys.size == renv->dest_vals.size);
1124 key_array = renv->dest_keys;
1125 val_array = renv->dest_vals;
1126 } else {
1127 key_array.size = key_array.capacity = 1;
1128 key_array.dbts = &src_key;
1129
1130 val_array.size = val_array.capacity = 1;
1131 val_array.dbts = &src_val;
1132 }
1133 for (uint32_t i = 0; i < key_array.size; i++) {
1134 toku_ft_maybe_insert(tuple->ft_handle, &key_array.dbts[i], &val_array.dbts[i], txn, true, l->lsn, false, FT_INSERT);
1135 }
1136 }
1137 }
1138 }
1139
1140 return 0;
1141}
1142
1143static int toku_recover_backward_enq_insert_multiple (struct logtype_enq_insert_multiple *UU(l), RECOVER_ENV UU(renv)) {
1144 // nothing
1145 return 0;
1146}
1147
1148static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple *l, RECOVER_ENV renv) {
1149 int r;
1150 TOKUTXN txn = NULL;
1151 toku_txnid2txn(renv->logger, l->xid, &txn);
1152 assert(txn!=NULL);
1153 DB *src_db = NULL;
1154 bool do_deletes = true;
1155 {
1156 struct file_map_tuple *tuple = NULL;
1157 r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
1158 if (l->src_filenum.fileid == FILENUM_NONE.fileid)
1159 assert(r==DB_NOTFOUND);
1160 else {
1161 if (r == 0) {
1162 src_db = &tuple->fake_db;
1163 } else {
1164 do_deletes = false; // src file was probably deleted, #3129
1165 }
1166 }
1167 }
1168
1169 if (do_deletes) {
1170 DBT src_key, src_val;
1171 toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
1172 toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
1173
1174 for (uint32_t file = 0; file < l->dest_filenums.num; file++) {
1175 struct file_map_tuple *tuple = NULL;
1176 r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
1177 if (r==0) {
1178 // We found the cachefile. (maybe) Do the delete.
1179 DB *db = &tuple->fake_db;
1180
1181 DBT_ARRAY key_array;
1182 if (db != src_db) {
1183 r = renv->generate_row_for_del(db, src_db, &renv->dest_keys, &src_key, &src_val);
1184 assert(r==0);
1185 invariant(renv->dest_keys.size <= renv->dest_keys.capacity);
1186 key_array = renv->dest_keys;
1187 } else {
1188 key_array.size = key_array.capacity = 1;
1189 key_array.dbts = &src_key;
1190 }
1191 for (uint32_t i = 0; i < key_array.size; i++) {
1192 toku_ft_maybe_delete(tuple->ft_handle, &key_array.dbts[i], txn, true, l->lsn, false);
1193 }
1194 }
1195 }
1196 }
1197
1198 return 0;
1199}
1200
1201static int toku_recover_backward_enq_delete_multiple (struct logtype_enq_delete_multiple *UU(l), RECOVER_ENV UU(renv)) {
1202 // nothing
1203 return 0;
1204}
1205
1206static int toku_recover_enq_update(struct logtype_enq_update *l, RECOVER_ENV renv) {
1207 int r;
1208 TOKUTXN txn = NULL;
1209 toku_txnid2txn(renv->logger, l->xid, &txn);
1210 assert(txn != NULL);
1211 struct file_map_tuple *tuple = NULL;
1212 r = file_map_find(&renv->fmap, l->filenum, &tuple);
1213 if (r == 0) {
1214 // Maybe do the update if we found the cachefile.
1215 DBT key, extra;
1216 toku_fill_dbt(&key, l->key.data, l->key.len);
1217 toku_fill_dbt(&extra, l->extra.data, l->extra.len);
1218 toku_ft_maybe_update(tuple->ft_handle, &key, &extra, txn, true, l->lsn, false);
1219 }
1220 return 0;
1221}
1222
1223static int toku_recover_enq_updatebroadcast(struct logtype_enq_updatebroadcast *l, RECOVER_ENV renv) {
1224 int r;
1225 TOKUTXN txn = NULL;
1226 toku_txnid2txn(renv->logger, l->xid, &txn);
1227 assert(txn != NULL);
1228 struct file_map_tuple *tuple = NULL;
1229 r = file_map_find(&renv->fmap, l->filenum, &tuple);
1230 if (r == 0) {
1231 // Maybe do the update broadcast if we found the cachefile.
1232 DBT extra;
1233 toku_fill_dbt(&extra, l->extra.data, l->extra.len);
1234 toku_ft_maybe_update_broadcast(tuple->ft_handle, &extra, txn, true,
1235 l->lsn, false, l->is_resetting_op);
1236 }
1237 return 0;
1238}
1239
1240static int toku_recover_backward_enq_update(struct logtype_enq_update *UU(l), RECOVER_ENV UU(renv)) {
1241 // nothing
1242 return 0;
1243}
1244
1245static int toku_recover_backward_enq_updatebroadcast(struct logtype_enq_updatebroadcast *UU(l), RECOVER_ENV UU(renv)) {
1246 // nothing
1247 return 0;
1248}
1249
1250static int toku_recover_comment (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) {
1251 // nothing
1252 return 0;
1253}
1254
1255static int toku_recover_backward_comment (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) {
1256 // nothing
1257 return 0;
1258}
1259
1260static int toku_recover_shutdown_up_to_19 (struct logtype_shutdown_up_to_19 *UU(l), RECOVER_ENV UU(renv)) {
1261 // nothing
1262 return 0;
1263}
1264
1265static int toku_recover_backward_shutdown_up_to_19 (struct logtype_shutdown_up_to_19 *UU(l), RECOVER_ENV UU(renv)) {
1266 // nothing
1267 return 0;
1268}
1269
1270static int toku_recover_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) {
1271 // nothing
1272 return 0;
1273}
1274
1275static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) {
1276 // nothing
1277 return 0;
1278}
1279
1280static int toku_recover_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) {
1281 TOKUTXN txn = NULL;
1282 toku_txnid2txn(renv->logger, l->xid, &txn);
1283 assert(txn!=NULL);
1284 char *new_iname = fixup_fname(&l->new_iname);
1285
1286 toku_ft_load_recovery(txn, l->old_filenum, new_iname, 0, 0, (LSN*)NULL);
1287
1288 toku_free(new_iname);
1289 return 0;
1290}
1291
1292static int toku_recover_backward_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) {
1293 // nothing
1294 return 0;
1295}
1296
1297// #2954
1298static int toku_recover_hot_index(struct logtype_hot_index *UU(l), RECOVER_ENV UU(renv)) {
1299 TOKUTXN txn = NULL;
1300 toku_txnid2txn(renv->logger, l->xid, &txn);
1301 assert(txn!=NULL);
1302 // just make an entry in the rollback log
1303 // - set do_log = 0 -> don't write to recovery log
1304 toku_ft_hot_index_recovery(txn, l->hot_index_filenums, 0, 0, (LSN*)NULL);
1305 return 0;
1306}
1307
1308// #2954
1309static int toku_recover_backward_hot_index(struct logtype_hot_index *UU(l), RECOVER_ENV UU(renv)) {
1310 // nothing
1311 return 0;
1312}
1313
1314// Effects: If there are no log files, or if there is a clean "shutdown" at
1315// the end of the log, then we don't need recovery to run.
1316// Returns: true if we need recovery, otherwise false.
1317int tokuft_needs_recovery(const char *log_dir, bool ignore_log_empty) {
1318 int needs_recovery;
1319 int r;
1320 TOKULOGCURSOR logcursor = NULL;
1321
1322 r = toku_logcursor_create(&logcursor, log_dir);
1323 if (r != 0) {
1324 needs_recovery = true; goto exit;
1325 }
1326
1327 struct log_entry *le;
1328 le = NULL;
1329 r = toku_logcursor_last(logcursor, &le);
1330 if (r == 0) {
1331 needs_recovery = le->cmd != LT_shutdown;
1332 }
1333 else {
1334 needs_recovery = !(r == DB_NOTFOUND && ignore_log_empty);
1335 }
1336 exit:
1337 if (logcursor) {
1338 r = toku_logcursor_destroy(&logcursor);
1339 assert(r == 0);
1340 }
1341 return needs_recovery;
1342}
1343
1344static uint32_t recover_get_num_live_txns(RECOVER_ENV renv) {
1345 return toku_txn_manager_num_live_root_txns(renv->logger->txn_manager);
1346}
1347
1348static int is_txn_unprepared(TOKUTXN txn, void* extra) {
1349 TOKUTXN* ptxn = (TOKUTXN *)extra;
1350 if (txn->state != TOKUTXN_PREPARING) {
1351 *ptxn = txn;
1352 return -1; // return -1 to get iterator to return
1353 }
1354 return 0;
1355}
1356
1357static int find_an_unprepared_txn (RECOVER_ENV renv, TOKUTXN *txnp) {
1358 TOKUTXN txn = nullptr;
1359 int r = toku_txn_manager_iter_over_live_root_txns(
1360 renv->logger->txn_manager,
1361 is_txn_unprepared,
1362 &txn
1363 );
1364 assert(r == 0 || r == -1);
1365 if (txn != nullptr) {
1366 *txnp = txn;
1367 return 0;
1368 }
1369 return DB_NOTFOUND;
1370}
1371
1372static int call_prepare_txn_callback_iter(TOKUTXN txn, void* extra) {
1373 RECOVER_ENV* renv = (RECOVER_ENV *)extra;
1374 invariant(txn->state == TOKUTXN_PREPARING);
1375 invariant(txn->child == NULL);
1376 (*renv)->prepared_txn_callback((*renv)->env, txn);
1377 return 0;
1378}
1379
1380static void recover_abort_live_txn(TOKUTXN txn) {
1381 fprintf(stderr, "%s %" PRIu64 "\n", __FUNCTION__, txn->txnid.parent_id64);
1382 // recursively abort all children first
1383 if (txn->child != NULL) {
1384 recover_abort_live_txn(txn->child);
1385 }
1386 // sanity check that the recursive call successfully NULLs out txn->child
1387 invariant(txn->child == NULL);
1388 // abort the transaction
1389 toku_txn_progress_extra extra = { time(NULL), ZERO_LSN, "abort live", txn->txnid, 0 };
1390 int r = toku_txn_abort_txn(txn, toku_recover_txn_progress, &extra);
1391 assert(r == 0);
1392
1393 // close the transaction
1394 toku_txn_close_txn(txn);
1395}
1396
1397// abort all of the remaining live transactions in descending transaction id order
1398static void recover_abort_all_live_txns(RECOVER_ENV renv) {
1399 while (1) {
1400 TOKUTXN txn;
1401 int r = find_an_unprepared_txn(renv, &txn);
1402 if (r==0) {
1403 recover_abort_live_txn(txn);
1404 } else if (r==DB_NOTFOUND) {
1405 break;
1406 } else {
1407 abort();
1408 }
1409 }
1410
1411 // Now we have only prepared txns. These prepared txns don't have full DB_TXNs in them, so we need to make some.
1412 int r = toku_txn_manager_iter_over_live_root_txns(
1413 renv->logger->txn_manager,
1414 call_prepare_txn_callback_iter,
1415 &renv
1416 );
1417 assert_zero(r);
1418}
1419
1420static void recover_trace_le(const char *f, int l, int r, struct log_entry *le) {
1421 if (le) {
1422 LSN thislsn = toku_log_entry_get_lsn(le);
1423 fprintf(stderr, "%s:%d r=%d cmd=%c lsn=%" PRIu64 "\n", f, l, r, le->cmd, thislsn.lsn);
1424 } else
1425 fprintf(stderr, "%s:%d r=%d cmd=?\n", f, l, r);
1426}
1427
// Test-only hooks, unset by default.  Each hook is a function pointer plus
// the opaque argument that is passed to it when it is invoked.
static void (*recover_callback_fx)(void*) = nullptr;
static void *recover_callback_args = nullptr;
static void (*recover_callback2_fx)(void*) = nullptr;
static void *recover_callback2_args = nullptr;
1433
1434
1435static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_dir) {
1436 int r;
1437 int rr = 0;
1438 TOKULOGCURSOR logcursor = NULL;
1439 struct log_entry *le = NULL;
1440
1441 time_t tnow = time(NULL);
1442 fprintf(stderr, "%.24s PerconaFT recovery starting in env %s\n", ctime(&tnow), env_dir);
1443
1444 char org_wd[1000];
1445 {
1446 char *wd=getcwd(org_wd, sizeof(org_wd));
1447 assert(wd!=0);
1448 }
1449
1450 r = toku_logger_open(log_dir, renv->logger);
1451 assert(r == 0);
1452
1453 // grab the last LSN so that it can be restored when the log is restarted
1454 LSN lastlsn = toku_logger_last_lsn(renv->logger);
1455 LSN thislsn;
1456
1457 // there must be at least one log entry
1458 r = toku_logcursor_create(&logcursor, log_dir);
1459 assert(r == 0);
1460
1461 r = toku_logcursor_last(logcursor, &le);
1462 if (r != 0) {
1463 if (tokuft_recovery_trace)
1464 fprintf(stderr, "RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
1465 rr = DB_RUNRECOVERY; goto errorexit;
1466 }
1467
1468 r = toku_logcursor_destroy(&logcursor);
1469 assert(r == 0);
1470
1471 r = toku_logcursor_create(&logcursor, log_dir);
1472 assert(r == 0);
1473
1474 {
1475 toku_struct_stat buf;
1476 if (toku_stat(env_dir, &buf, toku_uninstrumented)) {
1477 rr = get_error_errno();
1478 fprintf(stderr,
1479 "%.24s PerconaFT recovery error: directory does not exist: "
1480 "%s\n",
1481 ctime(&tnow),
1482 env_dir);
1483 goto errorexit;
1484 } else if (!S_ISDIR(buf.st_mode)) {
1485 fprintf(stderr, "%.24s PerconaFT recovery error: this file is supposed to be a directory, but is not: %s\n", ctime(&tnow), env_dir);
1486 rr = ENOTDIR; goto errorexit;
1487 }
1488 }
1489 // scan backwards
1490 scan_state_init(&renv->ss);
1491 tnow = time(NULL);
1492 time_t tlast;
1493 tlast = tnow;
1494 fprintf(stderr, "%.24s PerconaFT recovery scanning backward from %" PRIu64 "\n", ctime(&tnow), lastlsn.lsn);
1495 for (unsigned i=0; 1; i++) {
1496
1497 // get the previous log entry (first time gets the last one)
1498 le = NULL;
1499 r = toku_logcursor_prev(logcursor, &le);
1500 if (tokuft_recovery_trace)
1501 recover_trace_le(__FUNCTION__, __LINE__, r, le);
1502 if (r != 0) {
1503 if (r == DB_NOTFOUND)
1504 break;
1505 rr = DB_RUNRECOVERY;
1506 goto errorexit;
1507 }
1508
1509 // trace progress
1510 if ((i % 1000) == 0) {
1511 tnow = time(NULL);
1512 if (tnow - tlast >= tokuft_recovery_progress_time) {
1513 thislsn = toku_log_entry_get_lsn(le);
1514 fprintf(stderr, "%.24s PerconaFT recovery scanning backward from %" PRIu64 " at %" PRIu64 " (%s)\n",
1515 ctime(&tnow), lastlsn.lsn, thislsn.lsn, recover_state(renv));
1516 tlast = tnow;
1517 }
1518 }
1519
1520 // dispatch the log entry handler
1521 assert(renv->ss.ss == BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END ||
1522 renv->ss.ss == BACKWARD_NEWER_CHECKPOINT_END);
1523 logtype_dispatch_assign(le, toku_recover_backward_, r, renv);
1524 if (tokuft_recovery_trace)
1525 recover_trace_le(__FUNCTION__, __LINE__, r, le);
1526 if (r != 0) {
1527 if (tokuft_recovery_trace)
1528 fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
1529 rr = DB_RUNRECOVERY;
1530 goto errorexit;
1531 }
1532 if (renv->goforward)
1533 break;
1534 }
1535
1536 // run first callback
1537 if (recover_callback_fx)
1538 recover_callback_fx(recover_callback_args);
1539
1540 // scan forwards
1541 assert(le);
1542 thislsn = toku_log_entry_get_lsn(le);
1543 tnow = time(NULL);
1544 fprintf(stderr, "%.24s PerconaFT recovery starts scanning forward to %" PRIu64 " from %" PRIu64 " left %" PRIu64 " (%s)\n",
1545 ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv));
1546
1547 for (unsigned i=0; 1; i++) {
1548
1549 // trace progress
1550 if ((i % 1000) == 0) {
1551 tnow = time(NULL);
1552 if (tnow - tlast >= tokuft_recovery_progress_time) {
1553 thislsn = toku_log_entry_get_lsn(le);
1554 fprintf(stderr, "%.24s PerconaFT recovery scanning forward to %" PRIu64 " at %" PRIu64 " left %" PRIu64 " (%s)\n",
1555 ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv));
1556 tlast = tnow;
1557 }
1558 }
1559
1560 // dispatch the log entry handler (first time calls the forward handler for the log entry at the turnaround
1561 assert(renv->ss.ss == FORWARD_BETWEEN_CHECKPOINT_BEGIN_END ||
1562 renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END);
1563 logtype_dispatch_assign(le, toku_recover_, r, renv);
1564 if (tokuft_recovery_trace)
1565 recover_trace_le(__FUNCTION__, __LINE__, r, le);
1566 if (r != 0) {
1567 if (tokuft_recovery_trace)
1568 fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
1569 rr = DB_RUNRECOVERY;
1570 goto errorexit;
1571 }
1572
1573 // get the next log entry
1574 le = NULL;
1575 r = toku_logcursor_next(logcursor, &le);
1576 if (tokuft_recovery_trace)
1577 recover_trace_le(__FUNCTION__, __LINE__, r, le);
1578 if (r != 0) {
1579 if (r == DB_NOTFOUND)
1580 break;
1581 rr = DB_RUNRECOVERY;
1582 goto errorexit;
1583 }
1584 }
1585
1586 // verify the final recovery state
1587 assert(renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END);
1588
1589 r = toku_logcursor_destroy(&logcursor);
1590 assert(r == 0);
1591
1592 // run second callback
1593 if (recover_callback2_fx)
1594 recover_callback2_fx(recover_callback2_args);
1595
1596 // restart logging
1597 toku_logger_restart(renv->logger, lastlsn);
1598
1599 // abort the live transactions
1600 {
1601 uint32_t n = recover_get_num_live_txns(renv);
1602 if (n > 0) {
1603 tnow = time(NULL);
1604 fprintf(stderr, "%.24s PerconaFT recovery has %" PRIu32 " live transaction%s\n", ctime(&tnow), n, n > 1 ? "s" : "");
1605 }
1606 }
1607 recover_abort_all_live_txns(renv);
1608 {
1609 uint32_t n = recover_get_num_live_txns(renv);
1610 if (n > 0) {
1611 tnow = time(NULL);
1612 fprintf(stderr, "%.24s PerconaFT recovery has %" PRIu32 " prepared transaction%s\n", ctime(&tnow), n, n > 1 ? "s" : "");
1613 }
1614 }
1615
1616 // close the open dictionaries
1617 uint32_t n;
1618 n = file_map_get_num_dictionaries(&renv->fmap);
1619 if (n > 0) {
1620 tnow = time(NULL);
1621 fprintf(stderr, "%.24s PerconaFT recovery closing %" PRIu32 " dictionar%s\n", ctime(&tnow), n, n > 1 ? "ies" : "y");
1622 }
1623 file_map_close_dictionaries(&renv->fmap, lastlsn);
1624
1625 {
1626 // write a recovery log entry
1627 BYTESTRING recover_comment = { static_cast<uint32_t>(strlen("recover")), (char *) "recover" };
1628 toku_log_comment(renv->logger, NULL, true, 0, recover_comment);
1629 }
1630
1631 // checkpoint
1632 tnow = time(NULL);
1633 fprintf(stderr, "%.24s PerconaFT recovery making a checkpoint\n", ctime(&tnow));
1634 r = toku_checkpoint(renv->cp, renv->logger, NULL, NULL, NULL, NULL, RECOVERY_CHECKPOINT);
1635 assert(r == 0);
1636 tnow = time(NULL);
1637 fprintf(stderr, "%.24s PerconaFT recovery done\n", ctime(&tnow));
1638
1639 return 0;
1640
1641 errorexit:
1642 tnow = time(NULL);
1643 fprintf(stderr, "%.24s PerconaFT recovery failed %d\n", ctime(&tnow), rr);
1644
1645 if (logcursor) {
1646 r = toku_logcursor_destroy(&logcursor);
1647 assert(r == 0);
1648 }
1649
1650 return rr;
1651}
1652
1653int
1654toku_recover_lock(const char *lock_dir, int *lockfd) {
1655 int e = toku_single_process_lock(lock_dir, "recovery", lockfd);
1656 if (e != 0 && e != ENOENT) {
1657 fprintf(stderr, "Couldn't run recovery because some other process holds the recovery lock\n");
1658 }
1659 return e;
1660}
1661
1662int
1663toku_recover_unlock(int lockfd) {
1664 int lockfd_copy = lockfd;
1665 return toku_single_process_unlock(&lockfd_copy);
1666}
1667
1668int tokuft_recover(DB_ENV *env,
1669 prepared_txn_callback_t prepared_txn_callback,
1670 keep_cachetable_callback_t keep_cachetable_callback,
1671 TOKULOGGER logger,
1672 const char *env_dir, const char *log_dir,
1673 ft_compare_func bt_compare,
1674 ft_update_func update_function,
1675 generate_row_for_put_func generate_row_for_put,
1676 generate_row_for_del_func generate_row_for_del,
1677 size_t cachetable_size) {
1678 int r;
1679 int lockfd = -1;
1680
1681 r = toku_recover_lock(log_dir, &lockfd);
1682 if (r != 0)
1683 return r;
1684
1685 int rr = 0;
1686 if (tokuft_needs_recovery(log_dir, false)) {
1687 struct recover_env renv;
1688 r = recover_env_init(&renv,
1689 env_dir,
1690 env,
1691 prepared_txn_callback,
1692 keep_cachetable_callback,
1693 logger,
1694 bt_compare,
1695 update_function,
1696 generate_row_for_put,
1697 generate_row_for_del,
1698 cachetable_size);
1699 assert(r == 0);
1700
1701 rr = do_recovery(&renv, env_dir, log_dir);
1702
1703 recover_env_cleanup(&renv);
1704 }
1705
1706 r = toku_recover_unlock(lockfd);
1707 if (r != 0)
1708 return r;
1709
1710 return rr;
1711}
1712
1713// Return 0 if recovery log exists, ENOENT if log is missing
1714int
1715tokuft_recover_log_exists(const char * log_dir) {
1716 int r;
1717 TOKULOGCURSOR logcursor;
1718
1719 r = toku_logcursor_create(&logcursor, log_dir);
1720 if (r == 0) {
1721 int rclose;
1722 r = toku_logcursor_log_exists(logcursor); // return ENOENT if no log
1723 rclose = toku_logcursor_destroy(&logcursor);
1724 assert(rclose == 0);
1725 }
1726 else
1727 r = ENOENT;
1728
1729 return r;
1730}
1731
1732void toku_recover_set_callback (void (*callback_fx)(void*), void* callback_args) {
1733 recover_callback_fx = callback_fx;
1734 recover_callback_args = callback_args;
1735}
1736
1737void toku_recover_set_callback2 (void (*callback_fx)(void*), void* callback_args) {
1738 recover_callback2_fx = callback_fx;
1739 recover_callback2_args = callback_args;
1740}
1741