1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | #include <memory> |
40 | #include "ft/cachetable/cachetable.h" |
41 | #include "ft/cachetable/checkpoint.h" |
42 | #include "ft/ft.h" |
43 | #include "ft/log_header.h" |
44 | #include "ft/logger/log-internal.h" |
45 | #include "ft/logger/logcursor.h" |
46 | #include "ft/txn/txn_manager.h" |
47 | #include "util/omt.h" |
48 | |
// When nonzero, recover_env_init and recover_env_cleanup print a
// "function:line" trace line to stderr.
int tokuft_recovery_trace = 0; // turn on recovery tracing, default off.

// VERIFY_COUNTS(n) expands to a call to toku_verify_or_set_counts only when
// DO_VERIFY_COUNTS is defined; otherwise it is a no-op expression.
//#define DO_VERIFY_COUNTS
#ifdef DO_VERIFY_COUNTS
#define VERIFY_COUNTS(n) toku_verify_or_set_counts(n, false)
#else
#define VERIFY_COUNTS(n) ((void)0)
#endif

// time in seconds between recovery progress reports
#define TOKUFT_RECOVERY_PROGRESS_TIME 15
// Runtime copy of the interval above; toku_recover_txn_progress compares the
// elapsed time against this variable rather than against the macro.
time_t tokuft_recovery_progress_time = TOKUFT_RECOVERY_PROGRESS_TIME;
61 | |
// Phases of the recovery log scan.  The numeric values are also used as
// indexes into scan_state_strings, which is why the first enumerator is
// pinned to 1 (slot 0 of that table is a placeholder).
enum ss {
    // Initial phase (see scan_state_init): scanning backward, no end_checkpoint seen yet.
    BACKWARD_NEWER_CHECKPOINT_END = 1,
    // Scanning backward; an end_checkpoint was seen and its begin_checkpoint is being sought.
    BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END,
    // Scanning forward from the begin_checkpoint toward the matching end_checkpoint.
    FORWARD_BETWEEN_CHECKPOINT_BEGIN_END,
    // Scanning forward past the end_checkpoint.
    FORWARD_NEWER_CHECKPOINT_END,
};
68 | |
// Bookkeeping for the log scan: the current phase plus what has been learned
// about the checkpoint that recovery is anchored on.
struct scan_state {
    enum ss ss;                          // current phase of the scan
    LSN checkpoint_begin_lsn;            // copied from the end_checkpoint entry's lsn_begin_checkpoint
    LSN checkpoint_end_lsn;              // lsn of the end_checkpoint entry found while scanning backward
    uint64_t checkpoint_end_timestamp;   // timestamp of that end_checkpoint entry
    uint64_t checkpoint_begin_timestamp; // timestamp of the matching begin_checkpoint entry
    uint32_t checkpoint_num_fassociate;  // fassociate entries counted between begin and end; checked at end_checkpoint
    uint32_t checkpoint_num_xstillopen;  // xstillopen entries counted between begin and end; checked at end_checkpoint
    TXNID last_xid;                      // last_xid recorded by the begin_checkpoint entry
};
79 | |
// Short printable names for enum ss, indexed by the enumerator value.
// Index 0 ("?") is a placeholder because the enumerators start at 1.
static const char *scan_state_strings[] = {
    "?", "bw_newer", "bw_between", "fw_between", "fw_newer",
};
83 | |
84 | static void scan_state_init(struct scan_state *ss) { |
85 | ss->ss = BACKWARD_NEWER_CHECKPOINT_END; |
86 | ss->checkpoint_begin_lsn = ZERO_LSN; |
87 | ss->checkpoint_end_lsn = ZERO_LSN; |
88 | ss->checkpoint_num_fassociate = 0; |
89 | ss->checkpoint_num_xstillopen = 0; |
90 | ss->last_xid = 0; |
91 | } |
92 | |
93 | static const char *scan_state_string(struct scan_state *ss) { |
94 | assert(BACKWARD_NEWER_CHECKPOINT_END <= ss->ss && ss->ss <= FORWARD_NEWER_CHECKPOINT_END); |
95 | return scan_state_strings[ss->ss]; |
96 | } |
97 | |
// File map tuple: one entry of the filenum -> open dictionary map.
struct file_map_tuple {
    FILENUM filenum;     // key of the entry
    FT_HANDLE ft_handle; // NULL ft_handle means it's a rollback file.
                         // NOTE(review): file_map_tuple_init dereferences ft_handle
                         // unconditionally, so a NULL handle cannot currently be stored
                         // through that path -- confirm whether this remark is still accurate.
    char *iname;         // heap string; released by file_map_tuple_destroy
    struct __toku_db fake_db; // stand-in DB whose descriptor pointers refer to the ft's descriptors
};
105 | |
106 | static void file_map_tuple_init(struct file_map_tuple *tuple, FILENUM filenum, FT_HANDLE ft_handle, char *iname) { |
107 | tuple->filenum = filenum; |
108 | tuple->ft_handle = ft_handle; |
109 | tuple->iname = iname; |
110 | // use a fake DB for comparisons, using the ft's cmp descriptor |
111 | memset(&tuple->fake_db, 0, sizeof(tuple->fake_db)); |
112 | tuple->fake_db.cmp_descriptor = &tuple->ft_handle->ft->cmp_descriptor; |
113 | tuple->fake_db.descriptor = &tuple->ft_handle->ft->descriptor; |
114 | } |
115 | |
116 | static void file_map_tuple_destroy(struct file_map_tuple *tuple) { |
117 | if (tuple->iname) { |
118 | toku_free(tuple->iname); |
119 | tuple->iname = NULL; |
120 | } |
121 | } |
122 | |
// Map filenum to ft_handle
// The omt holds pointers to heap-allocated tuples ordered by filenum.fileid
// (see file_map_h); the omt object itself is allocated by file_map_init.
struct file_map {
    toku::omt<struct file_map_tuple *> *filenums;
};
127 | |
// The recovery environment: everything the log-entry handlers in this file
// need while replaying the log.  Populated by recover_env_init and torn down
// by recover_env_cleanup.
struct recover_env {
    DB_ENV *env;
    prepared_txn_callback_t prepared_txn_callback; // at the end of recovery, all the prepared txns are passed back to the ydb layer to make them into valid transactions.
    keep_cachetable_callback_t keep_cachetable_callback; // after recovery, store the cachetable into the environment.
    CACHETABLE ct;      // cachetable created by recover_env_init
    TOKULOGGER logger;  // either supplied by the caller or created by recover_env_init
    CHECKPOINTER cp;    // obtained from the cachetable
    ft_compare_func bt_compare;       // installed on opened dictionaries unless the builtin key compare flag is set
    ft_update_func update_function;   // installed on opened dictionaries when non-NULL
    generate_row_for_put_func generate_row_for_put;
    generate_row_for_del_func generate_row_for_del;
    DBT_ARRAY dest_keys;
    DBT_ARRAY dest_vals;
    struct scan_state ss;   // phase and checkpoint bookkeeping of the log scan
    struct file_map fmap;   // filenum -> open dictionary
    bool goforward;         // set when the backward scan reaches the begin_checkpoint and turns around
    bool destroy_logger_at_end; // If true then destroy the logger when we are done.  If false then set the logger into write-files mode when we are done with recovery.
};
typedef struct recover_env *RECOVER_ENV;
148 | |
149 | |
150 | static void file_map_init(struct file_map *fmap) { |
151 | XMALLOC(fmap->filenums); |
152 | fmap->filenums->create(); |
153 | } |
154 | |
155 | static void file_map_destroy(struct file_map *fmap) { |
156 | fmap->filenums->destroy(); |
157 | toku_free(fmap->filenums); |
158 | fmap->filenums = nullptr; |
159 | } |
160 | |
161 | static uint32_t file_map_get_num_dictionaries(struct file_map *fmap) { |
162 | return fmap->filenums->size(); |
163 | } |
164 | |
165 | static void file_map_close_dictionaries(struct file_map *fmap, LSN oplsn) { |
166 | int r; |
167 | |
168 | while (1) { |
169 | uint32_t n = fmap->filenums->size(); |
170 | if (n == 0) { |
171 | break; |
172 | } |
173 | struct file_map_tuple *tuple; |
174 | r = fmap->filenums->fetch(n - 1, &tuple); |
175 | assert(r == 0); |
176 | r = fmap->filenums->delete_at(n - 1); |
177 | assert(r == 0); |
178 | assert(tuple->ft_handle); |
179 | // Logging is on again, but we must pass the right LSN into close. |
180 | if (tuple->ft_handle) { // it's a DB, not a rollback file |
181 | toku_ft_handle_close_recovery(tuple->ft_handle, oplsn); |
182 | } |
183 | file_map_tuple_destroy(tuple); |
184 | toku_free(tuple); |
185 | } |
186 | } |
187 | |
188 | static int file_map_h(struct file_map_tuple *const &a, const FILENUM &b) { |
189 | if (a->filenum.fileid < b.fileid) { |
190 | return -1; |
191 | } else if (a->filenum.fileid > b.fileid) { |
192 | return 1; |
193 | } else { |
194 | return 0; |
195 | } |
196 | } |
197 | |
198 | static int file_map_insert (struct file_map *fmap, FILENUM fnum, FT_HANDLE ft_handle, char *iname) { |
199 | struct file_map_tuple *XMALLOC(tuple); |
200 | file_map_tuple_init(tuple, fnum, ft_handle, iname); |
201 | int r = fmap->filenums->insert<FILENUM, file_map_h>(tuple, fnum, nullptr); |
202 | return r; |
203 | } |
204 | |
205 | static void file_map_remove(struct file_map *fmap, FILENUM fnum) { |
206 | uint32_t idx; |
207 | struct file_map_tuple *tuple; |
208 | int r = fmap->filenums->find_zero<FILENUM, file_map_h>(fnum, &tuple, &idx); |
209 | if (r == 0) { |
210 | r = fmap->filenums->delete_at(idx); |
211 | file_map_tuple_destroy(tuple); |
212 | toku_free(tuple); |
213 | } |
214 | } |
215 | |
216 | // Look up file info: given FILENUM, return file_map_tuple (or DB_NOTFOUND) |
217 | static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) { |
218 | uint32_t idx; |
219 | struct file_map_tuple *tuple; |
220 | int r = fmap->filenums->find_zero<FILENUM, file_map_h>(fnum, &tuple, &idx); |
221 | if (r == 0) { |
222 | assert(tuple->filenum.fileid == fnum.fileid); |
223 | *file_map_tuple = tuple; |
224 | } else { |
225 | assert(r == DB_NOTFOUND); |
226 | } |
227 | return r; |
228 | } |
229 | |
230 | static int recover_env_init (RECOVER_ENV renv, |
231 | const char *env_dir, |
232 | DB_ENV *env, |
233 | prepared_txn_callback_t prepared_txn_callback, |
234 | keep_cachetable_callback_t keep_cachetable_callback, |
235 | TOKULOGGER logger, |
236 | ft_compare_func bt_compare, |
237 | ft_update_func update_function, |
238 | generate_row_for_put_func generate_row_for_put, |
239 | generate_row_for_del_func generate_row_for_del, |
240 | size_t cachetable_size) { |
241 | int r = 0; |
242 | |
243 | // If we are passed a logger use it, otherwise create one. |
244 | renv->destroy_logger_at_end = logger==NULL; |
245 | if (logger) { |
246 | renv->logger = logger; |
247 | } else { |
248 | r = toku_logger_create(&renv->logger); |
249 | assert(r == 0); |
250 | } |
251 | toku_logger_write_log_files(renv->logger, false); |
252 | toku_cachetable_create(&renv->ct, cachetable_size ? cachetable_size : 1<<25, (LSN){0}, renv->logger); |
253 | toku_cachetable_set_env_dir(renv->ct, env_dir); |
254 | if (keep_cachetable_callback) keep_cachetable_callback(env, renv->ct); |
255 | toku_logger_set_cachetable(renv->logger, renv->ct); |
256 | renv->env = env; |
257 | renv->prepared_txn_callback = prepared_txn_callback; |
258 | renv->keep_cachetable_callback = keep_cachetable_callback; |
259 | renv->bt_compare = bt_compare; |
260 | renv->update_function = update_function; |
261 | renv->generate_row_for_put = generate_row_for_put; |
262 | renv->generate_row_for_del = generate_row_for_del; |
263 | file_map_init(&renv->fmap); |
264 | renv->goforward = false; |
265 | renv->cp = toku_cachetable_get_checkpointer(renv->ct); |
266 | toku_dbt_array_init(&renv->dest_keys, 1); |
267 | toku_dbt_array_init(&renv->dest_vals, 1); |
268 | if (tokuft_recovery_trace) |
269 | fprintf(stderr, "%s:%d\n" , __FUNCTION__, __LINE__); |
270 | return r; |
271 | } |
272 | |
273 | static void recover_env_cleanup (RECOVER_ENV renv) { |
274 | invariant_zero(renv->fmap.filenums->size()); |
275 | file_map_destroy(&renv->fmap); |
276 | |
277 | if (renv->destroy_logger_at_end) { |
278 | toku_logger_close_rollback(renv->logger); |
279 | int r = toku_logger_close(&renv->logger); |
280 | assert(r == 0); |
281 | } else { |
282 | toku_logger_write_log_files(renv->logger, true); |
283 | } |
284 | |
285 | if (renv->keep_cachetable_callback) { |
286 | renv->ct = NULL; |
287 | } else { |
288 | toku_cachetable_close(&renv->ct); |
289 | } |
290 | toku_dbt_array_destroy(&renv->dest_keys); |
291 | toku_dbt_array_destroy(&renv->dest_vals); |
292 | |
293 | if (tokuft_recovery_trace) |
294 | fprintf(stderr, "%s:%d\n" , __FUNCTION__, __LINE__); |
295 | } |
296 | |
297 | static const char *recover_state(RECOVER_ENV renv) { |
298 | return scan_state_string(&renv->ss); |
299 | } |
300 | |
301 | // Open the file if it is not already open. If it is already open, then do nothing. |
302 | static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, bool must_create, int UU(mode), BYTESTRING *bs_iname, FILENUM filenum, uint32_t treeflags, |
303 | TOKUTXN txn, uint32_t nodesize, uint32_t basementnodesize, enum toku_compression_method compression_method, LSN max_acceptable_lsn) { |
304 | int r = 0; |
305 | FT_HANDLE ft_handle = NULL; |
306 | char *iname = fixup_fname(bs_iname); |
307 | |
308 | toku_ft_handle_create(&ft_handle); |
309 | toku_ft_set_flags(ft_handle, treeflags); |
310 | |
311 | if (nodesize != 0) { |
312 | toku_ft_handle_set_nodesize(ft_handle, nodesize); |
313 | } |
314 | |
315 | if (basementnodesize != 0) { |
316 | toku_ft_handle_set_basementnodesize(ft_handle, basementnodesize); |
317 | } |
318 | |
319 | if (compression_method != TOKU_DEFAULT_COMPRESSION_METHOD) { |
320 | toku_ft_handle_set_compression_method(ft_handle, compression_method); |
321 | } |
322 | |
323 | // set the key compare functions |
324 | if (!(treeflags & TOKU_DB_KEYCMP_BUILTIN) && renv->bt_compare) { |
325 | toku_ft_set_bt_compare(ft_handle, renv->bt_compare); |
326 | } |
327 | |
328 | if (renv->update_function) { |
329 | toku_ft_set_update(ft_handle, renv->update_function); |
330 | } |
331 | |
332 | // TODO mode (FUTURE FEATURE) |
333 | //mode = mode; |
334 | |
335 | r = toku_ft_handle_open_recovery(ft_handle, iname, must_create, must_create, renv->ct, txn, filenum, max_acceptable_lsn); |
336 | if (r != 0) { |
337 | //Note: If ft_handle_open fails, then close_ft will NOT write a header to disk. |
338 | //No need to provide lsn, so use the regular toku_ft_handle_close function |
339 | toku_ft_handle_close(ft_handle); |
340 | toku_free(iname); |
341 | if (r == ENOENT) //Not an error to simply be missing. |
342 | r = 0; |
343 | return r; |
344 | } |
345 | |
346 | file_map_insert(&renv->fmap, filenum, ft_handle, iname); |
347 | return 0; |
348 | } |
349 | |
350 | static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) { |
351 | int r; |
352 | TXN_MANAGER mgr = toku_logger_get_txn_manager(renv->logger); |
353 | switch (renv->ss.ss) { |
354 | case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: |
355 | assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn); |
356 | invariant(renv->ss.last_xid == TXNID_NONE); |
357 | renv->ss.last_xid = l->last_xid; |
358 | toku_txn_manager_set_last_xid_from_recovered_checkpoint(mgr, l->last_xid); |
359 | |
360 | r = 0; |
361 | break; |
362 | case FORWARD_NEWER_CHECKPOINT_END: |
363 | assert(l->lsn.lsn > renv->ss.checkpoint_end_lsn.lsn); |
364 | // Verify last_xid is no older than the previous begin |
365 | invariant(l->last_xid >= renv->ss.last_xid); |
366 | // Verify last_xid is no older than the newest txn |
367 | invariant(l->last_xid >= toku_txn_manager_get_last_xid(mgr)); |
368 | |
369 | r = 0; // ignore it (log only has a begin checkpoint) |
370 | break; |
371 | default: |
372 | fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n" , __FILE__, __LINE__, (int)renv->ss.ss); |
373 | abort(); |
374 | break; |
375 | } |
376 | return r; |
377 | } |
378 | |
379 | static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) { |
380 | int r; |
381 | time_t tnow = time(NULL); |
382 | fprintf(stderr, "%.24s PerconaFT recovery bw_begin_checkpoint at %" PRIu64 " timestamp %" PRIu64 " (%s)\n" , ctime(&tnow), l->lsn.lsn, l->timestamp, recover_state(renv)); |
383 | switch (renv->ss.ss) { |
384 | case BACKWARD_NEWER_CHECKPOINT_END: |
385 | // incomplete checkpoint, nothing to do |
386 | r = 0; |
387 | break; |
388 | case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END: |
389 | assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn); |
390 | renv->ss.ss = FORWARD_BETWEEN_CHECKPOINT_BEGIN_END; |
391 | renv->ss.checkpoint_begin_timestamp = l->timestamp; |
392 | renv->goforward = true; |
393 | tnow = time(NULL); |
394 | fprintf(stderr, "%.24s PerconaFT recovery turning around at begin checkpoint %" PRIu64 " time %" PRIu64 "\n" , |
395 | ctime(&tnow), l->lsn.lsn, |
396 | renv->ss.checkpoint_end_timestamp - renv->ss.checkpoint_begin_timestamp); |
397 | r = 0; |
398 | break; |
399 | default: |
400 | fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n" , __FILE__, __LINE__, (int)renv->ss.ss); |
401 | abort(); |
402 | break; |
403 | } |
404 | return r; |
405 | } |
406 | |
407 | static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) { |
408 | int r; |
409 | switch (renv->ss.ss) { |
410 | case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: |
411 | assert(l->lsn_begin_checkpoint.lsn == renv->ss.checkpoint_begin_lsn.lsn); |
412 | assert(l->lsn.lsn == renv->ss.checkpoint_end_lsn.lsn); |
413 | assert(l->num_fassociate_entries == renv->ss.checkpoint_num_fassociate); |
414 | assert(l->num_xstillopen_entries == renv->ss.checkpoint_num_xstillopen); |
415 | renv->ss.ss = FORWARD_NEWER_CHECKPOINT_END; |
416 | r = 0; |
417 | break; |
418 | case FORWARD_NEWER_CHECKPOINT_END: |
419 | assert(0); |
420 | return 0; |
421 | default: |
422 | assert(0); |
423 | return 0; |
424 | } |
425 | return r; |
426 | } |
427 | |
428 | static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) { |
429 | time_t tnow = time(NULL); |
430 | fprintf(stderr, "%.24s PerconaFT recovery bw_end_checkpoint at %" PRIu64 " timestamp %" PRIu64 " xid %" PRIu64 " (%s)\n" , ctime(&tnow), l->lsn.lsn, l->timestamp, l->lsn_begin_checkpoint.lsn, recover_state(renv)); |
431 | switch (renv->ss.ss) { |
432 | case BACKWARD_NEWER_CHECKPOINT_END: |
433 | renv->ss.ss = BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END; |
434 | renv->ss.checkpoint_begin_lsn.lsn = l->lsn_begin_checkpoint.lsn; |
435 | renv->ss.checkpoint_end_lsn.lsn = l->lsn.lsn; |
436 | renv->ss.checkpoint_end_timestamp = l->timestamp; |
437 | return 0; |
438 | case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END: |
439 | fprintf(stderr, "PerconaFT recovery %s:%d Should not see two end_checkpoint log entries without an intervening begin_checkpoint\n" , __FILE__, __LINE__); |
440 | abort(); |
441 | default: |
442 | break; |
443 | } |
444 | fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n" , __FILE__, __LINE__, (int)renv->ss.ss); |
445 | abort(); |
446 | } |
447 | |
448 | static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) { |
449 | struct file_map_tuple *tuple = NULL; |
450 | int r = file_map_find(&renv->fmap, l->filenum, &tuple); |
451 | char *fname = fixup_fname(&l->iname); |
452 | switch (renv->ss.ss) { |
453 | case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: |
454 | renv->ss.checkpoint_num_fassociate++; |
455 | assert(r==DB_NOTFOUND); //Not open |
456 | // Open it if it exists. |
457 | // If rollback file, specify which checkpointed version of file we need (not just the latest) |
458 | // because we cannot use a rollback log that is later than the last complete checkpoint. See #3113. |
459 | { |
460 | bool rollback_file = (0==strcmp(fname, toku_product_name_strings.rollback_cachefile)); |
461 | LSN max_acceptable_lsn = MAX_LSN; |
462 | if (rollback_file) { |
463 | max_acceptable_lsn = renv->ss.checkpoint_begin_lsn; |
464 | FT_HANDLE t; |
465 | toku_ft_handle_create(&t); |
466 | r = toku_ft_handle_open_recovery(t, toku_product_name_strings.rollback_cachefile, false, false, renv->ct, (TOKUTXN)NULL, l->filenum, max_acceptable_lsn); |
467 | renv->logger->rollback_cachefile = t->ft->cf; |
468 | toku_logger_initialize_rollback_cache(renv->logger, t->ft); |
469 | } else { |
470 | r = internal_recover_fopen_or_fcreate(renv, false, 0, &l->iname, l->filenum, l->treeflags, NULL, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, max_acceptable_lsn); |
471 | assert(r==0); |
472 | } |
473 | } |
474 | // try to open the file again and if we get it, restore |
475 | // the unlink on close bit. |
476 | int ret; |
477 | ret = file_map_find(&renv->fmap, l->filenum, &tuple); |
478 | if (ret == 0 && l->unlink_on_close) { |
479 | toku_cachefile_unlink_on_close(tuple->ft_handle->ft->cf); |
480 | } |
481 | break; |
482 | case FORWARD_NEWER_CHECKPOINT_END: |
483 | if (r == 0) { //IF it is open |
484 | // assert that the filenum maps to the correct iname |
485 | assert(strcmp(fname, tuple->iname) == 0); |
486 | } |
487 | r = 0; |
488 | break; |
489 | default: |
490 | assert(0); |
491 | return 0; |
492 | } |
493 | toku_free(fname); |
494 | |
495 | return r; |
496 | } |
497 | |
// Handler for fassociate entries in the backward direction (per the naming
// used in this file).  Both arguments are unused; it always returns 0.
static int toku_recover_backward_fassociate (struct logtype_fassociate *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}
502 | |
503 | static int |
504 | recover_transaction(TOKUTXN *txnp, TXNID_PAIR xid, TXNID_PAIR parentxid, TOKULOGGER logger) { |
505 | int r; |
506 | |
507 | // lookup the parent |
508 | TOKUTXN parent = NULL; |
509 | if (!txn_pair_is_none(parentxid)) { |
510 | toku_txnid2txn(logger, parentxid, &parent); |
511 | assert(parent!=NULL); |
512 | } |
513 | else { |
514 | invariant(xid.child_id64 == TXNID_NONE); |
515 | } |
516 | |
517 | // create a transaction and bind it to the transaction id |
518 | TOKUTXN txn = NULL; |
519 | { |
520 | //Verify it does not yet exist. |
521 | toku_txnid2txn(logger, xid, &txn); |
522 | assert(txn==NULL); |
523 | } |
524 | r = toku_txn_begin_with_xid( |
525 | parent, |
526 | &txn, |
527 | logger, |
528 | xid, |
529 | TXN_SNAPSHOT_NONE, |
530 | NULL, |
531 | true, // for_recovery |
532 | false // read_only |
533 | ); |
534 | assert(r == 0); |
535 | // We only know about it because it was logged. Restore the log bit. |
536 | // Logging is 'off' but it will still set the bit. |
537 | toku_maybe_log_begin_txn_for_write_operation(txn); |
538 | if (txnp) *txnp = txn; |
539 | return 0; |
540 | } |
541 | |
// Shared implementation for the xstillopen and xstillopenprepared handlers.
//
// *txnp is first set to NULL and then, depending on the scan phase:
//  - between checkpoint begin and end: the xstillopen counter is bumped, the
//    transaction is re-created via recover_transaction, and its rollback
//    bookkeeping (the txninfo fields plus the list of open FTs that are
//    present in the file map) is loaded with toku_txn_load_txninfo;
//  - past the checkpoint end: the transaction is only looked up by xid, and
//    *txnp receives whatever the lookup produced (it is not asserted here);
//  - any other phase asserts.
// Returns 0 on the paths above (intermediate failures are asserted).
static int recover_xstillopen_internal (TOKUTXN         *txnp,
                                        LSN           UU(lsn),
                                        TXNID_PAIR       xid,
                                        TXNID_PAIR       parentxid,
                                        uint64_t      rollentry_raw_count,
                                        FILENUMS         open_filenums,
                                        bool             force_fsync_on_commit,
                                        uint64_t         num_rollback_nodes,
                                        uint64_t         num_rollentries,
                                        BLOCKNUM         spilled_rollback_head,
                                        BLOCKNUM         spilled_rollback_tail,
                                        BLOCKNUM         current_rollback,
                                        uint32_t     UU(crc),
                                        uint32_t     UU(len),
                                        RECOVER_ENV      renv) {
    int r;
    *txnp = NULL;
    switch (renv->ss.ss) {
    case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: {
        renv->ss.checkpoint_num_xstillopen++;
        // begin_checkpoint must already have recorded last_xid, and this
        // transaction's id may not exceed it
        invariant(renv->ss.last_xid != TXNID_NONE);
        invariant(xid.parent_id64 <= renv->ss.last_xid);
        TOKUTXN txn = NULL;
        { //Create the transaction.
            r = recover_transaction(&txn, xid, parentxid, renv->logger);
            assert(r==0);
            assert(txn!=NULL);
            *txnp = txn;
        }
        { //Recover rest of transaction.
            // COPY_TO_INFO(x) initializes txninfo field x from the
            // same-named function parameter
#define COPY_TO_INFO(field) .field = field
            struct txninfo info = {
                COPY_TO_INFO(rollentry_raw_count),
                .num_fts  = 0,    //Set afterwards
                .open_fts = NULL, //Set afterwards
                COPY_TO_INFO(force_fsync_on_commit),
                COPY_TO_INFO(num_rollback_nodes),
                COPY_TO_INFO(num_rollentries),
                COPY_TO_INFO(spilled_rollback_head),
                COPY_TO_INFO(spilled_rollback_tail),
                COPY_TO_INFO(current_rollback)
            };
#undef COPY_TO_INFO
            //Generate open_fts
            // NOTE(review): variable-length array sized by open_filenums.num;
            // confirm that a count of 0 is acceptable to the compilers in use.
            FT array[open_filenums.num]; //Allocate maximum possible requirement
            info.open_fts = array;
            uint32_t i;
            for (i = 0; i < open_filenums.num; i++) {
                //open_filenums.filenums[]
                struct file_map_tuple *tuple = NULL;
                r = file_map_find(&renv->fmap, open_filenums.filenums[i], &tuple);
                if (r==0) {
                    // only files present in the file map are recorded
                    info.open_fts[info.num_fts++] = tuple->ft_handle->ft;
                }
                else {
                    assert(r==DB_NOTFOUND);
                }
            }
            r = toku_txn_load_txninfo(txn, &info);
            assert(r==0);
        }
        break;
    }
    case FORWARD_NEWER_CHECKPOINT_END: {
        // assert that the transaction exists
        // NOTE(review): no assertion is actually made here; the lookup result
        // is passed back as-is.
        TOKUTXN txn = NULL;
        toku_txnid2txn(renv->logger, xid, &txn);
        r = 0;
        *txnp = txn;
        break;
    }
    default:
        assert(0);
        return 0;
    }
    return r;
}
619 | |
620 | static int toku_recover_xstillopen (struct logtype_xstillopen *l, RECOVER_ENV renv) { |
621 | TOKUTXN txn; |
622 | return recover_xstillopen_internal (&txn, |
623 | l->lsn, |
624 | l->xid, |
625 | l->parentxid, |
626 | l->rollentry_raw_count, |
627 | l->open_filenums, |
628 | l->force_fsync_on_commit, |
629 | l->num_rollback_nodes, |
630 | l->num_rollentries, |
631 | l->spilled_rollback_head, |
632 | l->spilled_rollback_tail, |
633 | l->current_rollback, |
634 | l->crc, |
635 | l->len, |
636 | renv); |
637 | } |
638 | |
639 | static int toku_recover_xstillopenprepared (struct logtype_xstillopenprepared *l, RECOVER_ENV renv) { |
640 | TOKUTXN txn; |
641 | int r = recover_xstillopen_internal (&txn, |
642 | l->lsn, |
643 | l->xid, |
644 | TXNID_PAIR_NONE, |
645 | l->rollentry_raw_count, |
646 | l->open_filenums, |
647 | l->force_fsync_on_commit, |
648 | l->num_rollback_nodes, |
649 | l->num_rollentries, |
650 | l->spilled_rollback_head, |
651 | l->spilled_rollback_tail, |
652 | l->current_rollback, |
653 | l->crc, |
654 | l->len, |
655 | renv); |
656 | if (r != 0) { |
657 | goto exit; |
658 | } |
659 | switch (renv->ss.ss) { |
660 | case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: { |
661 | toku_txn_prepare_txn(txn, l->xa_xid, 0); |
662 | break; |
663 | } |
664 | case FORWARD_NEWER_CHECKPOINT_END: { |
665 | assert(txn->state == TOKUTXN_PREPARING); |
666 | break; |
667 | } |
668 | default: { |
669 | assert(0); |
670 | } |
671 | } |
672 | exit: |
673 | return r; |
674 | } |
675 | |
// Handler for xstillopen entries in the backward direction (per the naming
// used in this file).  Both arguments are unused; it always returns 0.
static int toku_recover_backward_xstillopen (struct logtype_xstillopen *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}
// Handler for xstillopenprepared entries in the backward direction (per the
// naming used in this file).  Both arguments are unused; it always returns 0.
static int toku_recover_backward_xstillopenprepared (struct logtype_xstillopenprepared *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}
684 | |
685 | static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) { |
686 | int r; |
687 | r = recover_transaction(NULL, l->xid, l->parentxid, renv->logger); |
688 | return r; |
689 | } |
690 | |
// Handler for xbegin entries in the backward direction (per the naming used
// in this file).  Both arguments are unused; it always returns 0.
static int toku_recover_backward_xbegin (struct logtype_xbegin *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}
695 | |
696 | struct { |
697 | time_t ; |
698 | LSN ; |
699 | const char *; |
700 | TXNID_PAIR ; |
701 | uint64_t ; |
702 | }; |
703 | |
704 | static void toku_recover_txn_progress(TOKU_TXN_PROGRESS txn_progress, void *) { |
705 | toku_txn_progress_extra * = static_cast<toku_txn_progress_extra *>(extra); |
706 | if (txn_progress_extra->last_total == 0) |
707 | txn_progress_extra->last_total = txn_progress->entries_total; |
708 | else |
709 | assert(txn_progress_extra->last_total == txn_progress->entries_total); |
710 | time_t tnow = time(NULL); |
711 | if (tnow - txn_progress_extra->tlast >= tokuft_recovery_progress_time) { |
712 | txn_progress_extra->tlast = tnow; |
713 | fprintf(stderr, "%.24s PerconaFT " , ctime(&tnow)); |
714 | if (txn_progress_extra->lsn.lsn != 0) |
715 | fprintf(stderr, "lsn %" PRIu64 " " , txn_progress_extra->lsn.lsn); |
716 | fprintf(stderr, "%s xid %" PRIu64 ":%" PRIu64 " " , |
717 | txn_progress_extra->type, txn_progress_extra->xid.parent_id64, txn_progress_extra->xid.child_id64); |
718 | fprintf(stderr, "%" PRIu64 "/%" PRIu64 " " , |
719 | txn_progress->entries_processed, txn_progress->entries_total); |
720 | if (txn_progress->entries_total > 0) |
721 | fprintf(stderr, "%.0f%% " , ((double) txn_progress->entries_processed / (double) txn_progress->entries_total) * 100.0); |
722 | fprintf(stderr, "\n" ); |
723 | } |
724 | } |
725 | |
726 | static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) { |
727 | // find the transaction by transaction id |
728 | TOKUTXN txn = NULL; |
729 | toku_txnid2txn(renv->logger, l->xid, &txn); |
730 | assert(txn!=NULL); |
731 | |
732 | // commit the transaction |
733 | toku_txn_progress_extra = { time(NULL), l->lsn, "commit" , l->xid, 0 }; |
734 | int r = toku_txn_commit_with_lsn(txn, true, l->lsn, toku_recover_txn_progress, &extra); |
735 | assert(r == 0); |
736 | |
737 | // close the transaction |
738 | toku_txn_close_txn(txn); |
739 | |
740 | return 0; |
741 | } |
742 | |
// Handler for xcommit entries in the backward direction (per the naming used
// in this file).  Both arguments are unused; it always returns 0.
static int toku_recover_backward_xcommit (struct logtype_xcommit *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}
747 | |
748 | static int toku_recover_xprepare (struct logtype_xprepare *l, RECOVER_ENV renv) { |
749 | // find the transaction by transaction id |
750 | TOKUTXN txn = NULL; |
751 | toku_txnid2txn(renv->logger, l->xid, &txn); |
752 | assert(txn!=NULL); |
753 | |
754 | // Save the transaction |
755 | toku_txn_prepare_txn(txn, l->xa_xid, 0); |
756 | |
757 | return 0; |
758 | } |
759 | |
// Handler for xprepare entries in the backward direction (per the naming used
// in this file).  Both arguments are unused; it always returns 0.
static int toku_recover_backward_xprepare (struct logtype_xprepare *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}
764 | |
765 | |
766 | |
767 | static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) { |
768 | int r; |
769 | |
770 | // find the transaction by transaction id |
771 | TOKUTXN txn = NULL; |
772 | toku_txnid2txn(renv->logger, l->xid, &txn); |
773 | assert(txn!=NULL); |
774 | |
775 | // abort the transaction |
776 | toku_txn_progress_extra = { time(NULL), l->lsn, "abort" , l->xid, 0 }; |
777 | r = toku_txn_abort_with_lsn(txn, l->lsn, toku_recover_txn_progress, &extra); |
778 | assert(r == 0); |
779 | |
780 | // close the transaction |
781 | toku_txn_close_txn(txn); |
782 | |
783 | return 0; |
784 | } |
785 | |
// Handler for xabort entries in the backward direction (per the naming used
// in this file).  Both arguments are unused; it always returns 0.
static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}
790 | |
791 | // fcreate is like fopen except that the file must be created. |
792 | static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) { |
793 | int r; |
794 | |
795 | TOKUTXN txn = NULL; |
796 | toku_txnid2txn(renv->logger, l->xid, &txn); |
797 | |
798 | // assert that filenum is closed |
799 | struct file_map_tuple *tuple = NULL; |
800 | r = file_map_find(&renv->fmap, l->filenum, &tuple); |
801 | assert(r==DB_NOTFOUND); |
802 | |
803 | assert(txn!=NULL); |
804 | |
805 | //unlink if it exists (recreate from scratch). |
806 | char *iname = fixup_fname(&l->iname); |
807 | char *iname_in_cwd = toku_cachetable_get_fname_in_cwd(renv->ct, iname); |
808 | r = unlink(iname_in_cwd); |
809 | if (r != 0) { |
810 | int er = get_error_errno(); |
811 | if (er != ENOENT) { |
812 | fprintf(stderr, "PerconaFT recovery %s:%d unlink %s %d\n" , __FUNCTION__, __LINE__, iname, er); |
813 | toku_free(iname); |
814 | return r; |
815 | } |
816 | } |
817 | assert(0!=strcmp(iname, toku_product_name_strings.rollback_cachefile)); //Creation of rollback cachefile never gets logged. |
818 | toku_free(iname_in_cwd); |
819 | toku_free(iname); |
820 | |
821 | bool must_create = true; |
822 | r = internal_recover_fopen_or_fcreate(renv, must_create, l->mode, &l->iname, l->filenum, l->treeflags, txn, l->nodesize, l->basementnodesize, (enum toku_compression_method) l->compression_method, MAX_LSN); |
823 | return r; |
824 | } |
825 | |
// Handler for fcreate entries in the backward direction (per the naming used
// in this file).  Both arguments are unused; it always returns 0.
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}
830 | |
831 | |
832 | |
833 | static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) { |
834 | int r; |
835 | |
836 | // assert that filenum is closed |
837 | struct file_map_tuple *tuple = NULL; |
838 | r = file_map_find(&renv->fmap, l->filenum, &tuple); |
839 | assert(r==DB_NOTFOUND); |
840 | |
841 | bool must_create = false; |
842 | TOKUTXN txn = NULL; |
843 | char *fname = fixup_fname(&l->iname); |
844 | |
845 | assert(0!=strcmp(fname, toku_product_name_strings.rollback_cachefile)); //Rollback cachefile can be opened only via fassociate. |
846 | r = internal_recover_fopen_or_fcreate(renv, must_create, 0, &l->iname, l->filenum, l->treeflags, txn, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, MAX_LSN); |
847 | |
848 | toku_free(fname); |
849 | return r; |
850 | } |
851 | |
// Handler for fopen entries in the backward direction (per the naming used
// in this file).  Both arguments are unused; it always returns 0.
static int toku_recover_backward_fopen (struct logtype_fopen *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}
856 | |
857 | static int toku_recover_change_fdescriptor (struct logtype_change_fdescriptor *l, RECOVER_ENV renv) { |
858 | int r; |
859 | struct file_map_tuple *tuple = NULL; |
860 | r = file_map_find(&renv->fmap, l->filenum, &tuple); |
861 | if (r==0) { |
862 | TOKUTXN txn = NULL; |
863 | //Maybe do the descriptor (lsn filter) |
864 | toku_txnid2txn(renv->logger, l->xid, &txn); |
865 | DBT old_descriptor, new_descriptor; |
866 | toku_fill_dbt( |
867 | &old_descriptor, |
868 | l->old_descriptor.data, |
869 | l->old_descriptor.len |
870 | ); |
871 | toku_fill_dbt( |
872 | &new_descriptor, |
873 | l->new_descriptor.data, |
874 | l->new_descriptor.len |
875 | ); |
876 | toku_ft_change_descriptor( |
877 | tuple->ft_handle, |
878 | &old_descriptor, |
879 | &new_descriptor, |
880 | false, |
881 | txn, |
882 | l->update_cmp_descriptor |
883 | ); |
884 | } |
885 | return 0; |
886 | } |
887 | |
888 | static int toku_recover_backward_change_fdescriptor (struct logtype_change_fdescriptor *UU(l), RECOVER_ENV UU(renv)) { |
889 | return 0; |
890 | } |
891 | |
892 | |
893 | // if file referred to in l is open, close it |
894 | static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV renv) { |
895 | struct file_map_tuple *tuple = NULL; |
896 | int r = file_map_find(&renv->fmap, l->filenum, &tuple); |
897 | if (r == 0) { // if file is open |
898 | char *iname = fixup_fname(&l->iname); |
899 | assert(strcmp(tuple->iname, iname) == 0); // verify that file_map has same iname as log entry |
900 | |
901 | if (0!=strcmp(iname, toku_product_name_strings.rollback_cachefile)) { |
902 | //Rollback cachefile is closed manually at end of recovery, not here |
903 | toku_ft_handle_close_recovery(tuple->ft_handle, l->lsn); |
904 | } |
905 | file_map_remove(&renv->fmap, l->filenum); |
906 | toku_free(iname); |
907 | } |
908 | return 0; |
909 | } |
910 | |
911 | static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), RECOVER_ENV UU(renv)) { |
912 | // nothing |
913 | return 0; |
914 | } |
915 | |
916 | // fdelete is a transactional file delete. |
917 | static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) { |
918 | TOKUTXN txn = NULL; |
919 | toku_txnid2txn(renv->logger, l->xid, &txn); |
920 | assert(txn != NULL); |
921 | |
922 | // if the forward scan in recovery found this file and opened it, we |
923 | // need to mark the txn to remove the ft on commit. if the file was |
924 | // not found and not opened, we don't need to do anything - the ft |
925 | // is already gone, so we're happy. |
926 | struct file_map_tuple *tuple; |
927 | int r = file_map_find(&renv->fmap, l->filenum, &tuple); |
928 | if (r == 0) { |
929 | toku_ft_unlink_on_commit(tuple->ft_handle, txn); |
930 | } |
931 | return 0; |
932 | } |
933 | |
934 | static int toku_recover_backward_fdelete (struct logtype_fdelete *UU(l), RECOVER_ENV UU(renv)) { |
935 | // nothing |
936 | return 0; |
937 | } |
938 | |
939 | static int toku_recover_frename(struct logtype_frename *l, RECOVER_ENV renv) { |
940 | assert(renv); |
941 | assert(renv->env); |
942 | |
943 | toku_struct_stat stat; |
944 | const char *data_dir = renv->env->get_data_dir(renv->env); |
945 | bool old_exist = true; |
946 | bool new_exist = true; |
947 | |
948 | assert(data_dir); |
949 | |
950 | struct file_map_tuple *tuple; |
951 | |
952 | std::unique_ptr<char[], decltype(&toku_free)> old_iname_full( |
953 | toku_construct_full_name(2, data_dir, l->old_iname.data), &toku_free); |
954 | std::unique_ptr<char[], decltype(&toku_free)> new_iname_full( |
955 | toku_construct_full_name(2, data_dir, l->new_iname.data), &toku_free); |
956 | |
957 | if (toku_stat(old_iname_full.get(), &stat, toku_uninstrumented) == -1) { |
958 | if (ENOENT == errno) |
959 | old_exist = false; |
960 | else |
961 | return 1; |
962 | } |
963 | |
964 | if (toku_stat(new_iname_full.get(), &stat, toku_uninstrumented) == -1) { |
965 | if (ENOENT == errno) |
966 | new_exist = false; |
967 | else |
968 | return 1; |
969 | } |
970 | |
971 | // Both old and new files can exist if: |
972 | // - rename() is not completed |
973 | // - fcreate was replayed during recovery |
974 | // 'Stalled cachefiles' container cachefile_list::m_stale_fileid contains |
975 | // closed but not yet evicted cachefiles and the key of this container is |
976 | // fs-dependent file id - (device id, inode number) pair. As it is supposed |
977 | // new file have not yet created during recovery process the 'stalled |
978 | // cachefile' container can contain only cache file of old file. |
979 | // To preserve the old cachefile file's id and keep it in |
980 | // 'stalled cachefiles' container the new file is removed |
981 | // and the old file is renamed. |
982 | if (old_exist && new_exist && |
983 | (toku_os_delete(new_iname_full.get()) == -1 || |
984 | toku_os_rename(old_iname_full.get(), new_iname_full.get()) == -1 || |
985 | toku_fsync_directory(old_iname_full.get()) == -1 || |
986 | toku_fsync_directory(new_iname_full.get()) == -1)) |
987 | return 1; |
988 | |
989 | if (old_exist && !new_exist && |
990 | (!toku_create_subdirs_if_needed(new_iname_full.get()) || |
991 | toku_os_rename(old_iname_full.get(), new_iname_full.get()) == -1 || |
992 | toku_fsync_directory(old_iname_full.get()) == -1 || |
993 | toku_fsync_directory(new_iname_full.get()) == -1)) |
994 | return 1; |
995 | |
996 | if (file_map_find(&renv->fmap, l->old_filenum, &tuple) != DB_NOTFOUND) { |
997 | if (tuple->iname) |
998 | toku_free(tuple->iname); |
999 | tuple->iname = toku_xstrdup(l->new_iname.data); |
1000 | } |
1001 | |
1002 | TOKUTXN txn = NULL; |
1003 | toku_txnid2txn(renv->logger, l->xid, &txn); |
1004 | |
1005 | if (txn) |
1006 | toku_logger_save_rollback_frename(txn, &l->old_iname, &l->new_iname); |
1007 | |
1008 | return 0; |
1009 | } |
1010 | |
1011 | static int toku_recover_backward_frename(struct logtype_frename *UU(l), |
1012 | RECOVER_ENV UU(renv)) { |
1013 | // nothing |
1014 | return 0; |
1015 | } |
1016 | |
1017 | static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV renv) { |
1018 | int r; |
1019 | TOKUTXN txn = NULL; |
1020 | toku_txnid2txn(renv->logger, l->xid, &txn); |
1021 | assert(txn!=NULL); |
1022 | struct file_map_tuple *tuple = NULL; |
1023 | r = file_map_find(&renv->fmap, l->filenum, &tuple); |
1024 | if (r==0) { |
1025 | //Maybe do the insertion if we found the cachefile. |
1026 | DBT keydbt, valdbt; |
1027 | toku_fill_dbt(&keydbt, l->key.data, l->key.len); |
1028 | toku_fill_dbt(&valdbt, l->value.data, l->value.len); |
1029 | toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, true, l->lsn, false, FT_INSERT); |
1030 | toku_txn_maybe_note_ft(txn, tuple->ft_handle->ft); |
1031 | } |
1032 | return 0; |
1033 | } |
1034 | |
1035 | static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), RECOVER_ENV UU(renv)) { |
1036 | // nothing |
1037 | return 0; |
1038 | } |
1039 | |
1040 | static int toku_recover_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *l, RECOVER_ENV renv) { |
1041 | int r; |
1042 | TOKUTXN txn = NULL; |
1043 | toku_txnid2txn(renv->logger, l->xid, &txn); |
1044 | assert(txn!=NULL); |
1045 | struct file_map_tuple *tuple = NULL; |
1046 | r = file_map_find(&renv->fmap, l->filenum, &tuple); |
1047 | if (r==0) { |
1048 | //Maybe do the insertion if we found the cachefile. |
1049 | DBT keydbt, valdbt; |
1050 | toku_fill_dbt(&keydbt, l->key.data, l->key.len); |
1051 | toku_fill_dbt(&valdbt, l->value.data, l->value.len); |
1052 | toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, true, l->lsn, false, FT_INSERT_NO_OVERWRITE); |
1053 | } |
1054 | return 0; |
1055 | } |
1056 | |
1057 | static int toku_recover_backward_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *UU(l), RECOVER_ENV UU(renv)) { |
1058 | // nothing |
1059 | return 0; |
1060 | } |
1061 | |
1062 | static int toku_recover_enq_delete_any (struct logtype_enq_delete_any *l, RECOVER_ENV renv) { |
1063 | int r; |
1064 | TOKUTXN txn = NULL; |
1065 | toku_txnid2txn(renv->logger, l->xid, &txn); |
1066 | assert(txn!=NULL); |
1067 | struct file_map_tuple *tuple = NULL; |
1068 | r = file_map_find(&renv->fmap, l->filenum, &tuple); |
1069 | if (r==0) { |
1070 | //Maybe do the deletion if we found the cachefile. |
1071 | DBT keydbt; |
1072 | toku_fill_dbt(&keydbt, l->key.data, l->key.len); |
1073 | toku_ft_maybe_delete(tuple->ft_handle, &keydbt, txn, true, l->lsn, false); |
1074 | } |
1075 | return 0; |
1076 | } |
1077 | |
1078 | static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), RECOVER_ENV UU(renv)) { |
1079 | // nothing |
1080 | return 0; |
1081 | } |
1082 | |
1083 | static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple *l, RECOVER_ENV renv) { |
1084 | int r; |
1085 | TOKUTXN txn = NULL; |
1086 | toku_txnid2txn(renv->logger, l->xid, &txn); |
1087 | assert(txn!=NULL); |
1088 | DB *src_db = NULL; |
1089 | bool do_inserts = true; |
1090 | { |
1091 | struct file_map_tuple *tuple = NULL; |
1092 | r = file_map_find(&renv->fmap, l->src_filenum, &tuple); |
1093 | if (l->src_filenum.fileid == FILENUM_NONE.fileid) |
1094 | assert(r==DB_NOTFOUND); |
1095 | else { |
1096 | if (r == 0) |
1097 | src_db = &tuple->fake_db; |
1098 | else |
1099 | do_inserts = false; // src file was probably deleted, #3129 |
1100 | } |
1101 | } |
1102 | |
1103 | if (do_inserts) { |
1104 | DBT src_key, src_val; |
1105 | |
1106 | toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len); |
1107 | toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len); |
1108 | |
1109 | for (uint32_t file = 0; file < l->dest_filenums.num; file++) { |
1110 | struct file_map_tuple *tuple = NULL; |
1111 | r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple); |
1112 | if (r==0) { |
1113 | // We found the cachefile. (maybe) Do the insert. |
1114 | DB *db = &tuple->fake_db; |
1115 | |
1116 | DBT_ARRAY key_array; |
1117 | DBT_ARRAY val_array; |
1118 | if (db != src_db) { |
1119 | r = renv->generate_row_for_put(db, src_db, &renv->dest_keys, &renv->dest_vals, &src_key, &src_val); |
1120 | assert(r==0); |
1121 | invariant(renv->dest_keys.size <= renv->dest_keys.capacity); |
1122 | invariant(renv->dest_vals.size <= renv->dest_vals.capacity); |
1123 | invariant(renv->dest_keys.size == renv->dest_vals.size); |
1124 | key_array = renv->dest_keys; |
1125 | val_array = renv->dest_vals; |
1126 | } else { |
1127 | key_array.size = key_array.capacity = 1; |
1128 | key_array.dbts = &src_key; |
1129 | |
1130 | val_array.size = val_array.capacity = 1; |
1131 | val_array.dbts = &src_val; |
1132 | } |
1133 | for (uint32_t i = 0; i < key_array.size; i++) { |
1134 | toku_ft_maybe_insert(tuple->ft_handle, &key_array.dbts[i], &val_array.dbts[i], txn, true, l->lsn, false, FT_INSERT); |
1135 | } |
1136 | } |
1137 | } |
1138 | } |
1139 | |
1140 | return 0; |
1141 | } |
1142 | |
1143 | static int toku_recover_backward_enq_insert_multiple (struct logtype_enq_insert_multiple *UU(l), RECOVER_ENV UU(renv)) { |
1144 | // nothing |
1145 | return 0; |
1146 | } |
1147 | |
1148 | static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple *l, RECOVER_ENV renv) { |
1149 | int r; |
1150 | TOKUTXN txn = NULL; |
1151 | toku_txnid2txn(renv->logger, l->xid, &txn); |
1152 | assert(txn!=NULL); |
1153 | DB *src_db = NULL; |
1154 | bool do_deletes = true; |
1155 | { |
1156 | struct file_map_tuple *tuple = NULL; |
1157 | r = file_map_find(&renv->fmap, l->src_filenum, &tuple); |
1158 | if (l->src_filenum.fileid == FILENUM_NONE.fileid) |
1159 | assert(r==DB_NOTFOUND); |
1160 | else { |
1161 | if (r == 0) { |
1162 | src_db = &tuple->fake_db; |
1163 | } else { |
1164 | do_deletes = false; // src file was probably deleted, #3129 |
1165 | } |
1166 | } |
1167 | } |
1168 | |
1169 | if (do_deletes) { |
1170 | DBT src_key, src_val; |
1171 | toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len); |
1172 | toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len); |
1173 | |
1174 | for (uint32_t file = 0; file < l->dest_filenums.num; file++) { |
1175 | struct file_map_tuple *tuple = NULL; |
1176 | r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple); |
1177 | if (r==0) { |
1178 | // We found the cachefile. (maybe) Do the delete. |
1179 | DB *db = &tuple->fake_db; |
1180 | |
1181 | DBT_ARRAY key_array; |
1182 | if (db != src_db) { |
1183 | r = renv->generate_row_for_del(db, src_db, &renv->dest_keys, &src_key, &src_val); |
1184 | assert(r==0); |
1185 | invariant(renv->dest_keys.size <= renv->dest_keys.capacity); |
1186 | key_array = renv->dest_keys; |
1187 | } else { |
1188 | key_array.size = key_array.capacity = 1; |
1189 | key_array.dbts = &src_key; |
1190 | } |
1191 | for (uint32_t i = 0; i < key_array.size; i++) { |
1192 | toku_ft_maybe_delete(tuple->ft_handle, &key_array.dbts[i], txn, true, l->lsn, false); |
1193 | } |
1194 | } |
1195 | } |
1196 | } |
1197 | |
1198 | return 0; |
1199 | } |
1200 | |
1201 | static int toku_recover_backward_enq_delete_multiple (struct logtype_enq_delete_multiple *UU(l), RECOVER_ENV UU(renv)) { |
1202 | // nothing |
1203 | return 0; |
1204 | } |
1205 | |
1206 | static int toku_recover_enq_update(struct logtype_enq_update *l, RECOVER_ENV renv) { |
1207 | int r; |
1208 | TOKUTXN txn = NULL; |
1209 | toku_txnid2txn(renv->logger, l->xid, &txn); |
1210 | assert(txn != NULL); |
1211 | struct file_map_tuple *tuple = NULL; |
1212 | r = file_map_find(&renv->fmap, l->filenum, &tuple); |
1213 | if (r == 0) { |
1214 | // Maybe do the update if we found the cachefile. |
1215 | DBT key, ; |
1216 | toku_fill_dbt(&key, l->key.data, l->key.len); |
1217 | toku_fill_dbt(&extra, l->extra.data, l->extra.len); |
1218 | toku_ft_maybe_update(tuple->ft_handle, &key, &extra, txn, true, l->lsn, false); |
1219 | } |
1220 | return 0; |
1221 | } |
1222 | |
1223 | static int toku_recover_enq_updatebroadcast(struct logtype_enq_updatebroadcast *l, RECOVER_ENV renv) { |
1224 | int r; |
1225 | TOKUTXN txn = NULL; |
1226 | toku_txnid2txn(renv->logger, l->xid, &txn); |
1227 | assert(txn != NULL); |
1228 | struct file_map_tuple *tuple = NULL; |
1229 | r = file_map_find(&renv->fmap, l->filenum, &tuple); |
1230 | if (r == 0) { |
1231 | // Maybe do the update broadcast if we found the cachefile. |
1232 | DBT ; |
1233 | toku_fill_dbt(&extra, l->extra.data, l->extra.len); |
1234 | toku_ft_maybe_update_broadcast(tuple->ft_handle, &extra, txn, true, |
1235 | l->lsn, false, l->is_resetting_op); |
1236 | } |
1237 | return 0; |
1238 | } |
1239 | |
1240 | static int toku_recover_backward_enq_update(struct logtype_enq_update *UU(l), RECOVER_ENV UU(renv)) { |
1241 | // nothing |
1242 | return 0; |
1243 | } |
1244 | |
1245 | static int toku_recover_backward_enq_updatebroadcast(struct logtype_enq_updatebroadcast *UU(l), RECOVER_ENV UU(renv)) { |
1246 | // nothing |
1247 | return 0; |
1248 | } |
1249 | |
1250 | static int (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) { |
1251 | // nothing |
1252 | return 0; |
1253 | } |
1254 | |
1255 | static int (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) { |
1256 | // nothing |
1257 | return 0; |
1258 | } |
1259 | |
1260 | static int toku_recover_shutdown_up_to_19 (struct logtype_shutdown_up_to_19 *UU(l), RECOVER_ENV UU(renv)) { |
1261 | // nothing |
1262 | return 0; |
1263 | } |
1264 | |
1265 | static int toku_recover_backward_shutdown_up_to_19 (struct logtype_shutdown_up_to_19 *UU(l), RECOVER_ENV UU(renv)) { |
1266 | // nothing |
1267 | return 0; |
1268 | } |
1269 | |
1270 | static int toku_recover_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) { |
1271 | // nothing |
1272 | return 0; |
1273 | } |
1274 | |
1275 | static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) { |
1276 | // nothing |
1277 | return 0; |
1278 | } |
1279 | |
1280 | static int toku_recover_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) { |
1281 | TOKUTXN txn = NULL; |
1282 | toku_txnid2txn(renv->logger, l->xid, &txn); |
1283 | assert(txn!=NULL); |
1284 | char *new_iname = fixup_fname(&l->new_iname); |
1285 | |
1286 | toku_ft_load_recovery(txn, l->old_filenum, new_iname, 0, 0, (LSN*)NULL); |
1287 | |
1288 | toku_free(new_iname); |
1289 | return 0; |
1290 | } |
1291 | |
1292 | static int toku_recover_backward_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) { |
1293 | // nothing |
1294 | return 0; |
1295 | } |
1296 | |
1297 | // #2954 |
1298 | static int toku_recover_hot_index(struct logtype_hot_index *UU(l), RECOVER_ENV UU(renv)) { |
1299 | TOKUTXN txn = NULL; |
1300 | toku_txnid2txn(renv->logger, l->xid, &txn); |
1301 | assert(txn!=NULL); |
1302 | // just make an entry in the rollback log |
1303 | // - set do_log = 0 -> don't write to recovery log |
1304 | toku_ft_hot_index_recovery(txn, l->hot_index_filenums, 0, 0, (LSN*)NULL); |
1305 | return 0; |
1306 | } |
1307 | |
1308 | // #2954 |
1309 | static int toku_recover_backward_hot_index(struct logtype_hot_index *UU(l), RECOVER_ENV UU(renv)) { |
1310 | // nothing |
1311 | return 0; |
1312 | } |
1313 | |
1314 | // Effects: If there are no log files, or if there is a clean "shutdown" at |
1315 | // the end of the log, then we don't need recovery to run. |
1316 | // Returns: true if we need recovery, otherwise false. |
1317 | int tokuft_needs_recovery(const char *log_dir, bool ignore_log_empty) { |
1318 | int needs_recovery; |
1319 | int r; |
1320 | TOKULOGCURSOR logcursor = NULL; |
1321 | |
1322 | r = toku_logcursor_create(&logcursor, log_dir); |
1323 | if (r != 0) { |
1324 | needs_recovery = true; goto exit; |
1325 | } |
1326 | |
1327 | struct log_entry *le; |
1328 | le = NULL; |
1329 | r = toku_logcursor_last(logcursor, &le); |
1330 | if (r == 0) { |
1331 | needs_recovery = le->cmd != LT_shutdown; |
1332 | } |
1333 | else { |
1334 | needs_recovery = !(r == DB_NOTFOUND && ignore_log_empty); |
1335 | } |
1336 | exit: |
1337 | if (logcursor) { |
1338 | r = toku_logcursor_destroy(&logcursor); |
1339 | assert(r == 0); |
1340 | } |
1341 | return needs_recovery; |
1342 | } |
1343 | |
1344 | static uint32_t recover_get_num_live_txns(RECOVER_ENV renv) { |
1345 | return toku_txn_manager_num_live_root_txns(renv->logger->txn_manager); |
1346 | } |
1347 | |
1348 | static int is_txn_unprepared(TOKUTXN txn, void* ) { |
1349 | TOKUTXN* ptxn = (TOKUTXN *)extra; |
1350 | if (txn->state != TOKUTXN_PREPARING) { |
1351 | *ptxn = txn; |
1352 | return -1; // return -1 to get iterator to return |
1353 | } |
1354 | return 0; |
1355 | } |
1356 | |
1357 | static int find_an_unprepared_txn (RECOVER_ENV renv, TOKUTXN *txnp) { |
1358 | TOKUTXN txn = nullptr; |
1359 | int r = toku_txn_manager_iter_over_live_root_txns( |
1360 | renv->logger->txn_manager, |
1361 | is_txn_unprepared, |
1362 | &txn |
1363 | ); |
1364 | assert(r == 0 || r == -1); |
1365 | if (txn != nullptr) { |
1366 | *txnp = txn; |
1367 | return 0; |
1368 | } |
1369 | return DB_NOTFOUND; |
1370 | } |
1371 | |
1372 | static int call_prepare_txn_callback_iter(TOKUTXN txn, void* ) { |
1373 | RECOVER_ENV* renv = (RECOVER_ENV *)extra; |
1374 | invariant(txn->state == TOKUTXN_PREPARING); |
1375 | invariant(txn->child == NULL); |
1376 | (*renv)->prepared_txn_callback((*renv)->env, txn); |
1377 | return 0; |
1378 | } |
1379 | |
1380 | static void recover_abort_live_txn(TOKUTXN txn) { |
1381 | fprintf(stderr, "%s %" PRIu64 "\n" , __FUNCTION__, txn->txnid.parent_id64); |
1382 | // recursively abort all children first |
1383 | if (txn->child != NULL) { |
1384 | recover_abort_live_txn(txn->child); |
1385 | } |
1386 | // sanity check that the recursive call successfully NULLs out txn->child |
1387 | invariant(txn->child == NULL); |
1388 | // abort the transaction |
1389 | toku_txn_progress_extra = { time(NULL), ZERO_LSN, "abort live" , txn->txnid, 0 }; |
1390 | int r = toku_txn_abort_txn(txn, toku_recover_txn_progress, &extra); |
1391 | assert(r == 0); |
1392 | |
1393 | // close the transaction |
1394 | toku_txn_close_txn(txn); |
1395 | } |
1396 | |
1397 | // abort all of the remaining live transactions in descending transaction id order |
1398 | static void recover_abort_all_live_txns(RECOVER_ENV renv) { |
1399 | while (1) { |
1400 | TOKUTXN txn; |
1401 | int r = find_an_unprepared_txn(renv, &txn); |
1402 | if (r==0) { |
1403 | recover_abort_live_txn(txn); |
1404 | } else if (r==DB_NOTFOUND) { |
1405 | break; |
1406 | } else { |
1407 | abort(); |
1408 | } |
1409 | } |
1410 | |
1411 | // Now we have only prepared txns. These prepared txns don't have full DB_TXNs in them, so we need to make some. |
1412 | int r = toku_txn_manager_iter_over_live_root_txns( |
1413 | renv->logger->txn_manager, |
1414 | call_prepare_txn_callback_iter, |
1415 | &renv |
1416 | ); |
1417 | assert_zero(r); |
1418 | } |
1419 | |
1420 | static void recover_trace_le(const char *f, int l, int r, struct log_entry *le) { |
1421 | if (le) { |
1422 | LSN thislsn = toku_log_entry_get_lsn(le); |
1423 | fprintf(stderr, "%s:%d r=%d cmd=%c lsn=%" PRIu64 "\n" , f, l, r, le->cmd, thislsn.lsn); |
1424 | } else |
1425 | fprintf(stderr, "%s:%d r=%d cmd=?\n" , f, l, r); |
1426 | } |
1427 | |
// Test-only hooks: optional callbacks (with their opaque arguments) that
// recovery invokes when set. All are unset by default.
static void (*recover_callback_fx)(void*)  = nullptr;
static void * recover_callback_args        = nullptr;
static void (*recover_callback2_fx)(void*) = nullptr;
static void * recover_callback2_args       = nullptr;
1433 | |
1434 | |
1435 | static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_dir) { |
1436 | int r; |
1437 | int rr = 0; |
1438 | TOKULOGCURSOR logcursor = NULL; |
1439 | struct log_entry *le = NULL; |
1440 | |
1441 | time_t tnow = time(NULL); |
1442 | fprintf(stderr, "%.24s PerconaFT recovery starting in env %s\n" , ctime(&tnow), env_dir); |
1443 | |
1444 | char org_wd[1000]; |
1445 | { |
1446 | char *wd=getcwd(org_wd, sizeof(org_wd)); |
1447 | assert(wd!=0); |
1448 | } |
1449 | |
1450 | r = toku_logger_open(log_dir, renv->logger); |
1451 | assert(r == 0); |
1452 | |
1453 | // grab the last LSN so that it can be restored when the log is restarted |
1454 | LSN lastlsn = toku_logger_last_lsn(renv->logger); |
1455 | LSN thislsn; |
1456 | |
1457 | // there must be at least one log entry |
1458 | r = toku_logcursor_create(&logcursor, log_dir); |
1459 | assert(r == 0); |
1460 | |
1461 | r = toku_logcursor_last(logcursor, &le); |
1462 | if (r != 0) { |
1463 | if (tokuft_recovery_trace) |
1464 | fprintf(stderr, "RUNRECOVERY: %s:%d r=%d\n" , __FUNCTION__, __LINE__, r); |
1465 | rr = DB_RUNRECOVERY; goto errorexit; |
1466 | } |
1467 | |
1468 | r = toku_logcursor_destroy(&logcursor); |
1469 | assert(r == 0); |
1470 | |
1471 | r = toku_logcursor_create(&logcursor, log_dir); |
1472 | assert(r == 0); |
1473 | |
1474 | { |
1475 | toku_struct_stat buf; |
1476 | if (toku_stat(env_dir, &buf, toku_uninstrumented)) { |
1477 | rr = get_error_errno(); |
1478 | fprintf(stderr, |
1479 | "%.24s PerconaFT recovery error: directory does not exist: " |
1480 | "%s\n" , |
1481 | ctime(&tnow), |
1482 | env_dir); |
1483 | goto errorexit; |
1484 | } else if (!S_ISDIR(buf.st_mode)) { |
1485 | fprintf(stderr, "%.24s PerconaFT recovery error: this file is supposed to be a directory, but is not: %s\n" , ctime(&tnow), env_dir); |
1486 | rr = ENOTDIR; goto errorexit; |
1487 | } |
1488 | } |
1489 | // scan backwards |
1490 | scan_state_init(&renv->ss); |
1491 | tnow = time(NULL); |
1492 | time_t tlast; |
1493 | tlast = tnow; |
1494 | fprintf(stderr, "%.24s PerconaFT recovery scanning backward from %" PRIu64 "\n" , ctime(&tnow), lastlsn.lsn); |
1495 | for (unsigned i=0; 1; i++) { |
1496 | |
1497 | // get the previous log entry (first time gets the last one) |
1498 | le = NULL; |
1499 | r = toku_logcursor_prev(logcursor, &le); |
1500 | if (tokuft_recovery_trace) |
1501 | recover_trace_le(__FUNCTION__, __LINE__, r, le); |
1502 | if (r != 0) { |
1503 | if (r == DB_NOTFOUND) |
1504 | break; |
1505 | rr = DB_RUNRECOVERY; |
1506 | goto errorexit; |
1507 | } |
1508 | |
1509 | // trace progress |
1510 | if ((i % 1000) == 0) { |
1511 | tnow = time(NULL); |
1512 | if (tnow - tlast >= tokuft_recovery_progress_time) { |
1513 | thislsn = toku_log_entry_get_lsn(le); |
1514 | fprintf(stderr, "%.24s PerconaFT recovery scanning backward from %" PRIu64 " at %" PRIu64 " (%s)\n" , |
1515 | ctime(&tnow), lastlsn.lsn, thislsn.lsn, recover_state(renv)); |
1516 | tlast = tnow; |
1517 | } |
1518 | } |
1519 | |
1520 | // dispatch the log entry handler |
1521 | assert(renv->ss.ss == BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END || |
1522 | renv->ss.ss == BACKWARD_NEWER_CHECKPOINT_END); |
1523 | logtype_dispatch_assign(le, toku_recover_backward_, r, renv); |
1524 | if (tokuft_recovery_trace) |
1525 | recover_trace_le(__FUNCTION__, __LINE__, r, le); |
1526 | if (r != 0) { |
1527 | if (tokuft_recovery_trace) |
1528 | fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n" , __FUNCTION__, __LINE__, r); |
1529 | rr = DB_RUNRECOVERY; |
1530 | goto errorexit; |
1531 | } |
1532 | if (renv->goforward) |
1533 | break; |
1534 | } |
1535 | |
1536 | // run first callback |
1537 | if (recover_callback_fx) |
1538 | recover_callback_fx(recover_callback_args); |
1539 | |
1540 | // scan forwards |
1541 | assert(le); |
1542 | thislsn = toku_log_entry_get_lsn(le); |
1543 | tnow = time(NULL); |
1544 | fprintf(stderr, "%.24s PerconaFT recovery starts scanning forward to %" PRIu64 " from %" PRIu64 " left %" PRIu64 " (%s)\n" , |
1545 | ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv)); |
1546 | |
1547 | for (unsigned i=0; 1; i++) { |
1548 | |
1549 | // trace progress |
1550 | if ((i % 1000) == 0) { |
1551 | tnow = time(NULL); |
1552 | if (tnow - tlast >= tokuft_recovery_progress_time) { |
1553 | thislsn = toku_log_entry_get_lsn(le); |
1554 | fprintf(stderr, "%.24s PerconaFT recovery scanning forward to %" PRIu64 " at %" PRIu64 " left %" PRIu64 " (%s)\n" , |
1555 | ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv)); |
1556 | tlast = tnow; |
1557 | } |
1558 | } |
1559 | |
1560 | // dispatch the log entry handler (first time calls the forward handler for the log entry at the turnaround |
1561 | assert(renv->ss.ss == FORWARD_BETWEEN_CHECKPOINT_BEGIN_END || |
1562 | renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END); |
1563 | logtype_dispatch_assign(le, toku_recover_, r, renv); |
1564 | if (tokuft_recovery_trace) |
1565 | recover_trace_le(__FUNCTION__, __LINE__, r, le); |
1566 | if (r != 0) { |
1567 | if (tokuft_recovery_trace) |
1568 | fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n" , __FUNCTION__, __LINE__, r); |
1569 | rr = DB_RUNRECOVERY; |
1570 | goto errorexit; |
1571 | } |
1572 | |
1573 | // get the next log entry |
1574 | le = NULL; |
1575 | r = toku_logcursor_next(logcursor, &le); |
1576 | if (tokuft_recovery_trace) |
1577 | recover_trace_le(__FUNCTION__, __LINE__, r, le); |
1578 | if (r != 0) { |
1579 | if (r == DB_NOTFOUND) |
1580 | break; |
1581 | rr = DB_RUNRECOVERY; |
1582 | goto errorexit; |
1583 | } |
1584 | } |
1585 | |
1586 | // verify the final recovery state |
1587 | assert(renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END); |
1588 | |
1589 | r = toku_logcursor_destroy(&logcursor); |
1590 | assert(r == 0); |
1591 | |
1592 | // run second callback |
1593 | if (recover_callback2_fx) |
1594 | recover_callback2_fx(recover_callback2_args); |
1595 | |
1596 | // restart logging |
1597 | toku_logger_restart(renv->logger, lastlsn); |
1598 | |
1599 | // abort the live transactions |
1600 | { |
1601 | uint32_t n = recover_get_num_live_txns(renv); |
1602 | if (n > 0) { |
1603 | tnow = time(NULL); |
1604 | fprintf(stderr, "%.24s PerconaFT recovery has %" PRIu32 " live transaction%s\n" , ctime(&tnow), n, n > 1 ? "s" : "" ); |
1605 | } |
1606 | } |
1607 | recover_abort_all_live_txns(renv); |
1608 | { |
1609 | uint32_t n = recover_get_num_live_txns(renv); |
1610 | if (n > 0) { |
1611 | tnow = time(NULL); |
1612 | fprintf(stderr, "%.24s PerconaFT recovery has %" PRIu32 " prepared transaction%s\n" , ctime(&tnow), n, n > 1 ? "s" : "" ); |
1613 | } |
1614 | } |
1615 | |
1616 | // close the open dictionaries |
1617 | uint32_t n; |
1618 | n = file_map_get_num_dictionaries(&renv->fmap); |
1619 | if (n > 0) { |
1620 | tnow = time(NULL); |
1621 | fprintf(stderr, "%.24s PerconaFT recovery closing %" PRIu32 " dictionar%s\n" , ctime(&tnow), n, n > 1 ? "ies" : "y" ); |
1622 | } |
1623 | file_map_close_dictionaries(&renv->fmap, lastlsn); |
1624 | |
1625 | { |
1626 | // write a recovery log entry |
1627 | BYTESTRING = { static_cast<uint32_t>(strlen("recover" )), (char *) "recover" }; |
1628 | toku_log_comment(renv->logger, NULL, true, 0, recover_comment); |
1629 | } |
1630 | |
1631 | // checkpoint |
1632 | tnow = time(NULL); |
1633 | fprintf(stderr, "%.24s PerconaFT recovery making a checkpoint\n" , ctime(&tnow)); |
1634 | r = toku_checkpoint(renv->cp, renv->logger, NULL, NULL, NULL, NULL, RECOVERY_CHECKPOINT); |
1635 | assert(r == 0); |
1636 | tnow = time(NULL); |
1637 | fprintf(stderr, "%.24s PerconaFT recovery done\n" , ctime(&tnow)); |
1638 | |
1639 | return 0; |
1640 | |
1641 | errorexit: |
1642 | tnow = time(NULL); |
1643 | fprintf(stderr, "%.24s PerconaFT recovery failed %d\n" , ctime(&tnow), rr); |
1644 | |
1645 | if (logcursor) { |
1646 | r = toku_logcursor_destroy(&logcursor); |
1647 | assert(r == 0); |
1648 | } |
1649 | |
1650 | return rr; |
1651 | } |
1652 | |
1653 | int |
1654 | toku_recover_lock(const char *lock_dir, int *lockfd) { |
1655 | int e = toku_single_process_lock(lock_dir, "recovery" , lockfd); |
1656 | if (e != 0 && e != ENOENT) { |
1657 | fprintf(stderr, "Couldn't run recovery because some other process holds the recovery lock\n" ); |
1658 | } |
1659 | return e; |
1660 | } |
1661 | |
1662 | int |
1663 | toku_recover_unlock(int lockfd) { |
1664 | int lockfd_copy = lockfd; |
1665 | return toku_single_process_unlock(&lockfd_copy); |
1666 | } |
1667 | |
1668 | int tokuft_recover(DB_ENV *env, |
1669 | prepared_txn_callback_t prepared_txn_callback, |
1670 | keep_cachetable_callback_t keep_cachetable_callback, |
1671 | TOKULOGGER logger, |
1672 | const char *env_dir, const char *log_dir, |
1673 | ft_compare_func bt_compare, |
1674 | ft_update_func update_function, |
1675 | generate_row_for_put_func generate_row_for_put, |
1676 | generate_row_for_del_func generate_row_for_del, |
1677 | size_t cachetable_size) { |
1678 | int r; |
1679 | int lockfd = -1; |
1680 | |
1681 | r = toku_recover_lock(log_dir, &lockfd); |
1682 | if (r != 0) |
1683 | return r; |
1684 | |
1685 | int rr = 0; |
1686 | if (tokuft_needs_recovery(log_dir, false)) { |
1687 | struct recover_env renv; |
1688 | r = recover_env_init(&renv, |
1689 | env_dir, |
1690 | env, |
1691 | prepared_txn_callback, |
1692 | keep_cachetable_callback, |
1693 | logger, |
1694 | bt_compare, |
1695 | update_function, |
1696 | generate_row_for_put, |
1697 | generate_row_for_del, |
1698 | cachetable_size); |
1699 | assert(r == 0); |
1700 | |
1701 | rr = do_recovery(&renv, env_dir, log_dir); |
1702 | |
1703 | recover_env_cleanup(&renv); |
1704 | } |
1705 | |
1706 | r = toku_recover_unlock(lockfd); |
1707 | if (r != 0) |
1708 | return r; |
1709 | |
1710 | return rr; |
1711 | } |
1712 | |
1713 | // Return 0 if recovery log exists, ENOENT if log is missing |
1714 | int |
1715 | tokuft_recover_log_exists(const char * log_dir) { |
1716 | int r; |
1717 | TOKULOGCURSOR logcursor; |
1718 | |
1719 | r = toku_logcursor_create(&logcursor, log_dir); |
1720 | if (r == 0) { |
1721 | int rclose; |
1722 | r = toku_logcursor_log_exists(logcursor); // return ENOENT if no log |
1723 | rclose = toku_logcursor_destroy(&logcursor); |
1724 | assert(rclose == 0); |
1725 | } |
1726 | else |
1727 | r = ENOENT; |
1728 | |
1729 | return r; |
1730 | } |
1731 | |
1732 | void toku_recover_set_callback (void (*callback_fx)(void*), void* callback_args) { |
1733 | recover_callback_fx = callback_fx; |
1734 | recover_callback_args = callback_args; |
1735 | } |
1736 | |
1737 | void toku_recover_set_callback2 (void (*callback_fx)(void*), void* callback_args) { |
1738 | recover_callback2_fx = callback_fx; |
1739 | recover_callback2_args = callback_args; |
1740 | } |
1741 | |