1/* Copyright (C) 2006, 2007 MySQL AB
2 Copyright (C) 2010, 2013, Monty Program Ab.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16
17/*
18 WL#3072 Maria recovery
19 First version written by Guilhem Bichot on 2006-04-27.
20*/
21
22/* Here is the implementation of this module */
23
24#include "maria_def.h"
25#include "ma_recovery.h"
26#include "ma_blockrec.h"
27#include "ma_checkpoint.h"
28#include "trnman.h"
29#include "ma_key_recover.h"
30#include "ma_recovery_util.h"
31#include "hash.h"
32#include <my_check_opt.h>
33
34struct st_trn_for_recovery /* used only in the REDO phase */
35{
36 LSN group_start_lsn, undo_lsn, first_undo_lsn;
37 TrID long_trid;
38};
39struct st_table_for_recovery /* used in the REDO and UNDO phase */
40{
41 MARIA_HA *info;
42};
43/* Variables used by all functions of this module. Ok as single-threaded */
44static struct st_trn_for_recovery *all_active_trans;
45static struct st_table_for_recovery *all_tables;
46static struct st_dirty_page *dirty_pages_pool;
47static LSN current_group_end_lsn;
48#ifndef DBUG_OFF
49/** Current group of REDOs is about this table and only this one */
50static MARIA_HA *current_group_table;
51#endif
52static TrID max_long_trid= 0; /**< max long trid seen by REDO phase */
53static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */
54/** @brief to avoid writing a checkpoint if recovery did nothing. */
55static my_bool checkpoint_useful;
56static my_bool in_redo_phase;
57static my_bool trns_created;
58static ulong skipped_undo_phase;
59static ulonglong now; /**< for tracking execution time of phases */
60static void (*save_error_handler_hook)(uint, const char *,myf);
61static uint recovery_warnings; /**< count of warnings */
62static uint recovery_found_crashed_tables;
63HASH tables_to_redo; /* For maria_read_log */
64ulong maria_recovery_force_crash_counter;
65
66#define prototype_redo_exec_hook(R) \
67 static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
68
69#define prototype_redo_exec_hook_dummy(R) \
70 static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \
71 __attribute__ ((unused)))
72
73#define prototype_undo_exec_hook(R) \
74 static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn)
75
76prototype_redo_exec_hook(LONG_TRANSACTION_ID);
77prototype_redo_exec_hook_dummy(CHECKPOINT);
78prototype_redo_exec_hook(REDO_CREATE_TABLE);
79prototype_redo_exec_hook(REDO_RENAME_TABLE);
80prototype_redo_exec_hook(REDO_REPAIR_TABLE);
81prototype_redo_exec_hook(REDO_DROP_TABLE);
82prototype_redo_exec_hook(FILE_ID);
83prototype_redo_exec_hook(INCOMPLETE_LOG);
84prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP);
85prototype_redo_exec_hook(UNDO_BULK_INSERT);
86prototype_redo_exec_hook(IMPORTED_TABLE);
87prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
88prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
89prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
90prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
91prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
92prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
93prototype_redo_exec_hook(REDO_FREE_BLOCKS);
94prototype_redo_exec_hook(REDO_DELETE_ALL);
95prototype_redo_exec_hook(REDO_INDEX);
96prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE);
97prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE);
98prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
99prototype_redo_exec_hook(UNDO_ROW_INSERT);
100prototype_redo_exec_hook(UNDO_ROW_DELETE);
101prototype_redo_exec_hook(UNDO_ROW_UPDATE);
102prototype_redo_exec_hook(UNDO_KEY_INSERT);
103prototype_redo_exec_hook(UNDO_KEY_DELETE);
104prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
105prototype_redo_exec_hook(COMMIT);
106prototype_redo_exec_hook(CLR_END);
107prototype_redo_exec_hook(DEBUG_INFO);
108prototype_undo_exec_hook(UNDO_ROW_INSERT);
109prototype_undo_exec_hook(UNDO_ROW_DELETE);
110prototype_undo_exec_hook(UNDO_ROW_UPDATE);
111prototype_undo_exec_hook(UNDO_KEY_INSERT);
112prototype_undo_exec_hook(UNDO_KEY_DELETE);
113prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
114prototype_undo_exec_hook(UNDO_BULK_INSERT);
115
116static int run_redo_phase(LSN lsn, LSN end_lsn,
117 enum maria_apply_log_way apply);
118static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
119static int run_undo_phase(uint uncommitted);
120static void display_record_position(const LOG_DESC *log_desc,
121 const TRANSLOG_HEADER_BUFFER *rec,
122 uint number);
123static int display_and_apply_record(const LOG_DESC *log_desc,
124 const TRANSLOG_HEADER_BUFFER *rec);
125static MARIA_HA *get_MARIA_HA_from_REDO_record(const
126 TRANSLOG_HEADER_BUFFER *rec);
127static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
128 TRANSLOG_HEADER_BUFFER *rec);
129static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon);
130static LSN parse_checkpoint_record(LSN lsn);
131static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
132 LSN first_undo_lsn);
133static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id);
134static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
135 struct st_dirty_page *dirty_page);
136static int close_all_tables(void);
137static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr);
138static void print_redo_phase_progress(TRANSLOG_ADDRESS addr);
139static void delete_all_transactions();
140
141/** @brief global [out] buffer for translog_read_record(); never shrinks */
142static struct
143{
144 /*
145 uchar* is more adapted (less casts) than char*, thus we don't use
146 LEX_STRING.
147 */
148 uchar *str;
149 size_t length;
150} log_record_buffer;
151static void enlarge_buffer(const TRANSLOG_HEADER_BUFFER *rec)
152{
153 if (log_record_buffer.length < rec->record_length)
154 {
155 log_record_buffer.length= rec->record_length;
156 log_record_buffer.str= my_realloc(log_record_buffer.str,
157 rec->record_length,
158 MYF(MY_WME | MY_ALLOW_ZERO_PTR));
159 }
160}
/** @brief Tells what kind of progress message was printed to the error log */
static enum recovery_message_type
{
  REC_MSG_NONE= 0,
  REC_MSG_REDO,
  REC_MSG_UNDO,
  REC_MSG_FLUSH
} recovery_message_printed;
166
167
168/* Hook to ensure we get nicer output if we get an error */
169
170void maria_recover_error_handler_hook(uint error, const char *str,
171 myf flags)
172{
173 if (procent_printed)
174 {
175 procent_printed= 0;
176 fputc('\n', stderr);
177 fflush(stderr);
178 }
179 (*save_error_handler_hook)(error, str, flags);
180}
181
182/* Define this if you want gdb to break in some interesting situations */
183#define ALERT_USER()
184
185static void print_preamble()
186{
187 ma_message_no_user(ME_JUST_INFO, "starting recovery");
188}
189
190
191static my_bool table_is_part_of_recovery_set(LEX_STRING *file_name)
192{
193 uint offset =0;
194 if (!tables_to_redo.records)
195 return 1; /* Default, recover table */
196
197 /* Skip base directory */
198 if (file_name->str[0] == '.' &&
199 (file_name->str[1] == '/' || file_name->str[1] == '\\'))
200 offset= 2;
201 /* Only recover if table is in hash */
202 return my_hash_search(&tables_to_redo, (uchar*) file_name->str + offset,
203 file_name->length - offset) != 0;
204}
205
206/**
207 @brief Recovers from the last checkpoint.
208
209 Runs the REDO phase using special structures, then sets up the playground
210 of runtime: recreates transactions inside trnman, open tables with their
211 two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all
212 tables.
213
214 @return Operation status
215 @retval 0 OK
216 @retval !=0 Error
217*/
218
219int maria_recovery_from_log(void)
220{
221 int res= 1;
222 FILE *trace_file;
223 uint warnings_count;
224#ifdef EXTRA_DEBUG
225 char name_buff[FN_REFLEN];
226#endif
227 DBUG_ENTER("maria_recovery_from_log");
228
229 DBUG_ASSERT(!maria_in_recovery);
230 maria_in_recovery= TRUE;
231
232#ifdef EXTRA_DEBUG
233 fn_format(name_buff, "aria_recovery.trace", maria_data_root, "", MYF(0));
234 trace_file= my_fopen(name_buff, O_WRONLY|O_APPEND|O_CREAT, MYF(MY_WME));
235#else
236 trace_file= NULL; /* no trace file for being fast */
237#endif
238 tprint(trace_file, "TRACE of the last Aria recovery from mysqld\n");
239 DBUG_ASSERT(maria_pagecache->inited);
240 res= maria_apply_log(LSN_IMPOSSIBLE, LSN_IMPOSSIBLE, MARIA_LOG_APPLY,
241 trace_file, TRUE, TRUE, TRUE, &warnings_count);
242 if (!res)
243 {
244 if (warnings_count == 0 && recovery_found_crashed_tables == 0)
245 tprint(trace_file, "SUCCESS\n");
246 else
247 tprint(trace_file, "DOUBTFUL (%u warnings, check previous output)\n",
248 warnings_count);
249 }
250 if (trace_file)
251 my_fclose(trace_file, MYF(0));
252 maria_in_recovery= FALSE;
253 DBUG_RETURN(res);
254}
255
256
257/**
258 @brief Displays and/or applies the log
259
260 @param from_lsn LSN from which log reading/applying should start;
261 LSN_IMPOSSIBLE means "use last checkpoint"
262 @param end_lsn Apply until this. LSN_IMPOSSIBLE means until end.
263 @param apply how log records should be applied or not
264 @param trace_file trace file where progress/debug messages will go
265 @param skip_DDLs_arg Should DDL records (CREATE/RENAME/DROP/REPAIR)
266 be skipped by the REDO phase or not
267 @param take_checkpoints Should we take checkpoints or not.
268 @param[out] warnings_count Count of warnings will be put there
269
270 @todo This trace_file thing is primitive; soon we will make it similar to
271 ma_check_print_warning() etc, and a successful recovery does not need to
272 create a trace file. But for debugging now it is useful.
273
274 @return Operation status
275 @retval 0 OK
276 @retval !=0 Error
277*/
278
279int maria_apply_log(LSN from_lsn, LSN end_lsn,
280 enum maria_apply_log_way apply,
281 FILE *trace_file,
282 my_bool should_run_undo_phase, my_bool skip_DDLs_arg,
283 my_bool take_checkpoints, uint *warnings_count)
284{
285 int error= 0;
286 uint uncommitted_trans;
287 ulonglong old_now;
288 my_bool abort_message_printed= 0;
289 DBUG_ENTER("maria_apply_log");
290
291 DBUG_ASSERT(apply == MARIA_LOG_APPLY || !should_run_undo_phase);
292 DBUG_ASSERT(!maria_multi_threaded);
293 recovery_warnings= recovery_found_crashed_tables= 0;
294 maria_recovery_changed_data= 0;
295 /* checkpoints can happen only if TRNs have been built */
296 DBUG_ASSERT(should_run_undo_phase || !take_checkpoints);
297 DBUG_ASSERT(end_lsn == LSN_IMPOSSIBLE || should_run_undo_phase == 0);
298 all_active_trans= (struct st_trn_for_recovery *)
299 my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery),
300 MYF(MY_ZEROFILL));
301 all_tables= (struct st_table_for_recovery *)
302 my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery),
303 MYF(MY_ZEROFILL));
304
305 save_error_handler_hook= error_handler_hook;
306 error_handler_hook= maria_recover_error_handler_hook;
307
308 if (!all_active_trans || !all_tables)
309 goto err;
310
311 if (take_checkpoints && ma_checkpoint_init(0))
312 goto err;
313
314 recovery_message_printed= REC_MSG_NONE;
315 checkpoint_useful= trns_created= FALSE;
316 tracef= trace_file;
317#ifdef INSTANT_FLUSH_OF_MESSAGES
318 /* enable this for instant flush of messages to trace file */
319 setbuf(tracef, NULL);
320#endif
321 skip_DDLs= skip_DDLs_arg;
322 skipped_undo_phase= 0;
323
324 trnman_init(max_trid_in_control_file);
325
326 if (from_lsn == LSN_IMPOSSIBLE)
327 {
328 if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
329 {
330 from_lsn= translog_first_lsn_in_log();
331 if (unlikely(from_lsn == LSN_ERROR))
332 {
333 trnman_destroy();
334 goto err;
335 }
336 }
337 else
338 {
339 from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
340 if (from_lsn == LSN_ERROR)
341 {
342 trnman_destroy();
343 goto err;
344 }
345 }
346 }
347
348 now= microsecond_interval_timer();
349 in_redo_phase= TRUE;
350 if (run_redo_phase(from_lsn, end_lsn, apply))
351 {
352 ma_message_no_user(0, "Redo phase failed");
353 trnman_destroy();
354 goto err;
355 }
356 trnman_destroy();
357
358 if (end_lsn != LSN_IMPOSSIBLE)
359 {
360 abort_message_printed= 1;
361 if (!trace_file)
362 fputc('\n', stderr);
363 my_message(HA_ERR_INITIALIZATION,
364 "Maria recovery aborted as end_lsn/end of file was reached",
365 MYF(0));
366 goto err2;
367 }
368
369 if ((uncommitted_trans=
370 end_of_redo_phase(should_run_undo_phase)) == (uint)-1)
371 {
372 ma_message_no_user(0, "End of redo phase failed");
373 goto err;
374 }
375 in_redo_phase= FALSE;
376
377 old_now= now;
378 now= microsecond_interval_timer();
379 if (recovery_message_printed == REC_MSG_REDO)
380 {
381 double phase_took= (now - old_now)/1000000.0;
382 /*
383 Detailed progress info goes to stderr, because ma_message_no_user()
384 cannot put several messages on one line.
385 */
386 procent_printed= 1;
387 fprintf(stderr, " (%.1f seconds); ", phase_took);
388 fflush(stderr);
389 }
390
391 /**
392 REDO phase does not fill blocks' rec_lsn, so a checkpoint now would be
393 wrong: if a future recovery used it, the REDO phase would always
394 start from the checkpoint and never from before, wrongly skipping REDOs
395 (tested). Another problem is that the REDO phase uses
396 PAGECACHE_PLAIN_PAGE, while Checkpoint only collects PAGECACHE_LSN_PAGE.
397
398 @todo fix this. pagecache_write() now can have a rec_lsn argument. And we
399 could make a function which goes through pages at end of REDO phase and
400 changes their type.
401 */
402#ifdef FIX_AND_ENABLE_LATER
403 if (take_checkpoints && checkpoint_useful)
404 {
405 /*
406 We take a checkpoint as it can save future recovery work if we crash
407 during the UNDO phase. But we don't flush pages, as UNDOs will change
408 them again probably.
409 If we wanted to take checkpoints in the middle of the REDO phase, at a
410 moment when we haven't reached the end of log so don't have exact data
411 about transactions, we could write a special checkpoint: containing only
412 the list of dirty pages, otherwise to be treated as if it was at the
413 same LSN as the last checkpoint.
414 */
415 if (ma_checkpoint_execute(CHECKPOINT_INDIRECT, FALSE))
416 goto err;
417 }
418#endif
419
420 if (should_run_undo_phase)
421 {
422 if (run_undo_phase(uncommitted_trans))
423 {
424 ma_message_no_user(0, "Undo phase failed");
425 goto err;
426 }
427 }
428 else if (uncommitted_trans > 0)
429 {
430 eprint(tracef, "***WARNING: %u uncommitted transactions; some tables may"
431 " be left inconsistent!***", uncommitted_trans);
432 recovery_warnings++;
433 }
434
435 if (skipped_undo_phase)
436 {
437 /*
438 We could want to print a list of tables for which UNDOs were skipped,
439 but not one line per skipped UNDO.
440 */
441 eprint(tracef, "***WARNING: %lu UNDO records skipped in UNDO phase; some"
442 " tables may be left inconsistent!***", skipped_undo_phase);
443 recovery_warnings++;
444 }
445
446 old_now= now;
447 now= microsecond_interval_timer();
448 if (recovery_message_printed == REC_MSG_UNDO)
449 {
450 double phase_took= (now - old_now)/1000000.0;
451 procent_printed= 1;
452 fprintf(stderr, " (%.1f seconds); ", phase_took);
453 fflush(stderr);
454 }
455
456 /*
457 we don't use maria_panic() because it would maria_end(), and Recovery does
458 not want that (we want to keep some modules initialized for runtime).
459 */
460 if (close_all_tables())
461 {
462 ma_message_no_user(0, "closing of tables failed");
463 goto err;
464 }
465
466 old_now= now;
467 now= microsecond_interval_timer();
468 if (recovery_message_printed == REC_MSG_FLUSH)
469 {
470 double phase_took= (now - old_now)/1000000.0;
471 procent_printed= 1;
472 fprintf(stderr, " (%.1f seconds); ", phase_took);
473 fflush(stderr);
474 }
475
476 if (take_checkpoints && checkpoint_useful)
477 {
478 /* No dirty pages, all tables are closed, no active transactions, save: */
479 if (ma_checkpoint_execute(CHECKPOINT_FULL, FALSE))
480 goto err;
481 }
482
483 goto end;
484err:
485 tprint(tracef, "\nRecovery of tables with transaction logs FAILED\n");
486err2:
487 if (trns_created)
488 delete_all_transactions();
489 error= 1;
490 if (close_all_tables())
491 {
492 ma_message_no_user(0, "closing of tables failed");
493 }
494end:
495 error_handler_hook= save_error_handler_hook;
496 my_hash_free(&all_dirty_pages);
497 bzero(&all_dirty_pages, sizeof(all_dirty_pages));
498 my_free(dirty_pages_pool);
499 dirty_pages_pool= NULL;
500 my_free(all_tables);
501 all_tables= NULL;
502 my_free(all_active_trans);
503 all_active_trans= NULL;
504 my_free(log_record_buffer.str);
505 log_record_buffer.str= NULL;
506 log_record_buffer.length= 0;
507 ma_checkpoint_end();
508 *warnings_count= recovery_warnings + recovery_found_crashed_tables;
509 if (recovery_message_printed != REC_MSG_NONE)
510 {
511 if (procent_printed)
512 {
513 procent_printed= 0;
514 fprintf(stderr, "\n");
515 fflush(stderr);
516 }
517 if (!error)
518 {
519 ma_message_no_user(ME_JUST_INFO, "recovery done");
520 maria_recovery_changed_data= 1;
521 }
522 }
523 else if (!error && max_trid_in_control_file != max_long_trid)
524 {
525 /*
526 maria_end() will set max trid in log file so that one can run
527 maria_chk on the tables
528 */
529 maria_recovery_changed_data= 1;
530 }
531
532 if (error && !abort_message_printed)
533 {
534 if (!trace_file)
535 fputc('\n', stderr);
536 my_message(HA_ERR_INITIALIZATION,
537 "Aria recovery failed. Please run aria_chk -r on all Aria "
538 "tables and delete all aria_log.######## files", MYF(0));
539 }
540 procent_printed= 0;
541 /*
542 We don't cleanly close tables if we hit some error (may corrupt them by
543 flushing some wrong blocks made from wrong REDOs). It also leaves their
544 open_count>0, which ensures that --aria-recover, if used, will try to
545 repair them.
546 */
547 DBUG_RETURN(error);
548}
549
550
551/* very basic info about the record's header */
552static void display_record_position(const LOG_DESC *log_desc,
553 const TRANSLOG_HEADER_BUFFER *rec,
554 uint number)
555{
556 /*
557 if number==0, we're going over records which we had already seen and which
558 form a group, so we indent below the group's end record
559 */
560 tprint(tracef,
561 "%sRec#%u LSN " LSN_FMT " short_trid %u %s(num_type:%u) len %lu\n",
562 number ? "" : " ", number, LSN_IN_PARTS(rec->lsn),
563 rec->short_trid, log_desc->name, rec->type,
564 (ulong)rec->record_length);
565 if (rec->type == LOGREC_DEBUG_INFO)
566 {
567 /* Print some extra information */
568 (*log_desc->record_execute_in_redo_phase)(rec);
569 }
570}
571
572
573static int display_and_apply_record(const LOG_DESC *log_desc,
574 const TRANSLOG_HEADER_BUFFER *rec)
575{
576 int error;
577 if (log_desc->record_execute_in_redo_phase == NULL)
578 {
579 /* die on all not-yet-handled records :) */
580 DBUG_ASSERT("one more hook to write" == 0);
581 return 1;
582 }
583 if (rec->type == LOGREC_DEBUG_INFO)
584 {
585 /* Query already printed by display_record_position() */
586 return 0;
587 }
588 if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
589 eprint(tracef, "Got error %d when executing record %s",
590 my_errno, log_desc->name);
591 return error;
592}
593
594
595prototype_redo_exec_hook(LONG_TRANSACTION_ID)
596{
597 uint16 sid= rec->short_trid;
598 TrID long_trid= all_active_trans[sid].long_trid;
599 /*
600 Any incomplete group should be of an old crash which already had a
601 recovery and thus has logged INCOMPLETE_GROUP which we must have seen.
602 */
603 DBUG_ASSERT(all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE);
604 if (long_trid != 0)
605 {
606 LSN ulsn= all_active_trans[sid].undo_lsn;
607 /*
608 If the first record of that transaction is after 'rec', it's probably
609 because that transaction was found in the checkpoint record, and then
610 it's ok, we can forget about that transaction (we'll meet it later
611 again in the REDO phase) and replace it with the one in 'rec'.
612 */
613 if ((ulsn != LSN_IMPOSSIBLE) &&
614 (cmp_translog_addr(ulsn, rec->lsn) < 0))
615 {
616 char llbuf[22];
617 llstr(long_trid, llbuf);
618 eprint(tracef, "Found an old transaction long_trid %s short_trid %u"
619 " with same short id as this new transaction, and has neither"
620 " committed nor rollback (undo_lsn: " LSN_FMT ")",
621 llbuf, sid, LSN_IN_PARTS(ulsn));
622 goto err;
623 }
624 }
625 long_trid= uint6korr(rec->header);
626 new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
627 goto end;
628err:
629 ALERT_USER();
630 return 1;
631end:
632 return 0;
633}
634
635
636static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
637 LSN first_undo_lsn)
638{
639 char llbuf[22];
640 all_active_trans[sid].long_trid= long_id;
641 llstr(long_id, llbuf);
642 tprint(tracef, "Transaction long_trid %s short_trid %u starts,"
643 " undo_lsn " LSN_FMT " first_undo_lsn " LSN_FMT "\n",
644 llbuf, sid, LSN_IN_PARTS(undo_lsn), LSN_IN_PARTS(first_undo_lsn));
645 all_active_trans[sid].undo_lsn= undo_lsn;
646 all_active_trans[sid].first_undo_lsn= first_undo_lsn;
647 set_if_bigger(max_long_trid, long_id);
648}
649
650
651prototype_redo_exec_hook_dummy(CHECKPOINT)
652{
653 /* the only checkpoint we care about was found via control file, ignore */
654 tprint(tracef, "CHECKPOINT found\n");
655 return 0;
656}
657
658
/*
  REDO hook of INCOMPLETE_GROUP: nothing to do, as the abortion of the
  group was already made.
*/

prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP)
{
  return 0;
}
664
665
666prototype_redo_exec_hook(INCOMPLETE_LOG)
667{
668 MARIA_HA *info;
669
670 if (skip_DDLs)
671 {
672 tprint(tracef, "we skip DDLs\n");
673 return 0;
674 }
675
676 if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL)
677 {
678 /* no such table, don't need to warn */
679 return 0;
680 }
681
682 if (maria_is_crashed(info))
683 return 0;
684
685 if (info->s->state.is_of_horizon > rec->lsn)
686 {
687 /*
688 This table was repaired at a time after this log entry.
689 We can assume that all rows was inserted sucessfully and we don't
690 have to warn about that the inserted data was not logged
691 */
692 return 0;
693 }
694
695 /*
696 Example of what can go wrong when replaying DDLs:
697 CREATE TABLE t (logged); INSERT INTO t VALUES(1) (logged);
698 ALTER TABLE t ... which does
699 CREATE a temporary table #sql... (logged)
700 INSERT data from t into #sql... (not logged)
701 RENAME #sql TO t (logged)
702 Removing tables by hand and replaying the log will leave in the
703 end an empty table "t": missing records. If after the RENAME an INSERT
704 into t was done, that row had number 1 in its page, executing the
705 REDO_INSERT_ROW_HEAD on the recreated empty t will fail (assertion
706 failure in _ma_apply_redo_insert_row_head_or_tail(): new data page is
707 created whereas rownr is not 0).
708 So when the server disables logging for ALTER TABLE or CREATE SELECT, it
709 logs LOGREC_INCOMPLETE_LOG to warn aria_read_log and then the user.
710
711 Another issue is that replaying of DDLs is not correct enough to work if
712 there was a crash during a DDL (see comment in execution of
713 REDO_RENAME_TABLE ).
714 */
715
716 eprint(tracef, "***WARNING: Aria engine currently logs no records "
717 "about insertion of data by ALTER TABLE and CREATE SELECT, "
718 "as they are not necessary for recovery; "
719 "present applying of log records to table '%s' may well not work."
720 "***", info->s->index_file_name.str);
721
722 /* Prevent using the table for anything else than undo repair */
723 _ma_mark_file_crashed(info->s);
724 recovery_warnings++;
725 return 0;
726}
727
728
729static my_bool create_database_if_not_exists(const char *name)
730{
731 char dirname[FN_REFLEN];
732 size_t length;
733 MY_STAT stat_info;
734 DBUG_ENTER("create_database_if_not_exists");
735
736 dirname_part(dirname, name, &length);
737 if (!length)
738 {
739 /* Skip files without directores */
740 DBUG_RETURN(0);
741 }
742 /*
743 Safety; Don't create files with hard path;
744 Should never happen with MariaDB
745 If hard path, then error will be detected when trying to create index file
746 */
747 if (test_if_hard_path(dirname))
748 DBUG_RETURN(0);
749
750 if (my_stat(dirname,&stat_info,MYF(0)))
751 DBUG_RETURN(0);
752
753
754 tprint(tracef, "Creating not existing database '%s'\n", dirname);
755 if (my_mkdir(dirname, 0777, MYF(MY_WME)))
756 {
757 eprint(tracef, "***WARNING: Can't create not existing database '%s'",
758 dirname);
759 DBUG_RETURN(1);
760 }
761 DBUG_RETURN(0);
762}
763
764
765
766
767
768prototype_redo_exec_hook(REDO_CREATE_TABLE)
769{
770 File dfile= -1, kfile= -1;
771 char *linkname_ptr, filename[FN_REFLEN], *name, *ptr, *ptr2,
772 *data_file_name, *index_file_name;
773 uchar *kfile_header;
774 myf create_flag;
775 uint flags;
776 int error= 1, create_mode= O_RDWR | O_TRUNC, i;
777 MARIA_HA *info= NULL;
778 uint kfile_size_before_extension, keystart;
779 DBUG_ENTER("exec_REDO_LOGREC_REDO_CREATE_TABLE");
780
781 if (skip_DDLs)
782 {
783 tprint(tracef, "we skip DDLs\n");
784 DBUG_RETURN(0);
785 }
786 enlarge_buffer(rec);
787 if (log_record_buffer.str == NULL ||
788 translog_read_record(rec->lsn, 0, rec->record_length,
789 log_record_buffer.str, NULL) !=
790 rec->record_length)
791 {
792 eprint(tracef, "Failed to read record");
793 goto end;
794 }
795 name= (char *)log_record_buffer.str;
796 /*
797 TRUNCATE TABLE and REPAIR USE_FRM call maria_create(), so below we can
798 find a REDO_CREATE_TABLE for a table which we have open, that's why we
799 need to look for any open instances and close them first.
800 */
801 if (close_one_table(name, rec->lsn))
802 {
803 eprint(tracef, "Table '%s' got error %d on close", name, my_errno);
804 ALERT_USER();
805 goto end;
806 }
807 /* we try hard to get create_rename_lsn, to avoid mistakes if possible */
808 info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
809 if (info)
810 {
811 MARIA_SHARE *share= info->s;
812 /* check that we're not already using it */
813 if (share->reopen != 1)
814 {
815 eprint(tracef, "Table '%s is already open (reopen=%u)",
816 name, share->reopen);
817 ALERT_USER();
818 goto end;
819 }
820 DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
821 if (!share->base.born_transactional)
822 {
823 /*
824 could be that transactional table was later dropped, and a non-trans
825 one was renamed to its name, thus create_rename_lsn is 0 and should
826 not be trusted.
827 */
828 tprint(tracef, "Table '%s' is not transactional, ignoring creation\n",
829 name);
830 ALERT_USER();
831 error= 0;
832 goto end;
833 }
834 if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
835 {
836 tprint(tracef, "Table '%s' has create_rename_lsn " LSN_FMT " more "
837 "recent than record, ignoring creation",
838 name, LSN_IN_PARTS(share->state.create_rename_lsn));
839 error= 0;
840 goto end;
841 }
842 if (maria_is_crashed(info))
843 {
844 eprint(tracef, "Table '%s' is crashed, can't recreate it", name);
845 ALERT_USER();
846 goto end;
847 }
848 maria_close(info);
849 info= NULL;
850 }
851 else
852 {
853 /* one or two files absent, or header corrupted... */
854 tprint(tracef, "Table '%s' can't be opened (Error: %d)\n",
855 name, my_errno);
856 }
857 /* if does not exist, or is older, overwrite it */
858 ptr= name + strlen(name) + 1;
859 if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
860 tprint(tracef, ", we will only touch index file");
861 ptr++;
862 kfile_size_before_extension= uint2korr(ptr);
863 ptr+= 2;
864 keystart= uint2korr(ptr);
865 ptr+= 2;
866 kfile_header= (uchar *)ptr;
867 ptr+= kfile_size_before_extension;
868 /* set header lsns */
869 ptr2= (char *) kfile_header + sizeof(info->s->state.header) +
870 MARIA_FILE_CREATE_RENAME_LSN_OFFSET;
871 for (i= 0; i<3; i++)
872 {
873 lsn_store(ptr2, rec->lsn);
874 ptr2+= LSN_STORE_SIZE;
875 }
876 data_file_name= ptr;
877 ptr+= strlen(data_file_name) + 1;
878 index_file_name= ptr;
879 ptr+= strlen(index_file_name) + 1;
880 /** @todo handle symlinks */
881 if (data_file_name[0] || index_file_name[0])
882 {
883 eprint(tracef, "Table '%s' DATA|INDEX DIRECTORY clauses are not handled",
884 name);
885 goto end;
886 }
887 if (create_database_if_not_exists(name))
888 goto end;
889 fn_format(filename, name, "", MARIA_NAME_IEXT,
890 MY_UNPACK_FILENAME | MY_RETURN_REAL_PATH | MY_APPEND_EXT);
891 linkname_ptr= NULL;
892 create_flag= MY_DELETE_OLD;
893 tprint(tracef, "Table '%s' creating as '%s'\n", name, filename);
894 if ((kfile= mysql_file_create_with_symlink(key_file_kfile, linkname_ptr,
895 filename, 0, create_mode,
896 MYF(MY_WME|create_flag))) < 0)
897 {
898 eprint(tracef, "Failed to create index file");
899 goto end;
900 }
901 if (my_pwrite(kfile, kfile_header,
902 kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
903 mysql_file_chsize(kfile, keystart, 0, MYF(MY_WME)))
904 {
905 eprint(tracef, "Failed to write to index file");
906 goto end;
907 }
908 if (!(flags & HA_DONT_TOUCH_DATA))
909 {
910 fn_format(filename,name,"", MARIA_NAME_DEXT,
911 MY_UNPACK_FILENAME | MY_APPEND_EXT);
912 linkname_ptr= NULL;
913 create_flag=MY_DELETE_OLD;
914 if (((dfile=
915 mysql_file_create_with_symlink(key_file_dfile, linkname_ptr,
916 filename, 0, create_mode,
917 MYF(MY_WME | create_flag))) < 0) ||
918 mysql_file_close(dfile, MYF(MY_WME)))
919 {
920 eprint(tracef, "Failed to create data file");
921 goto end;
922 }
923 /*
924 we now have an empty data file. To be able to
925 _ma_initialize_data_file() we need some pieces of the share to be
926 correctly filled. So we just open the table (fortunately, an empty
927 data file does not preclude this).
928 */
929 if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
930 _ma_initialize_data_file(info->s, info->dfile.file))
931 {
932 eprint(tracef, "Failed to open new table or write to data file");
933 goto end;
934 }
935 }
936 error= 0;
937end:
938 if (kfile >= 0)
939 error|= mysql_file_close(kfile, MYF(MY_WME));
940 if (info != NULL)
941 error|= maria_close(info);
942 DBUG_RETURN(error);
943}
944
945
946prototype_redo_exec_hook(REDO_RENAME_TABLE)
947{
948 char *old_name, *new_name;
949 int error= 1;
950 MARIA_HA *info= NULL;
951 DBUG_ENTER("exec_REDO_LOGREC_REDO_RENAME_TABLE");
952
953 if (skip_DDLs)
954 {
955 tprint(tracef, "we skip DDLs\n");
956 DBUG_RETURN(0);
957 }
958 enlarge_buffer(rec);
959 if (log_record_buffer.str == NULL ||
960 translog_read_record(rec->lsn, 0, rec->record_length,
961 log_record_buffer.str, NULL) !=
962 rec->record_length)
963 {
964 eprint(tracef, "Failed to read record");
965 goto end;
966 }
967 old_name= (char *)log_record_buffer.str;
968 new_name= old_name + strlen(old_name) + 1;
969 tprint(tracef, "Table '%s' to rename to '%s'; old-name table ", old_name,
970 new_name);
971 /*
972 Here is why we skip CREATE/DROP/RENAME when doing a recovery from
973 ha_maria (whereas we do when called from aria_read_log). Consider:
974 CREATE TABLE t;
975 RENAME TABLE t to u;
976 DROP TABLE u;
977 RENAME TABLE v to u; # crash between index rename and data rename.
978 And do a Recovery (not removing tables beforehand).
979 Recovery replays CREATE, then RENAME: the maria_open("t") works,
980 maria_open("u") does not (no data file) so table "u" is considered
981 inexistent and so maria_rename() is done which overwrites u's index file,
982 which is lost. Ok, the data file (v.MAD) is still available, but only a
983 REPAIR USE_FRM can rebuild the index, which is unsafe and downtime.
984 So it is preferrable to not execute RENAME, and leave the "mess" of files,
985 rather than possibly destroy a file. DBA will manually rename files.
986 A safe recovery method would probably require checking the existence of
987 the index file and of the data file separately (not via maria_open()), and
988 maybe also to store a create_rename_lsn in the data file too
989 For now, all we risk is to leave the mess (half-renamed files) left by the
990 crash. We however sync files and directories at each file rename. The SQL
991 layer is anyway not crash-safe for DDLs (except the repartioning-related
992 ones).
993 We replay DDLs in aria_read_log to be able to recreate tables from
994 scratch. It means that "aria_read_log -a" should not be used on a
995 database which just crashed during a DDL. And also ALTER TABLE does not
996 log insertions of records into the temporary table, so replaying may
997 fail (grep for INCOMPLETE_LOG in files).
998 */
999 info= maria_open(old_name, O_RDONLY, HA_OPEN_FOR_REPAIR);
1000 if (info)
1001 {
1002 MARIA_SHARE *share= info->s;
1003 if (!share->base.born_transactional)
1004 {
1005 tprint(tracef, ", is not transactional, ignoring renaming\n");
1006 ALERT_USER();
1007 error= 0;
1008 goto end;
1009 }
1010 if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
1011 {
1012 tprint(tracef, ", has create_rename_lsn " LSN_FMT " more recent than"
1013 " record, ignoring renaming",
1014 LSN_IN_PARTS(share->state.create_rename_lsn));
1015 error= 0;
1016 goto end;
1017 }
1018 if (maria_is_crashed(info))
1019 {
1020 tprint(tracef, ", is crashed, can't rename it");
1021 ALERT_USER();
1022 goto end;
1023 }
1024 if (close_one_table(info->s->open_file_name.str, rec->lsn) ||
1025 maria_close(info))
1026 goto end;
1027 info= NULL;
1028 tprint(tracef, ", is ok for renaming; new-name table ");
1029 }
1030 else /* one or two files absent, or header corrupted... */
1031 {
1032 tprint(tracef, ", can't be opened, probably does not exist");
1033 error= 0;
1034 goto end;
1035 }
1036 /*
1037 We must also check the create_rename_lsn of the 'new_name' table if it
1038 exists: otherwise we may, with our rename which overwrites, destroy
1039 another table. For example:
1040 CREATE TABLE t;
1041 RENAME t to u;
1042 DROP TABLE u;
1043 RENAME v to u; # v is an old table, its creation/insertions not in log
1044 And start executing the log (without removing tables beforehand): creates
1045 t, renames it to u (if not testing create_rename_lsn) thus overwriting
1046 old-named v, drops u, and we are stuck, we have lost data.
1047 */
1048 info= maria_open(new_name, O_RDONLY, HA_OPEN_FOR_REPAIR);
1049 if (info)
1050 {
1051 MARIA_SHARE *share= info->s;
1052 /* We should not have open instances on this table. */
1053 if (share->reopen != 1)
1054 {
1055 tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
1056 ALERT_USER();
1057 goto end;
1058 }
1059 if (!share->base.born_transactional)
1060 {
1061 tprint(tracef, ", is not transactional, ignoring renaming\n");
1062 ALERT_USER();
1063 goto drop;
1064 }
1065 if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
1066 {
1067 tprint(tracef, ", has create_rename_lsn " LSN_FMT " more recent than"
1068 " record, ignoring renaming",
1069 LSN_IN_PARTS(share->state.create_rename_lsn));
1070 /*
1071 We have to drop the old_name table. Consider:
1072 CREATE TABLE t;
1073 CREATE TABLE v;
1074 RENAME TABLE t to u;
1075 DROP TABLE u;
1076 RENAME TABLE v to u;
1077 and apply the log without removing tables beforehand. t will be
1078 created, v too; in REDO_RENAME u will be more recent, but we still
1079 have to drop t otherwise it stays.
1080 */
1081 goto drop;
1082 }
1083 if (maria_is_crashed(info))
1084 {
1085 tprint(tracef, ", is crashed, can't rename it");
1086 ALERT_USER();
1087 goto end;
1088 }
1089 if (maria_close(info))
1090 goto end;
1091 info= NULL;
1092 /* abnormal situation */
1093 tprint(tracef, ", exists but is older than record, can't rename it");
1094 goto end;
1095 }
1096 else /* one or two files absent, or header corrupted... */
1097 tprint(tracef, ", can't be opened, probably does not exist");
1098 tprint(tracef, ", renaming '%s'", old_name);
1099 if (maria_rename(old_name, new_name))
1100 {
1101 eprint(tracef, "Failed to rename table");
1102 goto end;
1103 }
1104 info= maria_open(new_name, O_RDONLY, 0);
1105 if (info == NULL)
1106 {
1107 eprint(tracef, "Failed to open renamed table");
1108 goto end;
1109 }
1110 if (_ma_update_state_lsns(info->s, rec->lsn, info->s->state.create_trid,
1111 TRUE, TRUE))
1112 goto end;
1113 if (maria_close(info))
1114 goto end;
1115 info= NULL;
1116 error= 0;
1117 goto end;
1118drop:
1119 tprint(tracef, ", only dropping '%s'", old_name);
1120 if (maria_delete_table(old_name))
1121 {
1122 eprint(tracef, "Failed to drop table");
1123 goto end;
1124 }
1125 error= 0;
1126 goto end;
1127end:
1128 tprint(tracef, "\n");
1129 if (info != NULL)
1130 error|= maria_close(info);
1131 DBUG_RETURN(error);
1132}
1133
1134
1135/*
1136 The record may come from REPAIR, ALTER TABLE ENABLE KEYS, OPTIMIZE.
1137*/
1138prototype_redo_exec_hook(REDO_REPAIR_TABLE)
1139{
1140 int error= 1;
1141 MARIA_HA *info;
1142 HA_CHECK param;
1143 char *name;
1144 my_bool quick_repair;
1145 DBUG_ENTER("exec_REDO_LOGREC_REDO_REPAIR_TABLE");
1146
1147 if (skip_DDLs)
1148 {
1149 /*
1150 REPAIR is not exactly a DDL, but it manipulates files without logging
1151 insertions into them.
1152 */
1153 tprint(tracef, "we skip DDLs\n");
1154 DBUG_RETURN(0);
1155 }
1156 if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL)
1157 DBUG_RETURN(0);
1158 if (maria_is_crashed(info))
1159 {
1160 tprint(tracef, "we skip repairing crashed table\n");
1161 DBUG_RETURN(0);
1162 }
1163 /*
1164 Otherwise, the mapping is newer than the table, and our record is newer
1165 than the mapping, so we can repair.
1166 */
1167 tprint(tracef, " repairing...\n");
1168
1169 maria_chk_init(&param);
1170 param.isam_file_name= name= info->s->open_file_name.str;
1171 param.testflag= uint8korr(rec->header + FILEID_STORE_SIZE);
1172 param.tmpdir= maria_tmpdir;
1173 param.max_trid= max_long_trid;
1174 DBUG_ASSERT(maria_tmpdir);
1175
1176 info->s->state.key_map= uint8korr(rec->header + FILEID_STORE_SIZE + 8);
1177 quick_repair= MY_TEST(param.testflag & T_QUICK);
1178
1179 if (param.testflag & T_REP_PARALLEL)
1180 {
1181 if (maria_repair_parallel(&param, info, name, quick_repair))
1182 goto end;
1183 }
1184 else if (param.testflag & T_REP_BY_SORT)
1185 {
1186 if (maria_repair_by_sort(&param, info, name, quick_repair))
1187 goto end;
1188 }
1189 else if (maria_repair(&param, info, name, quick_repair))
1190 goto end;
1191
1192 if (_ma_update_state_lsns(info->s, rec->lsn, trnman_get_min_safe_trid(),
1193 TRUE, !(param.testflag & T_NO_CREATE_RENAME_LSN)))
1194 goto end;
1195 error= 0;
1196
1197end:
1198 DBUG_RETURN(error);
1199}
1200
1201
1202prototype_redo_exec_hook(REDO_DROP_TABLE)
1203{
1204 char *name;
1205 int error= 1;
1206 MARIA_HA *info;
1207 if (skip_DDLs)
1208 {
1209 tprint(tracef, "we skip DDLs\n");
1210 return 0;
1211 }
1212 enlarge_buffer(rec);
1213 if (log_record_buffer.str == NULL ||
1214 translog_read_record(rec->lsn, 0, rec->record_length,
1215 log_record_buffer.str, NULL) !=
1216 rec->record_length)
1217 {
1218 eprint(tracef, "Failed to read record");
1219 return 1;
1220 }
1221 name= (char *)log_record_buffer.str;
1222 tprint(tracef, "Table '%s'", name);
1223 info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
1224 if (info)
1225 {
1226 MARIA_SHARE *share= info->s;
1227 if (!share->base.born_transactional)
1228 {
1229 tprint(tracef, ", is not transactional, ignoring removal\n");
1230 ALERT_USER();
1231 error= 0;
1232 goto end;
1233 }
1234 if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
1235 {
1236 tprint(tracef, ", has create_rename_lsn " LSN_FMT " more recent than"
1237 " record, ignoring removal",
1238 LSN_IN_PARTS(share->state.create_rename_lsn));
1239 error= 0;
1240 goto end;
1241 }
1242 if (maria_is_crashed(info))
1243 {
1244 tprint(tracef, ", is crashed, can't drop it");
1245 ALERT_USER();
1246 goto end;
1247 }
1248 if (close_one_table(info->s->open_file_name.str, rec->lsn) ||
1249 maria_close(info))
1250 goto end;
1251 info= NULL;
1252 /* if it is older, or its header is corrupted, drop it */
1253 tprint(tracef, ", dropping '%s'", name);
1254 if (maria_delete_table(name))
1255 {
1256 eprint(tracef, "Failed to drop table");
1257 goto end;
1258 }
1259 }
1260 else /* one or two files absent, or header corrupted... */
1261 tprint(tracef,", can't be opened, probably does not exist");
1262 error= 0;
1263end:
1264 tprint(tracef, "\n");
1265 if (info != NULL)
1266 error|= maria_close(info);
1267 return error;
1268}
1269
1270
1271prototype_redo_exec_hook(FILE_ID)
1272{
1273 uint16 sid;
1274 int error= 1;
1275 const char *name;
1276 MARIA_HA *info;
1277 DBUG_ENTER("exec_REDO_LOGREC_FILE_ID");
1278
1279 if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
1280 {
1281 /*
1282 If that mapping was still true at checkpoint time, it was found in
1283 checkpoint record, no need to recreate it. If that mapping had ended at
1284 checkpoint time (table was closed or repaired), a flush and force
1285 happened and so mapping is not needed.
1286 */
1287 tprint(tracef, "ignoring because before checkpoint\n");
1288 DBUG_RETURN(0);
1289 }
1290
1291 enlarge_buffer(rec);
1292 if (log_record_buffer.str == NULL ||
1293 translog_read_record(rec->lsn, 0, rec->record_length,
1294 log_record_buffer.str, NULL) !=
1295 rec->record_length)
1296 {
1297 eprint(tracef, "Failed to read record");
1298 goto end;
1299 }
1300 sid= fileid_korr(log_record_buffer.str);
1301 info= all_tables[sid].info;
1302 if (info != NULL)
1303 {
1304 tprint(tracef, " Closing table '%s'\n", info->s->open_file_name.str);
1305 prepare_table_for_close(info, rec->lsn);
1306
1307 /*
1308 Ensure that open count is 1 on close. This is needed as the
1309 table may initially had an open_count > 0 when we initially
1310 opened it as the server may have crashed without closing it
1311 properly. As we now have applied all redo's for the table up to
1312 now, we know the table is ok, so it's safe to reset the open
1313 count to 0.
1314 */
1315 if (info->s->state.open_count != 0 && info->s->reopen == 1)
1316 {
1317 /* let ma_close() mark the table properly closed */
1318 info->s->state.open_count= 1;
1319 info->s->global_changed= 1;
1320 info->s->changed= 1;
1321 }
1322 if (maria_close(info))
1323 {
1324 eprint(tracef, "Failed to close table");
1325 goto end;
1326 }
1327 all_tables[sid].info= NULL;
1328 }
1329 name= (char *)log_record_buffer.str + FILEID_STORE_SIZE;
1330 if (new_table(sid, name, rec->lsn))
1331 goto end;
1332 error= 0;
1333end:
1334 DBUG_RETURN(error);
1335}
1336
1337
1338static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id)
1339{
1340 /*
1341 -1 (skip table): close table and return 0;
1342 1 (error): close table and return 1;
1343 0 (success): leave table open and return 0.
1344 */
1345 int error= 1;
1346 MARIA_HA *info;
1347 MARIA_SHARE *share;
1348 my_off_t dfile_len, kfile_len;
1349 DBUG_ENTER("new_table");
1350
1351 checkpoint_useful= TRUE;
1352 if ((name == NULL) || (name[0] == 0))
1353 {
1354 /*
1355 we didn't use DBUG_ASSERT() because such record corruption could
1356 silently pass in the "info == NULL" test below.
1357 */
1358 tprint(tracef, ", record is corrupted");
1359 info= NULL;
1360 recovery_warnings++;
1361 goto end;
1362 }
1363 tprint(tracef, "Table '%s', id %u", name, sid);
1364 info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
1365 if (info == NULL)
1366 {
1367 tprint(tracef, ", is absent (must have been dropped later?)"
1368 " or its header is so corrupted that we cannot open it;"
1369 " we skip it");
1370 if (my_errno != ENOENT)
1371 recovery_found_crashed_tables++;
1372 error= 0;
1373 goto end;
1374 }
1375 share= info->s;
1376 /* check that we're not already using it */
1377 if (share->reopen != 1)
1378 {
1379 tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
1380 /*
1381 It could be that we have in the log
1382 FILE_ID(t1,10) ... (t1 was flushed) ... FILE_ID(t1,12);
1383 */
1384 if (close_one_table(share->open_file_name.str, lsn_of_file_id))
1385 goto end;
1386 /*
1387 We should not try to get length of data/index files as the files
1388 are not on disk yet.
1389 */
1390 _ma_tmp_disable_logging_for_table(info, FALSE);
1391 goto set_lsn_of_file_id;
1392 }
1393 if (!share->base.born_transactional)
1394 {
1395 /*
1396 This can happen if one converts a transactional table to a
1397 not transactional table
1398 */
1399 tprint(tracef, ", is not transactional. Ignoring open request");
1400 error= -1;
1401 recovery_warnings++;
1402 goto end;
1403 }
1404 if (cmp_translog_addr(lsn_of_file_id, share->state.create_rename_lsn) <= 0)
1405 {
1406 tprint(tracef, ", has create_rename_lsn " LSN_FMT " more recent than"
1407 " LOGREC_FILE_ID's LSN " LSN_FMT ", ignoring open request",
1408 LSN_IN_PARTS(share->state.create_rename_lsn),
1409 LSN_IN_PARTS(lsn_of_file_id));
1410 recovery_warnings++;
1411 error= -1;
1412 goto end;
1413 /*
1414 Note that we tested that before testing corruption; a recent corrupted
1415 table is not a blocker for the present log record.
1416 */
1417 }
1418 if (maria_is_crashed(info))
1419 {
1420 eprint(tracef, "Table '%s' is crashed, skipping it. Please repair it with"
1421 " aria_chk -r", share->open_file_name.str);
1422 recovery_found_crashed_tables++;
1423 error= -1; /* not fatal, try with other tables */
1424 goto end;
1425 /*
1426 Note that if a first recovery fails to apply a REDO, it marks the table
1427 corrupted and stops the entire recovery. A second recovery will find the
1428 table is marked corrupted and skip it (and thus possibly handle other
1429 tables).
1430 */
1431 }
1432 /* don't log any records for this work */
1433 _ma_tmp_disable_logging_for_table(info, FALSE);
1434 /* execution of some REDO records relies on data_file_length */
1435 dfile_len= mysql_file_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME));
1436 kfile_len= mysql_file_seek(info->s->kfile.file, 0, SEEK_END, MYF(MY_WME));
1437 if ((dfile_len == MY_FILEPOS_ERROR) ||
1438 (kfile_len == MY_FILEPOS_ERROR))
1439 {
1440 tprint(tracef, ", length unknown\n");
1441 recovery_warnings++;
1442 goto end;
1443 }
1444 if (share->state.state.data_file_length != dfile_len)
1445 {
1446 tprint(tracef, ", has wrong state.data_file_length (fixing it)");
1447 share->state.state.data_file_length= dfile_len;
1448 }
1449 if (share->state.state.key_file_length != kfile_len)
1450 {
1451 tprint(tracef, ", has wrong state.key_file_length (fixing it)");
1452 share->state.state.key_file_length= kfile_len;
1453 }
1454 if ((dfile_len % share->block_size) || (kfile_len % share->block_size))
1455 {
1456 tprint(tracef, ", has too short last page\n");
1457 /* Recovery will fix this, no error */
1458 ALERT_USER();
1459 }
1460
1461set_lsn_of_file_id:
1462 /*
1463 This LSN serves in this situation; assume log is:
1464 FILE_ID(6->"t2") REDO_INSERT(6) FILE_ID(6->"t1") CHECKPOINT(6->"t1")
1465 then crash, checkpoint record is parsed and opens "t1" with id 6; assume
1466 REDO phase starts from the REDO_INSERT above: it will wrongly try to
1467 update a page of "t1". With this LSN below, REDO_INSERT can realize the
1468 mapping is newer than itself, and not execute.
1469 Same example is possible with UNDO_INSERT (update of the state).
1470 */
1471 info->s->lsn_of_file_id= lsn_of_file_id;
1472 all_tables[sid].info= info;
1473 /*
1474 We don't set info->s->id, it would be useless (no logging in REDO phase);
1475 if you change that, know that some records in REDO phase call
1476 _ma_update_state_lsns() which resets info->s->id.
1477 */
1478 tprint(tracef, ", opened");
1479 error= 0;
1480end:
1481 tprint(tracef, "\n");
1482 if (error)
1483 {
1484 if (info != NULL)
1485 {
1486 /* let maria_close() mark the table properly closed */
1487 info->s->state.open_count= 1;
1488 info->s->global_changed= 1;
1489 info->s->changed= 1;
1490 maria_close(info);
1491 }
1492 if (error == -1)
1493 error= 0;
1494 }
1495 DBUG_RETURN(error);
1496}
1497
1498/*
1499 NOTE
1500 This is called for REDO_INSERT_ROW_HEAD and READ_NEW_ROW_HEAD
1501*/
1502
1503prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)
1504{
1505 int error= 1;
1506 uchar *buff= NULL;
1507 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1508 if (info == NULL || maria_is_crashed(info))
1509
1510 {
1511 /*
1512 Table was skipped at open time (because later dropped/renamed, not
1513 transactional, or create_rename_lsn newer than LOGREC_FILE_ID), or
1514 record was skipped due to skip_redo_lsn; it is not an error.
1515 */
1516 return 0;
1517 }
1518 /*
1519 Note that REDO is per page, we still consider it if its transaction
1520 committed long ago and is unknown.
1521 */
1522 /*
1523 If REDO's LSN is > page's LSN (read from disk), we are going to modify the
1524 page and change its LSN. The normal runtime code stores the UNDO's LSN
1525 into the page. Here storing the REDO's LSN (rec->lsn) would work
1526 (we are not writing to the log here, so don't have to "flush up to UNDO's
1527 LSN"). But in a test scenario where we do updates at runtime, then remove
1528 tables, apply the log and check that this results in the same table as at
1529 runtime, putting the same LSN as runtime had done will decrease
1530 differences. So we use the UNDO's LSN which is current_group_end_lsn.
1531 */
1532 enlarge_buffer(rec);
1533 if (log_record_buffer.str == NULL)
1534 {
1535 eprint(tracef, "Failed to read allocate buffer for record");
1536 goto end;
1537 }
1538 if (translog_read_record(rec->lsn, 0, rec->record_length,
1539 log_record_buffer.str, NULL) !=
1540 rec->record_length)
1541 {
1542 eprint(tracef, "Failed to read record");
1543 goto end;
1544 }
1545 buff= log_record_buffer.str;
1546 if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
1547 HEAD_PAGE,
1548 (rec->type ==
1549 LOGREC_REDO_NEW_ROW_HEAD),
1550 buff + FILEID_STORE_SIZE,
1551 buff +
1552 FILEID_STORE_SIZE +
1553 PAGE_STORE_SIZE +
1554 DIRPOS_STORE_SIZE,
1555 rec->record_length -
1556 (FILEID_STORE_SIZE +
1557 PAGE_STORE_SIZE +
1558 DIRPOS_STORE_SIZE)))
1559 goto end;
1560 error= 0;
1561end:
1562 return error;
1563}
1564
1565/*
1566 NOTE
1567 This is called for REDO_INSERT_ROW_TAIL and READ_NEW_ROW_TAIL
1568*/
1569
1570prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)
1571{
1572 int error= 1;
1573 uchar *buff;
1574 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1575 if (info == NULL || maria_is_crashed(info))
1576 return 0;
1577 enlarge_buffer(rec);
1578 if (log_record_buffer.str == NULL ||
1579 translog_read_record(rec->lsn, 0, rec->record_length,
1580 log_record_buffer.str, NULL) !=
1581 rec->record_length)
1582 {
1583 eprint(tracef, "Failed to read record");
1584 goto end;
1585 }
1586 buff= log_record_buffer.str;
1587 if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
1588 TAIL_PAGE,
1589 (rec->type ==
1590 LOGREC_REDO_NEW_ROW_TAIL),
1591 buff + FILEID_STORE_SIZE,
1592 buff +
1593 FILEID_STORE_SIZE +
1594 PAGE_STORE_SIZE +
1595 DIRPOS_STORE_SIZE,
1596 rec->record_length -
1597 (FILEID_STORE_SIZE +
1598 PAGE_STORE_SIZE +
1599 DIRPOS_STORE_SIZE)))
1600 goto end;
1601 error= 0;
1602
1603end:
1604 return error;
1605}
1606
1607
1608prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS)
1609{
1610 int error= 1;
1611 uchar *buff;
1612 uint number_of_blobs, number_of_ranges;
1613 pgcache_page_no_t first_page, last_page;
1614 char llbuf1[22], llbuf2[22];
1615 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1616 if (info == NULL || maria_is_crashed(info))
1617 return 0;
1618 enlarge_buffer(rec);
1619 if (log_record_buffer.str == NULL ||
1620 translog_read_record(rec->lsn, 0, rec->record_length,
1621 log_record_buffer.str, NULL) !=
1622 rec->record_length)
1623 {
1624 eprint(tracef, "Failed to read record");
1625 goto end;
1626 }
1627 buff= log_record_buffer.str;
1628 if (_ma_apply_redo_insert_row_blobs(info, current_group_end_lsn,
1629 buff, rec->lsn, &number_of_blobs,
1630 &number_of_ranges,
1631 &first_page, &last_page))
1632 goto end;
1633 llstr(first_page, llbuf1);
1634 llstr(last_page, llbuf2);
1635 tprint(tracef, " %u blobs %u ranges, first page %s last %s",
1636 number_of_blobs, number_of_ranges, llbuf1, llbuf2);
1637
1638 error= 0;
1639
1640end:
1641 tprint(tracef, " \n");
1642 return error;
1643}
1644
1645
1646prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)
1647{
1648 int error= 1;
1649 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1650 if (info == NULL || maria_is_crashed(info))
1651 return 0;
1652 if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
1653 HEAD_PAGE,
1654 rec->header + FILEID_STORE_SIZE))
1655 goto end;
1656 error= 0;
1657end:
1658 return error;
1659}
1660
1661
1662prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)
1663{
1664 int error= 1;
1665 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1666 if (info == NULL || maria_is_crashed(info))
1667 return 0;
1668 if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
1669 TAIL_PAGE,
1670 rec->header + FILEID_STORE_SIZE))
1671 goto end;
1672 error= 0;
1673end:
1674 return error;
1675}
1676
1677
1678prototype_redo_exec_hook(REDO_FREE_BLOCKS)
1679{
1680 int error= 1;
1681 uchar *buff;
1682 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1683 if (info == NULL || maria_is_crashed(info))
1684 return 0;
1685 enlarge_buffer(rec);
1686
1687 if (log_record_buffer.str == NULL ||
1688 translog_read_record(rec->lsn, 0, rec->record_length,
1689 log_record_buffer.str, NULL) !=
1690 rec->record_length)
1691 {
1692 eprint(tracef, "Failed to read record");
1693 goto end;
1694 }
1695
1696 buff= log_record_buffer.str;
1697 if (_ma_apply_redo_free_blocks(info, current_group_end_lsn, rec->lsn,
1698 buff))
1699 goto end;
1700 error= 0;
1701end:
1702 return error;
1703}
1704
1705
1706prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL)
1707{
1708 int error= 1;
1709 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1710 if (info == NULL || maria_is_crashed(info))
1711 return 0;
1712
1713 if (_ma_apply_redo_free_head_or_tail(info, current_group_end_lsn,
1714 rec->header + FILEID_STORE_SIZE))
1715 goto end;
1716 error= 0;
1717end:
1718 return error;
1719}
1720
1721
1722prototype_redo_exec_hook(REDO_DELETE_ALL)
1723{
1724 int error= 1;
1725 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1726 if (info == NULL)
1727 return 0;
1728 tprint(tracef, " deleting all %lu rows\n",
1729 (ulong)info->s->state.state.records);
1730 if (maria_delete_all_rows(info))
1731 goto end;
1732 error= 0;
1733end:
1734 return error;
1735}
1736
1737
1738prototype_redo_exec_hook(REDO_INDEX)
1739{
1740 int error= 1;
1741 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1742 if (info == NULL || maria_is_crashed(info))
1743 return 0;
1744 enlarge_buffer(rec);
1745
1746 if (log_record_buffer.str == NULL ||
1747 translog_read_record(rec->lsn, 0, rec->record_length,
1748 log_record_buffer.str, NULL) !=
1749 rec->record_length)
1750 {
1751 eprint(tracef, "Failed to read record");
1752 goto end;
1753 }
1754
1755 if (_ma_apply_redo_index(info, current_group_end_lsn,
1756 log_record_buffer.str + FILEID_STORE_SIZE,
1757 rec->record_length - FILEID_STORE_SIZE))
1758 goto end;
1759 error= 0;
1760end:
1761 return error;
1762}
1763
1764prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE)
1765{
1766 int error= 1;
1767 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1768 if (info == NULL || maria_is_crashed(info))
1769 return 0;
1770 enlarge_buffer(rec);
1771
1772 if (log_record_buffer.str == NULL ||
1773 translog_read_record(rec->lsn, 0, rec->record_length,
1774 log_record_buffer.str, NULL) !=
1775 rec->record_length)
1776 {
1777 eprint(tracef, "Failed to read record");
1778 goto end;
1779 }
1780
1781 if (_ma_apply_redo_index_new_page(info, current_group_end_lsn,
1782 log_record_buffer.str + FILEID_STORE_SIZE,
1783 rec->record_length - FILEID_STORE_SIZE))
1784 goto end;
1785 error= 0;
1786end:
1787 return error;
1788}
1789
1790
1791prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE)
1792{
1793 int error= 1;
1794 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1795 if (info == NULL || maria_is_crashed(info))
1796 return 0;
1797
1798 if (_ma_apply_redo_index_free_page(info, current_group_end_lsn,
1799 rec->header + FILEID_STORE_SIZE))
1800 goto end;
1801 error= 0;
1802end:
1803 return error;
1804}
1805
1806
1807prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE)
1808{
1809 int error= 1;
1810 MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
1811 if (info == NULL || maria_is_crashed(info))
1812 return 0;
1813 enlarge_buffer(rec);
1814
1815 if (log_record_buffer.str == NULL ||
1816 translog_read_record(rec->lsn, 0, rec->record_length,
1817 log_record_buffer.str, NULL) !=
1818 rec->record_length)
1819 {
1820 eprint(tracef, "Failed to read record");
1821 goto end;
1822 }
1823
1824 if (cmp_translog_addr(rec->lsn, checkpoint_start) >= 0)
1825 {
1826 /*
1827 Record is potentially after the bitmap flush made by Checkpoint, so has
1828 to be replayed. It may overwrite a more recent state but that will be
1829 corrected by all upcoming REDOs for data pages.
1830 If the condition is false, we must not apply the record: it is unneeded
1831 and nocive (may not be corrected as REDOs can be skipped due to
1832 dirty-pages list).
1833 */
1834 if (_ma_apply_redo_bitmap_new_page(info, current_group_end_lsn,
1835 log_record_buffer.str +
1836 FILEID_STORE_SIZE))
1837 goto end;
1838 }
1839 error= 0;
1840end:
1841 return error;
1842}
1843
1844
1845static inline void set_undo_lsn_for_active_trans(uint16 short_trid, LSN lsn)
1846{
1847 if (all_active_trans[short_trid].long_trid == 0)
1848 {
1849 /* transaction unknown, so has committed or fully rolled back long ago */
1850 return;
1851 }
1852 all_active_trans[short_trid].undo_lsn= lsn;
1853 if (all_active_trans[short_trid].first_undo_lsn == LSN_IMPOSSIBLE)
1854 all_active_trans[short_trid].first_undo_lsn= lsn;
1855}
1856
1857
1858prototype_redo_exec_hook(UNDO_ROW_INSERT)
1859{
1860 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
1861 MARIA_SHARE *share;
1862
1863 set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
1864 if (info == NULL)
1865 {
1866 /*
1867 Note that we set undo_lsn anyway. So that if the transaction is later
1868 rolled back, this UNDO is tried for execution and we get a warning (as
1869 it would then be abnormal that info==NULL).
1870 */
1871 return 0;
1872 }
1873 share= info->s;
1874 if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
1875 {
1876 tprint(tracef, " state has LSN " LSN_FMT " older than record, updating"
1877 " rows' count\n", LSN_IN_PARTS(share->state.is_of_horizon));
1878 share->state.state.records++;
1879 if (share->calc_checksum)
1880 {
1881 uchar buff[HA_CHECKSUM_STORE_SIZE];
1882 if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
1883 PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
1884 HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
1885 HA_CHECKSUM_STORE_SIZE)
1886 {
1887 eprint(tracef, "Failed to read record");
1888 return 1;
1889 }
1890 share->state.state.checksum+= ha_checksum_korr(buff);
1891 }
1892 info->s->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
1893 STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
1894 }
1895 tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
1896 /* Unpin all pages, stamp them with UNDO's LSN */
1897 _ma_unpin_all_pages(info, rec->lsn);
1898 return 0;
1899}
1900
1901
1902prototype_redo_exec_hook(UNDO_ROW_DELETE)
1903{
1904 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
1905 MARIA_SHARE *share;
1906
1907 set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
1908 if (info == NULL)
1909 return 0;
1910 share= info->s;
1911 if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
1912 {
1913 tprint(tracef, " state older than record\n");
1914 share->state.state.records--;
1915 if (share->calc_checksum)
1916 {
1917 uchar buff[HA_CHECKSUM_STORE_SIZE];
1918 if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
1919 PAGE_STORE_SIZE + DIRPOS_STORE_SIZE + 2 +
1920 PAGERANGE_STORE_SIZE,
1921 HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
1922 HA_CHECKSUM_STORE_SIZE)
1923 {
1924 eprint(tracef, "Failed to read record");
1925 return 1;
1926 }
1927 share->state.state.checksum+= ha_checksum_korr(buff);
1928 }
1929 share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
1930 STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
1931 STATE_NOT_MOVABLE);
1932 }
1933 tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records);
1934 _ma_unpin_all_pages(info, rec->lsn);
1935 return 0;
1936}
1937
1938
1939prototype_redo_exec_hook(UNDO_ROW_UPDATE)
1940{
1941 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
1942 MARIA_SHARE *share;
1943
1944 set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
1945 if (info == NULL)
1946 return 0;
1947 share= info->s;
1948 if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
1949 {
1950 if (share->calc_checksum)
1951 {
1952 uchar buff[HA_CHECKSUM_STORE_SIZE];
1953 if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
1954 PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
1955 HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
1956 HA_CHECKSUM_STORE_SIZE)
1957 {
1958 eprint(tracef, "Failed to read record");
1959 return 1;
1960 }
1961 share->state.state.checksum+= ha_checksum_korr(buff);
1962 }
1963 share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
1964 STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
1965 }
1966 _ma_unpin_all_pages(info, rec->lsn);
1967 return 0;
1968}
1969
1970
1971prototype_redo_exec_hook(UNDO_KEY_INSERT)
1972{
1973 MARIA_HA *info;
1974 MARIA_SHARE *share;
1975
1976 set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
1977 if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
1978 return 0;
1979 share= info->s;
1980 if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
1981 {
1982 const uchar *ptr= rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE;
1983 uint keynr= key_nr_korr(ptr);
1984 if (share->base.auto_key == (keynr + 1)) /* it's auto-increment */
1985 {
1986 const HA_KEYSEG *keyseg= info->s->keyinfo[keynr].seg;
1987 ulonglong value;
1988 char llbuf[22];
1989 uchar reversed[MARIA_MAX_KEY_BUFF], *to;
1990 tprint(tracef, " state older than record\n");
1991 /* we read the record to find the auto_increment value */
1992 enlarge_buffer(rec);
1993 if (log_record_buffer.str == NULL ||
1994 translog_read_record(rec->lsn, 0, rec->record_length,
1995 log_record_buffer.str, NULL) !=
1996 rec->record_length)
1997 {
1998 eprint(tracef, "Failed to read record");
1999 return 1;
2000 }
2001 to= log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
2002 KEY_NR_STORE_SIZE;
2003 if (keyseg->flag & HA_SWAP_KEY)
2004 {
2005 /* We put key from log record to "data record" packing format... */
2006 uchar *key_ptr= to;
2007 uchar *key_end= key_ptr + keyseg->length;
2008 to= reversed + keyseg->length;
2009 do
2010 {
2011 *--to= *key_ptr++;
2012 } while (key_ptr != key_end);
2013 /* ... so that we can read it with: */
2014 }
2015 value= ma_retrieve_auto_increment(to, keyseg->type);
2016 set_if_bigger(share->state.auto_increment, value);
2017 llstr(share->state.auto_increment, llbuf);
2018 tprint(tracef, " auto-inc %s\n", llbuf);
2019 }
2020 }
2021 _ma_unpin_all_pages(info, rec->lsn);
2022 return 0;
2023}
2024
2025
2026prototype_redo_exec_hook(UNDO_KEY_DELETE)
2027{
2028 MARIA_HA *info;
2029
2030 set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2031 if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
2032 return 0;
2033 _ma_unpin_all_pages(info, rec->lsn);
2034 return 0;
2035}
2036
2037
2038prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
2039{
2040 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2041 MARIA_SHARE *share;
2042
2043 set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2044 if (info == NULL)
2045 return 0;
2046 share= info->s;
2047 if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
2048 {
2049 uint key_nr;
2050 my_off_t page;
2051 key_nr= key_nr_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
2052 page= page_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE +
2053 KEY_NR_STORE_SIZE);
2054 share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
2055 HA_OFFSET_ERROR :
2056 page * share->block_size);
2057 }
2058 _ma_unpin_all_pages(info, rec->lsn);
2059 return 0;
2060}
2061
2062
2063prototype_redo_exec_hook(UNDO_BULK_INSERT)
2064{
2065 /*
2066 If the repair finished it wrote and sync the state. If it didn't finish,
2067 we are going to empty the table and that will fix the state.
2068 */
2069 set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
2070 return 0;
2071}
2072
2073
2074prototype_redo_exec_hook(IMPORTED_TABLE)
2075{
2076 char *name;
2077 enlarge_buffer(rec);
2078 if (log_record_buffer.str == NULL ||
2079 translog_read_record(rec->lsn, 0, rec->record_length,
2080 log_record_buffer.str, NULL) !=
2081 rec->record_length)
2082 {
2083 eprint(tracef, "Failed to read record");
2084 return 1;
2085 }
2086 name= (char *)log_record_buffer.str;
2087 tprint(tracef, "Table '%s' was imported (auto-zerofilled) in this Aria instance\n", name);
2088 return 0;
2089}
2090
2091
2092prototype_redo_exec_hook(COMMIT)
2093{
2094 uint16 sid= rec->short_trid;
2095 TrID long_trid= all_active_trans[sid].long_trid;
2096 char llbuf[22];
2097 if (long_trid == 0)
2098 {
2099 tprint(tracef, "We don't know about transaction with short_trid %u;"
2100 "it probably committed long ago, forget it\n", sid);
2101 bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
2102 return 0;
2103 }
2104 llstr(long_trid, llbuf);
2105 tprint(tracef, "Transaction long_trid %s short_trid %u committed\n",
2106 llbuf, sid);
2107 bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
2108#ifdef MARIA_VERSIONING
2109 /*
2110 if real recovery:
2111 transaction was committed, move it to some separate list for later
2112 purging (but don't purge now! purging may have been started before, we
2113 may find REDO_PURGE records soon).
2114 */
2115#endif
2116 return 0;
2117}
2118
2119prototype_redo_exec_hook(CLR_END)
2120{
2121 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2122 MARIA_SHARE *share;
2123 LSN previous_undo_lsn;
2124 enum translog_record_type undone_record_type;
2125 const LOG_DESC *log_desc;
2126 my_bool row_entry= 0;
2127 uchar *logpos;
2128 DBUG_ENTER("exec_REDO_LOGREC_CLR_END");
2129
2130 previous_undo_lsn= lsn_korr(rec->header);
2131 undone_record_type=
2132 clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
2133 log_desc= &log_record_type_descriptor[undone_record_type];
2134
2135 set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn);
2136 if (info == NULL)
2137 DBUG_RETURN(0);
2138 share= info->s;
2139 tprint(tracef, " CLR_END was about %s, undo_lsn now LSN " LSN_FMT "\n",
2140 log_desc->name, LSN_IN_PARTS(previous_undo_lsn));
2141
2142 enlarge_buffer(rec);
2143 if (log_record_buffer.str == NULL ||
2144 translog_read_record(rec->lsn, 0, rec->record_length,
2145 log_record_buffer.str, NULL) !=
2146 rec->record_length)
2147 {
2148 eprint(tracef, "Failed to read record");
2149 return 1;
2150 }
2151 logpos= (log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
2152 CLR_TYPE_STORE_SIZE);
2153
2154 if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
2155 {
2156 tprint(tracef, " state older than record\n");
2157 switch (undone_record_type) {
2158 case LOGREC_UNDO_ROW_DELETE:
2159 row_entry= 1;
2160 share->state.state.records++;
2161 break;
2162 case LOGREC_UNDO_ROW_INSERT:
2163 share->state.state.records--;
2164 share->state.changed|= STATE_NOT_OPTIMIZED_ROWS;
2165 row_entry= 1;
2166 break;
2167 case LOGREC_UNDO_ROW_UPDATE:
2168 row_entry= 1;
2169 break;
2170 case LOGREC_UNDO_KEY_INSERT:
2171 case LOGREC_UNDO_KEY_DELETE:
2172 break;
2173 case LOGREC_UNDO_KEY_INSERT_WITH_ROOT:
2174 case LOGREC_UNDO_KEY_DELETE_WITH_ROOT:
2175 {
2176 uint key_nr;
2177 my_off_t page;
2178 key_nr= key_nr_korr(logpos);
2179 page= page_korr(logpos + KEY_NR_STORE_SIZE);
2180 share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
2181 HA_OFFSET_ERROR :
2182 page * share->block_size);
2183 break;
2184 }
2185 case LOGREC_UNDO_BULK_INSERT:
2186 break;
2187 default:
2188 DBUG_ASSERT(0);
2189 }
2190 if (row_entry && share->calc_checksum)
2191 share->state.state.checksum+= ha_checksum_korr(logpos);
2192 share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2193 STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2194 }
2195 if (row_entry)
2196 tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records);
2197 _ma_unpin_all_pages(info, rec->lsn);
2198 DBUG_RETURN(0);
2199}
2200
2201
2202/**
2203 Hock to print debug information (like MySQL query)
2204*/
2205
2206prototype_redo_exec_hook(DEBUG_INFO)
2207{
2208 uchar *data;
2209 enum translog_debug_info_type debug_info;
2210
2211 enlarge_buffer(rec);
2212 if (log_record_buffer.str == NULL ||
2213 translog_read_record(rec->lsn, 0, rec->record_length,
2214 log_record_buffer.str, NULL) !=
2215 rec->record_length)
2216 {
2217 eprint(tracef, "Failed to read record debug record");
2218 return 1;
2219 }
2220 debug_info= (enum translog_debug_info_type) log_record_buffer.str[0];
2221 data= log_record_buffer.str + 1;
2222 switch (debug_info) {
2223 case LOGREC_DEBUG_INFO_QUERY:
2224 tprint(tracef, "Query: %.*s\n", rec->record_length - 1,
2225 (char*) data);
2226 break;
2227 default:
2228 DBUG_ASSERT(0);
2229 }
2230 return 0;
2231}
2232
2233
2234/**
2235 In some cases we have to skip execution of an UNDO record during the UNDO
2236 phase.
2237*/
2238
2239static void skip_undo_record(LSN previous_undo_lsn, TRN *trn)
2240{
2241 trn->undo_lsn= previous_undo_lsn;
2242 if (previous_undo_lsn == LSN_IMPOSSIBLE) /* has fully rolled back */
2243 trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
2244 skipped_undo_phase++;
2245}
2246
2247
2248prototype_undo_exec_hook(UNDO_ROW_INSERT)
2249{
2250 my_bool error;
2251 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2252 LSN previous_undo_lsn= lsn_korr(rec->header);
2253 MARIA_SHARE *share;
2254 const uchar *record_ptr;
2255
2256 if (info == NULL || maria_is_crashed(info))
2257 {
2258 /*
2259 Unlike for REDOs, if the table was skipped it is abnormal; we have a
2260 transaction to rollback which used this table, as it is not rolled back
2261 it was supposed to hold this table and so the table should still be
2262 there. Skip it (user may have repaired the table with maria_chk because
2263 it was so badly corrupted that a previous recovery failed) but warn.
2264 */
2265 skip_undo_record(previous_undo_lsn, trn);
2266 return 0;
2267 }
2268 share= info->s;
2269 share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2270 STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
2271 STATE_NOT_MOVABLE);
2272 record_ptr= rec->header;
2273 if (share->calc_checksum)
2274 {
2275 /*
2276 We need to read more of the record to put the checksum into the record
2277 buffer used by _ma_apply_undo_row_insert().
2278 If the table has no live checksum, rec->header will be enough.
2279 */
2280 enlarge_buffer(rec);
2281 if (log_record_buffer.str == NULL ||
2282 translog_read_record(rec->lsn, 0, rec->record_length,
2283 log_record_buffer.str, NULL) !=
2284 rec->record_length)
2285 {
2286 eprint(tracef, "Failed to read record");
2287 return 1;
2288 }
2289 record_ptr= log_record_buffer.str;
2290 }
2291
2292 info->trn= trn;
2293 error= _ma_apply_undo_row_insert(info, previous_undo_lsn,
2294 record_ptr + LSN_STORE_SIZE +
2295 FILEID_STORE_SIZE);
2296 info->trn= 0;
2297 /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2298 tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
2299 tprint(tracef, " undo_lsn now LSN " LSN_FMT "\n",
2300 LSN_IN_PARTS(trn->undo_lsn));
2301 return error;
2302}
2303
2304
2305prototype_undo_exec_hook(UNDO_ROW_DELETE)
2306{
2307 my_bool error;
2308 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2309 LSN previous_undo_lsn= lsn_korr(rec->header);
2310 MARIA_SHARE *share;
2311
2312 if (info == NULL || maria_is_crashed(info))
2313 {
2314 skip_undo_record(previous_undo_lsn, trn);
2315 return 0;
2316 }
2317
2318 share= info->s;
2319 share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2320 STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2321 enlarge_buffer(rec);
2322 if (log_record_buffer.str == NULL ||
2323 translog_read_record(rec->lsn, 0, rec->record_length,
2324 log_record_buffer.str, NULL) !=
2325 rec->record_length)
2326 {
2327 eprint(tracef, "Failed to read record");
2328 return 1;
2329 }
2330
2331 info->trn= trn;
2332 error= _ma_apply_undo_row_delete(info, previous_undo_lsn,
2333 log_record_buffer.str + LSN_STORE_SIZE +
2334 FILEID_STORE_SIZE,
2335 rec->record_length -
2336 (LSN_STORE_SIZE + FILEID_STORE_SIZE));
2337 info->trn= 0;
2338 tprint(tracef, " rows' count %lu\n undo_lsn now LSN " LSN_FMT "\n",
2339 (ulong)share->state.state.records, LSN_IN_PARTS(trn->undo_lsn));
2340 return error;
2341}
2342
2343
2344prototype_undo_exec_hook(UNDO_ROW_UPDATE)
2345{
2346 my_bool error;
2347 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2348 LSN previous_undo_lsn= lsn_korr(rec->header);
2349 MARIA_SHARE *share;
2350
2351 if (info == NULL || maria_is_crashed(info))
2352 {
2353 skip_undo_record(previous_undo_lsn, trn);
2354 return 0;
2355 }
2356
2357 share= info->s;
2358 share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2359 STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2360 enlarge_buffer(rec);
2361 if (log_record_buffer.str == NULL ||
2362 translog_read_record(rec->lsn, 0, rec->record_length,
2363 log_record_buffer.str, NULL) !=
2364 rec->record_length)
2365 {
2366 eprint(tracef, "Failed to read record");
2367 return 1;
2368 }
2369
2370 info->trn= trn;
2371 error= _ma_apply_undo_row_update(info, previous_undo_lsn,
2372 log_record_buffer.str + LSN_STORE_SIZE +
2373 FILEID_STORE_SIZE,
2374 rec->record_length -
2375 (LSN_STORE_SIZE + FILEID_STORE_SIZE));
2376 info->trn= 0;
2377 tprint(tracef, " undo_lsn now LSN " LSN_FMT "\n",
2378 LSN_IN_PARTS(trn->undo_lsn));
2379 return error;
2380}
2381
2382
2383prototype_undo_exec_hook(UNDO_KEY_INSERT)
2384{
2385 my_bool error;
2386 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2387 LSN previous_undo_lsn= lsn_korr(rec->header);
2388 MARIA_SHARE *share;
2389
2390 if (info == NULL || maria_is_crashed(info))
2391 {
2392 skip_undo_record(previous_undo_lsn, trn);
2393 return 0;
2394 }
2395
2396 share= info->s;
2397 share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2398 STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2399
2400 enlarge_buffer(rec);
2401 if (log_record_buffer.str == NULL ||
2402 translog_read_record(rec->lsn, 0, rec->record_length,
2403 log_record_buffer.str, NULL) !=
2404 rec->record_length)
2405 {
2406 eprint(tracef, "Failed to read record");
2407 return 1;
2408 }
2409
2410 info->trn= trn;
2411 error= _ma_apply_undo_key_insert(info, previous_undo_lsn,
2412 log_record_buffer.str + LSN_STORE_SIZE +
2413 FILEID_STORE_SIZE,
2414 rec->record_length - LSN_STORE_SIZE -
2415 FILEID_STORE_SIZE);
2416 info->trn= 0;
2417 /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2418 tprint(tracef, " undo_lsn now LSN " LSN_FMT "\n",
2419 LSN_IN_PARTS(trn->undo_lsn));
2420 return error;
2421}
2422
2423
2424prototype_undo_exec_hook(UNDO_KEY_DELETE)
2425{
2426 my_bool error;
2427 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2428 LSN previous_undo_lsn= lsn_korr(rec->header);
2429 MARIA_SHARE *share;
2430
2431 if (info == NULL || maria_is_crashed(info))
2432 {
2433 skip_undo_record(previous_undo_lsn, trn);
2434 return 0;
2435 }
2436
2437 share= info->s;
2438 share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2439 STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2440
2441 enlarge_buffer(rec);
2442 if (log_record_buffer.str == NULL ||
2443 translog_read_record(rec->lsn, 0, rec->record_length,
2444 log_record_buffer.str, NULL) !=
2445 rec->record_length)
2446 {
2447 eprint(tracef, "Failed to read record");
2448 return 1;
2449 }
2450
2451 info->trn= trn;
2452 error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
2453 log_record_buffer.str + LSN_STORE_SIZE +
2454 FILEID_STORE_SIZE,
2455 rec->record_length - LSN_STORE_SIZE -
2456 FILEID_STORE_SIZE, FALSE);
2457 info->trn= 0;
2458 /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2459 tprint(tracef, " undo_lsn now LSN " LSN_FMT "\n",
2460 LSN_IN_PARTS(trn->undo_lsn));
2461 return error;
2462}
2463
2464
2465prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
2466{
2467 my_bool error;
2468 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2469 LSN previous_undo_lsn= lsn_korr(rec->header);
2470 MARIA_SHARE *share;
2471
2472 if (info == NULL || maria_is_crashed(info))
2473 {
2474 skip_undo_record(previous_undo_lsn, trn);
2475 return 0;
2476 }
2477
2478 share= info->s;
2479 share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2480 STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2481
2482 enlarge_buffer(rec);
2483 if (log_record_buffer.str == NULL ||
2484 translog_read_record(rec->lsn, 0, rec->record_length,
2485 log_record_buffer.str, NULL) !=
2486 rec->record_length)
2487 {
2488 eprint(tracef, "Failed to read record");
2489 return 1;
2490 }
2491
2492 info->trn= trn;
2493 error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
2494 log_record_buffer.str + LSN_STORE_SIZE +
2495 FILEID_STORE_SIZE,
2496 rec->record_length - LSN_STORE_SIZE -
2497 FILEID_STORE_SIZE, TRUE);
2498 info->trn= 0;
2499 /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2500 tprint(tracef, " undo_lsn now LSN " LSN_FMT "\n",
2501 LSN_IN_PARTS(trn->undo_lsn));
2502 return error;
2503}
2504
2505
2506prototype_undo_exec_hook(UNDO_BULK_INSERT)
2507{
2508 my_bool error;
2509 MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
2510 LSN previous_undo_lsn= lsn_korr(rec->header);
2511 MARIA_SHARE *share;
2512
2513 /* Here we don't check for crashed as we can undo the bulk insert */
2514 if (info == NULL)
2515 {
2516 skip_undo_record(previous_undo_lsn, trn);
2517 return 0;
2518 }
2519
2520 share= info->s;
2521 share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
2522 STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
2523
2524 info->trn= trn;
2525 error= _ma_apply_undo_bulk_insert(info, previous_undo_lsn);
2526 info->trn= 0;
2527 /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
2528 tprint(tracef, " undo_lsn now LSN " LSN_FMT "\n",
2529 LSN_IN_PARTS(trn->undo_lsn));
2530 return error;
2531}
2532
2533
2534static int run_redo_phase(LSN lsn, LSN lsn_end, enum maria_apply_log_way apply)
2535{
2536 TRANSLOG_HEADER_BUFFER rec;
2537 struct st_translog_scanner_data scanner;
2538 int len;
2539 uint i;
2540 DBUG_ENTER("run_redo_phase");
2541
2542 /* install hooks for execution */
2543#define install_redo_exec_hook(R) \
2544 log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
2545 exec_REDO_LOGREC_ ## R;
2546#define install_redo_exec_hook_shared(R,S) \
2547 log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
2548 exec_REDO_LOGREC_ ## S;
2549#define install_undo_exec_hook(R) \
2550 log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \
2551 exec_UNDO_LOGREC_ ## R;
2552 install_redo_exec_hook(LONG_TRANSACTION_ID);
2553 install_redo_exec_hook(CHECKPOINT);
2554 install_redo_exec_hook(REDO_CREATE_TABLE);
2555 install_redo_exec_hook(REDO_RENAME_TABLE);
2556 install_redo_exec_hook(REDO_REPAIR_TABLE);
2557 install_redo_exec_hook(REDO_DROP_TABLE);
2558 install_redo_exec_hook(FILE_ID);
2559 install_redo_exec_hook(INCOMPLETE_LOG);
2560 install_redo_exec_hook(INCOMPLETE_GROUP);
2561 install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
2562 install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
2563 install_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
2564 install_redo_exec_hook(REDO_PURGE_ROW_HEAD);
2565 install_redo_exec_hook(REDO_PURGE_ROW_TAIL);
2566 install_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
2567 install_redo_exec_hook(REDO_FREE_BLOCKS);
2568 install_redo_exec_hook(REDO_DELETE_ALL);
2569 install_redo_exec_hook(REDO_INDEX);
2570 install_redo_exec_hook(REDO_INDEX_NEW_PAGE);
2571 install_redo_exec_hook(REDO_INDEX_FREE_PAGE);
2572 install_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
2573 install_redo_exec_hook(UNDO_ROW_INSERT);
2574 install_redo_exec_hook(UNDO_ROW_DELETE);
2575 install_redo_exec_hook(UNDO_ROW_UPDATE);
2576 install_redo_exec_hook(UNDO_KEY_INSERT);
2577 install_redo_exec_hook(UNDO_KEY_DELETE);
2578 install_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
2579 install_redo_exec_hook(COMMIT);
2580 install_redo_exec_hook(CLR_END);
2581 install_undo_exec_hook(UNDO_ROW_INSERT);
2582 install_undo_exec_hook(UNDO_ROW_DELETE);
2583 install_undo_exec_hook(UNDO_ROW_UPDATE);
2584 install_undo_exec_hook(UNDO_KEY_INSERT);
2585 install_undo_exec_hook(UNDO_KEY_DELETE);
2586 install_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
2587 /* REDO_NEW_ROW_HEAD shares entry with REDO_INSERT_ROW_HEAD */
2588 install_redo_exec_hook_shared(REDO_NEW_ROW_HEAD, REDO_INSERT_ROW_HEAD);
2589 /* REDO_NEW_ROW_TAIL shares entry with REDO_INSERT_ROW_TAIL */
2590 install_redo_exec_hook_shared(REDO_NEW_ROW_TAIL, REDO_INSERT_ROW_TAIL);
2591 install_redo_exec_hook(UNDO_BULK_INSERT);
2592 install_undo_exec_hook(UNDO_BULK_INSERT);
2593 install_redo_exec_hook(IMPORTED_TABLE);
2594 install_redo_exec_hook(DEBUG_INFO);
2595
2596 current_group_end_lsn= LSN_IMPOSSIBLE;
2597#ifndef DBUG_OFF
2598 current_group_table= NULL;
2599#endif
2600
2601 if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon()))
2602 {
2603 tprint(tracef, "checkpoint address refers to the log end log or "
2604 "log is empty, nothing to do.\n");
2605 DBUG_RETURN(0);
2606 }
2607
2608 len= translog_read_record_header(lsn, &rec);
2609
2610 if (len == RECHEADER_READ_ERROR)
2611 {
2612 eprint(tracef, "Failed to read header of the first record.");
2613 DBUG_RETURN(1);
2614 }
2615 if (translog_scanner_init(lsn, 1, &scanner, 1))
2616 {
2617 tprint(tracef, "Scanner init failed\n");
2618 DBUG_RETURN(1);
2619 }
2620 for (i= 1;;i++)
2621 {
2622 uint16 sid= rec.short_trid;
2623 const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
2624 display_record_position(log_desc, &rec, i);
2625 /*
2626 A complete group is a set of log records with an "end mark" record
2627 (e.g. a set of REDOs for an operation, terminated by an UNDO for this
2628 operation); if there is no "end mark" record the group is incomplete and
2629 won't be executed.
2630 */
2631 if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
2632 (log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
2633 {
2634 if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
2635 {
2636 if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
2637 {
2638 /*
2639 Can happen if the transaction got a table write error, then
2640 unlocked tables thus wrote a COMMIT record. Or can be an
2641 INCOMPLETE_GROUP record written by a previous recovery.
2642 */
2643 tprint(tracef, "\nDiscarding incomplete group before this record\n");
2644 all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
2645 }
2646 else
2647 {
2648 struct st_translog_scanner_data scanner2;
2649 TRANSLOG_HEADER_BUFFER rec2;
2650 /*
2651 There is a complete group for this transaction, containing more
2652 than this event.
2653 */
2654 tprint(tracef, " ends a group:\n");
2655 len=
2656 translog_read_record_header(all_active_trans[sid].group_start_lsn,
2657 &rec2);
2658 if (len < 0) /* EOF or error */
2659 {
2660 tprint(tracef, "Cannot find record where it should be\n");
2661 goto err;
2662 }
2663 if (lsn_end != LSN_IMPOSSIBLE && rec2.lsn >= lsn_end)
2664 {
2665 tprint(tracef,
2666 "lsn_end reached at " LSN_FMT ". "
2667 "Skipping rest of redo entries",
2668 LSN_IN_PARTS(rec2.lsn));
2669 translog_destroy_scanner(&scanner);
2670 translog_free_record_header(&rec);
2671 DBUG_RETURN(0);
2672 }
2673
2674 if (translog_scanner_init(rec2.lsn, 1, &scanner2, 1))
2675 {
2676 tprint(tracef, "Scanner2 init failed\n");
2677 goto err;
2678 }
2679 current_group_end_lsn= rec.lsn;
2680 do
2681 {
2682 if (rec2.short_trid == sid) /* it's in our group */
2683 {
2684 const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
2685 display_record_position(log_desc2, &rec2, 0);
2686 if (apply == MARIA_LOG_CHECK)
2687 {
2688 translog_size_t read_len;
2689 enlarge_buffer(&rec2);
2690 read_len=
2691 translog_read_record(rec2.lsn, 0, rec2.record_length,
2692 log_record_buffer.str, NULL);
2693 if (read_len != rec2.record_length)
2694 {
2695 tprint(tracef, "Cannot read record's body: read %u of"
2696 " %u bytes\n", read_len, rec2.record_length);
2697 translog_destroy_scanner(&scanner2);
2698 translog_free_record_header(&rec2);
2699 goto err;
2700 }
2701 }
2702 if (apply == MARIA_LOG_APPLY &&
2703 display_and_apply_record(log_desc2, &rec2))
2704 {
2705 translog_destroy_scanner(&scanner2);
2706 translog_free_record_header(&rec2);
2707 goto err;
2708 }
2709 }
2710 translog_free_record_header(&rec2);
2711 len= translog_read_next_record_header(&scanner2, &rec2);
2712 if (len < 0) /* EOF or error */
2713 {
2714 tprint(tracef, "Cannot find record where it should be\n");
2715 translog_destroy_scanner(&scanner2);
2716 translog_free_record_header(&rec2);
2717 goto err;
2718 }
2719 }
2720 while (rec2.lsn < rec.lsn);
2721 /* group finished */
2722 all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
2723 current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
2724 display_record_position(log_desc, &rec, 0);
2725 translog_destroy_scanner(&scanner2);
2726 translog_free_record_header(&rec2);
2727 }
2728 }
2729 if (apply == MARIA_LOG_APPLY &&
2730 display_and_apply_record(log_desc, &rec))
2731 goto err;
2732#ifndef DBUG_OFF
2733 current_group_table= NULL;
2734#endif
2735 }
2736 else /* record does not end group */
2737 {
2738 /* just record the fact, can't know if can execute yet */
2739 if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
2740 {
2741 /* group not yet started */
2742 all_active_trans[sid].group_start_lsn= rec.lsn;
2743 }
2744 }
2745 translog_free_record_header(&rec);
2746 len= translog_read_next_record_header(&scanner, &rec);
2747 if (len < 0)
2748 {
2749 switch (len)
2750 {
2751 case RECHEADER_READ_EOF:
2752 tprint(tracef, "EOF on the log\n");
2753 break;
2754 case RECHEADER_READ_ERROR:
2755 tprint(tracef, "Error reading log\n");
2756 goto err;
2757 }
2758 break;
2759 }
2760 }
2761 translog_destroy_scanner(&scanner);
2762 translog_free_record_header(&rec);
2763 if (recovery_message_printed == REC_MSG_REDO)
2764 {
2765 fprintf(stderr, " 100%%");
2766 fflush(stderr);
2767 procent_printed= 1;
2768 }
2769 DBUG_RETURN(0);
2770
2771err:
2772 translog_destroy_scanner(&scanner);
2773 translog_free_record_header(&rec);
2774 DBUG_RETURN(1);
2775}
2776
2777
2778/**
2779 @brief Informs about any aborted groups or uncommitted transactions,
2780 prepares for the UNDO phase if needed.
2781
2782 @note Observe that it may init trnman.
2783*/
2784static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
2785{
2786 uint sid, uncommitted= 0;
2787 char llbuf[22];
2788 LSN addr;
2789
2790 my_hash_free(&all_dirty_pages);
2791 /*
2792 hash_free() can be called multiple times probably, but be safe if that
2793 changes
2794 */
2795 bzero(&all_dirty_pages, sizeof(all_dirty_pages));
2796 my_free(dirty_pages_pool);
2797 dirty_pages_pool= NULL;
2798
2799 llstr(max_long_trid, llbuf);
2800 tprint(tracef, "Maximum transaction long id seen: %s\n", llbuf);
2801 llstr(max_trid_in_control_file, llbuf);
2802 tprint(tracef, "Maximum transaction long id seen in control file: %s\n",
2803 llbuf);
2804 /*
2805 If logs were deleted, or lost, trid in control file is needed to set
2806 trnman's generator:
2807 */
2808 set_if_bigger(max_long_trid, max_trid_in_control_file);
2809 if (prepare_for_undo_phase && trnman_init(max_long_trid))
2810 return -1;
2811
2812 trns_created= TRUE;
2813
2814 for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
2815 {
2816 TrID long_trid= all_active_trans[sid].long_trid;
2817 LSN gslsn= all_active_trans[sid].group_start_lsn;
2818 TRN *trn;
2819 if (gslsn != LSN_IMPOSSIBLE)
2820 {
2821 tprint(tracef, "Group at LSN " LSN_FMT " short_trid %u incomplete\n",
2822 LSN_IN_PARTS(gslsn), sid);
2823 all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
2824 }
2825 if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
2826 {
2827 llstr(long_trid, llbuf);
2828 tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n",
2829 llbuf, sid);
2830 /*
2831 dummy_transaction_object serves only for DDLs, where there is never a
2832 rollback or incomplete group. And unknown transactions (which have
2833 long_trid==0) should have undo_lsn==LSN_IMPOSSIBLE.
2834 */
2835 if (long_trid ==0)
2836 {
2837 eprint(tracef, "Transaction with long_trid 0 should not roll back");
2838 ALERT_USER();
2839 return -1;
2840 }
2841 if (prepare_for_undo_phase)
2842 {
2843 if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
2844 return -1;
2845 trn->undo_lsn= all_active_trans[sid].undo_lsn;
2846 trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn |
2847 TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */
2848 if (gslsn != LSN_IMPOSSIBLE)
2849 {
2850 /*
2851 UNDO phase will log some records. So, a future recovery may see:
2852 REDO(from incomplete group) - REDO(from rollback) - CLR_END
2853 and thus execute the first REDO (finding it in "a complete
2854 group"). To prevent that:
2855 */
2856 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS];
2857 LSN lsn;
2858 if (translog_write_record(&lsn, LOGREC_INCOMPLETE_GROUP,
2859 trn, NULL, 0,
2860 TRANSLOG_INTERNAL_PARTS, log_array,
2861 NULL, NULL))
2862 return -1;
2863 }
2864 }
2865 uncommitted++;
2866 }
2867#ifdef MARIA_VERSIONING
2868 /*
2869 If real recovery: if transaction was committed, move it to some separate
2870 list for soon purging.
2871 */
2872#endif
2873 }
2874
2875 my_free(all_active_trans);
2876 all_active_trans= NULL;
2877
2878 /*
2879 The UNDO phase uses some normal run-time code of ROLLBACK: generates log
2880 records, etc; prepare tables for that
2881 */
2882 addr= translog_get_horizon();
2883 for (sid= 0; sid <= SHARE_ID_MAX; sid++)
2884 {
2885 MARIA_HA *info= all_tables[sid].info;
2886 if (info != NULL)
2887 {
2888 prepare_table_for_close(info, addr);
2889 /*
2890 But we don't close it; we leave it available for the UNDO phase;
2891 it's likely that the UNDO phase will need it.
2892 */
2893 if (prepare_for_undo_phase)
2894 translog_assign_id_to_share_from_recovery(info->s, sid);
2895 }
2896 }
2897 return uncommitted;
2898}
2899
2900
2901static int run_undo_phase(uint uncommitted)
2902{
2903 LSN last_undo __attribute__((unused));
2904 DBUG_ENTER("run_undo_phase");
2905
2906 if (uncommitted > 0)
2907 {
2908 checkpoint_useful= TRUE;
2909 if (tracef != stdout)
2910 {
2911 if (recovery_message_printed == REC_MSG_NONE)
2912 print_preamble();
2913 fprintf(stderr, "transactions to roll back:");
2914 recovery_message_printed= REC_MSG_UNDO;
2915 }
2916 tprint(tracef, "%u transactions will be rolled back\n", uncommitted);
2917 procent_printed= 1;
2918 for( ; ; )
2919 {
2920 char llbuf[22];
2921 TRN *trn;
2922 if (recovery_message_printed == REC_MSG_UNDO)
2923 {
2924 fprintf(stderr, " %u", uncommitted);
2925 fflush(stderr);
2926 }
2927 if ((uncommitted--) == 0)
2928 break;
2929 trn= trnman_get_any_trn();
2930 DBUG_ASSERT(trn != NULL);
2931 llstr(trn->trid, llbuf);
2932 tprint(tracef, "Rolling back transaction of long id %s\n", llbuf);
2933 last_undo= trn->undo_lsn + 1;
2934
2935 /* Execute all undo entries */
2936 while (trn->undo_lsn)
2937 {
2938 TRANSLOG_HEADER_BUFFER rec;
2939 LOG_DESC *log_desc;
2940 DBUG_ASSERT(trn->undo_lsn < last_undo);
2941 last_undo= trn->undo_lsn;
2942
2943 if (translog_read_record_header(trn->undo_lsn, &rec) ==
2944 RECHEADER_READ_ERROR)
2945 DBUG_RETURN(1);
2946 log_desc= &log_record_type_descriptor[rec.type];
2947 display_record_position(log_desc, &rec, 0);
2948 if (log_desc->record_execute_in_undo_phase(&rec, trn))
2949 {
2950 eprint(tracef, "Got error %d when executing undo %s", my_errno,
2951 log_desc->name);
2952 translog_free_record_header(&rec);
2953 DBUG_RETURN(1);
2954 }
2955 translog_free_record_header(&rec);
2956 }
2957
2958 /* Force a crash to test recovery of recovery */
2959 if (maria_recovery_force_crash_counter)
2960 {
2961 DBUG_ASSERT(--maria_recovery_force_crash_counter > 0);
2962 }
2963
2964 if (trnman_rollback_trn(trn))
2965 DBUG_RETURN(1);
2966 /* We could want to span a few threads (4?) instead of 1 */
2967 /* In the future, we want to have this phase *online* */
2968 }
2969 }
2970 procent_printed= 0;
2971 DBUG_RETURN(0);
2972}
2973
2974
2975/**
2976 In case of error in recovery, deletes all transactions from the transaction
2977 manager so that this module does not assert.
2978
2979 @note no checkpoint should be taken as those transactions matter for the
2980 next recovery (they still haven't been properly dealt with).
2981*/
2982
2983static void delete_all_transactions()
2984{
2985 for( ; ; )
2986 {
2987 TRN *trn= trnman_get_any_trn();
2988 if (trn == NULL)
2989 break;
2990 trn->undo_lsn= trn->first_undo_lsn= LSN_IMPOSSIBLE;
2991 trnman_rollback_trn(trn); /* ignore error */
2992 }
2993}
2994
2995
2996/**
2997 @brief re-enables transactionality, updates is_of_horizon
2998
2999 @param info table
3000 @param horizon address to set is_of_horizon
3001*/
3002
3003static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon)
3004{
3005 MARIA_SHARE *share= info->s;
3006 /*
3007 In a fully-forward REDO phase (no checkpoint record),
3008 state is now at least as new as the LSN of the current record. It may be
3009 newer, in case we are seeing a LOGREC_FILE_ID which tells us to close a
3010 table, but that table was later modified further in the log.
3011 But if we parsed a checkpoint record, it may be this way in the log:
3012 FILE_ID(6->t2)... FILE_ID(6->t1)... CHECKPOINT(6->t1)
3013 Checkpoint parsing opened t1 with id 6; first FILE_ID above is going to
3014 make t1 close; the first condition below is however false (when checkpoint
3015 was taken it increased is_of_horizon) and so it works. For safety we
3016 add the second condition.
3017 */
3018 if (cmp_translog_addr(share->state.is_of_horizon, horizon) < 0 &&
3019 cmp_translog_addr(share->lsn_of_file_id, horizon) < 0)
3020 {
3021 share->state.is_of_horizon= horizon;
3022 _ma_state_info_write_sub(share->kfile.file, &share->state,
3023 MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET);
3024 }
3025
3026 /*
3027 Ensure that info->state is up to date as
3028 _ma_renable_logging_for_table() is depending on this
3029 */
3030 *info->state= info->s->state.state;
3031
3032 /*
3033 This leaves PAGECACHE_PLAIN_PAGE pages into the cache, while the table is
3034 going to switch back to transactional. So the table will be a mix of
3035 pages, which is ok as long as we don't take any checkpoints until all
3036 tables get closed at the end of the UNDO phase.
3037 */
3038 _ma_reenable_logging_for_table(info, FALSE);
3039 info->trn= NULL; /* safety */
3040}
3041
3042
3043static MARIA_HA *get_MARIA_HA_from_REDO_record(const
3044 TRANSLOG_HEADER_BUFFER *rec)
3045{
3046 uint16 sid;
3047 pgcache_page_no_t UNINIT_VAR(page);
3048 MARIA_HA *info;
3049 MARIA_SHARE *share;
3050 char llbuf[22];
3051 my_bool index_page_redo_entry= FALSE, page_redo_entry= FALSE;
3052
3053 print_redo_phase_progress(rec->lsn);
3054 sid= fileid_korr(rec->header);
3055 switch (rec->type) {
3056 /* not all REDO records have a page: */
3057 case LOGREC_REDO_INDEX_NEW_PAGE:
3058 case LOGREC_REDO_INDEX:
3059 case LOGREC_REDO_INDEX_FREE_PAGE:
3060 index_page_redo_entry= 1;
3061 /* fall through*/
3062 case LOGREC_REDO_INSERT_ROW_HEAD:
3063 case LOGREC_REDO_INSERT_ROW_TAIL:
3064 case LOGREC_REDO_PURGE_ROW_HEAD:
3065 case LOGREC_REDO_PURGE_ROW_TAIL:
3066 case LOGREC_REDO_NEW_ROW_HEAD:
3067 case LOGREC_REDO_NEW_ROW_TAIL:
3068 case LOGREC_REDO_FREE_HEAD_OR_TAIL:
3069 page_redo_entry= TRUE;
3070 page= page_korr(rec->header + FILEID_STORE_SIZE);
3071 llstr(page, llbuf);
3072 break;
3073 case LOGREC_REDO_FREE_BLOCKS:
3074 /*
3075 We are checking against the dirty pages in _ma_apply_redo_free_blocks()
3076 */
3077 break;
3078 default:
3079 break;
3080 }
3081 tprint(tracef, " For table of short id %u", sid);
3082 info= all_tables[sid].info;
3083#ifndef DBUG_OFF
3084 DBUG_ASSERT(current_group_table == NULL || current_group_table == info);
3085 current_group_table= info;
3086#endif
3087 if (info == NULL)
3088 {
3089 tprint(tracef, ", table skipped, so skipping record\n");
3090 return NULL;
3091 }
3092 share= info->s;
3093 tprint(tracef, ", '%s'", share->open_file_name.str);
3094 DBUG_ASSERT(in_redo_phase);
3095 if (!table_is_part_of_recovery_set(&share->open_file_name))
3096 {
3097 tprint(tracef, ", skipped by user\n");
3098 return NULL;
3099 }
3100
3101 if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
3102 {
3103 /*
3104 This can happen only if processing a record before the checkpoint
3105 record.
3106 id->name mapping is newer than REDO record: for sure the table subject
3107 of the REDO has been flushed and forced (id re-assignment implies this);
3108 REDO can be ignored (and must be, as we don't know what this subject
3109 table was).
3110 */
3111 DBUG_ASSERT(cmp_translog_addr(rec->lsn, checkpoint_start) < 0);
3112 tprint(tracef, ", table's LOGREC_FILE_ID has LSN " LSN_FMT " more recent"
3113 " than record, skipping record",
3114 LSN_IN_PARTS(share->lsn_of_file_id));
3115 return NULL;
3116 }
3117 if (cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
3118 {
3119 /* probably a bulk insert repair */
3120 tprint(tracef, ", has skip_redo_lsn " LSN_FMT " more recent than"
3121 " record, skipping record\n",
3122 LSN_IN_PARTS(share->state.skip_redo_lsn));
3123 return NULL;
3124 }
3125 /* detect if an open instance of a dropped table (internal bug) */
3126 DBUG_ASSERT(share->last_version != 0);
3127 if (page_redo_entry)
3128 {
3129 /*
3130 Consult dirty pages list.
3131 REDO_INSERT_ROW_BLOBS will consult list by itself, as it covers several
3132 pages.
3133 */
3134 if (_ma_redo_not_needed_for_page(sid, rec->lsn, page,
3135 index_page_redo_entry))
3136 return NULL;
3137 }
3138 /*
3139 So we are going to read the page, and if its LSN is older than the
3140 record's we will modify the page
3141 */
3142 tprint(tracef, ", applying record\n");
3143 _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
3144 return info;
3145}
3146
3147
3148static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
3149 TRANSLOG_HEADER_BUFFER *rec)
3150{
3151 uint16 sid;
3152 MARIA_HA *info;
3153 MARIA_SHARE *share;
3154
3155 sid= fileid_korr(rec->header + LSN_STORE_SIZE);
3156 tprint(tracef, " For table of short id %u", sid);
3157 info= all_tables[sid].info;
3158#ifndef DBUG_OFF
3159 DBUG_ASSERT(!in_redo_phase ||
3160 current_group_table == NULL || current_group_table == info);
3161 current_group_table= info;
3162#endif
3163 if (info == NULL)
3164 {
3165 tprint(tracef, ", table skipped, so skipping record\n");
3166 return NULL;
3167 }
3168 share= info->s;
3169 tprint(tracef, ", '%s'", share->open_file_name.str);
3170
3171 if (!table_is_part_of_recovery_set(&share->open_file_name))
3172 {
3173 tprint(tracef, ", skipped by user\n");
3174 return NULL;
3175 }
3176
3177 if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
3178 {
3179 tprint(tracef, ", table's LOGREC_FILE_ID has LSN " LSN_FMT " more recent"
3180 " than record, skipping record",
3181 LSN_IN_PARTS(share->lsn_of_file_id));
3182 return NULL;
3183 }
3184 if (in_redo_phase &&
3185 cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
3186 {
3187 /* probably a bulk insert repair */
3188 tprint(tracef, ", has skip_redo_lsn " LSN_FMT " more recent than"
3189 " record, skipping record\n",
3190 LSN_IN_PARTS(share->state.skip_redo_lsn));
3191 return NULL;
3192 }
3193 DBUG_ASSERT(share->last_version != 0);
3194 _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
3195 tprint(tracef, ", applying record\n");
3196 return info;
3197}
3198
3199
3200/**
3201 @brief Parses checkpoint record.
3202
3203 Builds from it the dirty_pages list (a hash), opens tables and maps them to
3204 their 2-byte IDs, recreates transactions (not real TRNs though).
3205
3206 @return LSN from where in the log the REDO phase should start
3207 @retval LSN_ERROR error
3208 @retval other ok
3209*/
3210
3211static LSN parse_checkpoint_record(LSN lsn)
3212{
3213 ulong i;
3214 ulonglong nb_dirty_pages;
3215 TRANSLOG_HEADER_BUFFER rec;
3216 TRANSLOG_ADDRESS start_address;
3217 int len;
3218 uint nb_active_transactions, nb_committed_transactions, nb_tables;
3219 uchar *ptr;
3220 LSN minimum_rec_lsn_of_active_transactions, minimum_rec_lsn_of_dirty_pages;
3221 struct st_dirty_page *next_dirty_page_in_pool;
3222
3223 tprint(tracef, "Loading data from checkpoint record at LSN " LSN_FMT "\n",
3224 LSN_IN_PARTS(lsn));
3225 if ((len= translog_read_record_header(lsn, &rec)) == RECHEADER_READ_ERROR ||
3226 rec.type != LOGREC_CHECKPOINT)
3227 {
3228 eprint(tracef, "Cannot find checkpoint record at LSN " LSN_FMT,
3229 LSN_IN_PARTS(lsn));
3230 return LSN_ERROR;
3231 }
3232
3233 enlarge_buffer(&rec);
3234 if (log_record_buffer.str == NULL ||
3235 translog_read_record(rec.lsn, 0, rec.record_length,
3236 log_record_buffer.str, NULL) !=
3237 rec.record_length)
3238 {
3239 eprint(tracef, "Failed to read record");
3240 return LSN_ERROR;
3241 }
3242
3243 ptr= log_record_buffer.str;
3244 start_address= lsn_korr(ptr);
3245 ptr+= LSN_STORE_SIZE;
3246 tprint(tracef, "Checkpoint record has start_horizon at " LSN_FMT "\n",
3247 LSN_IN_PARTS(start_address));
3248
3249 /* transactions */
3250 nb_active_transactions= uint2korr(ptr);
3251 ptr+= 2;
3252 tprint(tracef, "%u active transactions\n", nb_active_transactions);
3253 minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
3254 ptr+= LSN_STORE_SIZE;
3255 max_long_trid= transid_korr(ptr);
3256 ptr+= TRANSID_SIZE;
3257
3258 /*
3259 how much brain juice and discussions there was to come to writing this
3260 line. It may make start_address slightly decrease (only by the time it
3261 takes to write one or a few rows, roughly).
3262 */
3263 tprint(tracef, "Checkpoint record has min_rec_lsn of active transactions"
3264 " at " LSN_FMT "\n",
3265 LSN_IN_PARTS(minimum_rec_lsn_of_active_transactions));
3266 set_if_smaller(start_address, minimum_rec_lsn_of_active_transactions);
3267
3268 for (i= 0; i < nb_active_transactions; i++)
3269 {
3270 uint16 sid= uint2korr(ptr);
3271 TrID long_id;
3272 LSN undo_lsn, first_undo_lsn;
3273 ptr+= 2;
3274 long_id= uint6korr(ptr);
3275 ptr+= 6;
3276 DBUG_ASSERT(sid > 0 && long_id > 0);
3277 undo_lsn= lsn_korr(ptr);
3278 ptr+= LSN_STORE_SIZE;
3279 first_undo_lsn= lsn_korr(ptr);
3280 ptr+= LSN_STORE_SIZE;
3281 new_transaction(sid, long_id, undo_lsn, first_undo_lsn);
3282 }
3283 nb_committed_transactions= uint4korr(ptr);
3284 ptr+= 4;
3285 tprint(tracef, "%lu committed transactions\n",
3286 (ulong)nb_committed_transactions);
3287 /* no purging => committed transactions are not important */
3288 ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions;
3289
3290 /* tables */
3291 nb_tables= uint4korr(ptr);
3292 ptr+= 4;
3293 tprint(tracef, "%u open tables\n", nb_tables);
3294 for (i= 0; i< nb_tables; i++)
3295 {
3296 char name[FN_REFLEN];
3297 LSN first_log_write_lsn;
3298 size_t name_len;
3299 uint16 sid= uint2korr(ptr);
3300 ptr+= 2;
3301 DBUG_ASSERT(sid > 0);
3302 first_log_write_lsn= lsn_korr(ptr);
3303 ptr+= LSN_STORE_SIZE;
3304 name_len= strlen((char *)ptr) + 1;
3305 strmake_buf(name, (char *)ptr);
3306 ptr+= name_len;
3307 if (new_table(sid, name, first_log_write_lsn))
3308 return LSN_ERROR;
3309 }
3310
3311 /* dirty pages */
3312 nb_dirty_pages= uint8korr(ptr);
3313
3314 /* Ensure casts later will not loose significant bits. */
3315 DBUG_ASSERT((nb_dirty_pages <= SIZE_T_MAX/sizeof(struct st_dirty_page)) &&
3316 (nb_dirty_pages <= ULONG_MAX));
3317
3318 ptr+= 8;
3319 tprint(tracef, "%lu dirty pages\n", (ulong) nb_dirty_pages);
3320 if (my_hash_init(&all_dirty_pages, &my_charset_bin, (ulong)nb_dirty_pages,
3321 offsetof(struct st_dirty_page, file_and_page_id),
3322 sizeof(((struct st_dirty_page *)NULL)->file_and_page_id),
3323 NULL, NULL, 0))
3324 return LSN_ERROR;
3325 dirty_pages_pool=
3326 (struct st_dirty_page *)my_malloc((size_t)nb_dirty_pages *
3327 sizeof(struct st_dirty_page),
3328 MYF(MY_WME));
3329 if (unlikely(dirty_pages_pool == NULL))
3330 return LSN_ERROR;
3331 next_dirty_page_in_pool= dirty_pages_pool;
3332 minimum_rec_lsn_of_dirty_pages= LSN_MAX;
3333 if (maria_recovery_verbose)
3334 tprint(tracef, "Table_id Is_index Page_id Rec_lsn\n");
3335 for (i= 0; i < nb_dirty_pages ; i++)
3336 {
3337 pgcache_page_no_t page_id;
3338 LSN rec_lsn;
3339 uint32 is_index;
3340 uint16 table_id= uint2korr(ptr);
3341 ptr+= 2;
3342 is_index= ptr[0];
3343 ptr++;
3344 page_id= page_korr(ptr);
3345 ptr+= PAGE_STORE_SIZE;
3346 rec_lsn= lsn_korr(ptr);
3347 ptr+= LSN_STORE_SIZE;
3348 if (new_page((is_index << 16) | table_id,
3349 page_id, rec_lsn, next_dirty_page_in_pool++))
3350 return LSN_ERROR;
3351 if (maria_recovery_verbose)
3352 tprint(tracef, "%8u %8u %12lu " LSN_FMT "\n", (uint) table_id,
3353 (uint) is_index, (ulong) page_id, LSN_IN_PARTS(rec_lsn));
3354 set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
3355 }
3356 /* after that, there will be no insert/delete into the hash */
3357 /*
3358 sanity check on record (did we screw up with all those "ptr+=", did the
3359 checkpoint write code and checkpoint read code go out of sync?).
3360 */
3361 if (ptr != (log_record_buffer.str + log_record_buffer.length))
3362 {
3363 eprint(tracef, "checkpoint record corrupted\n");
3364 return LSN_ERROR;
3365 }
3366
3367 /*
3368 start_address is now from where the dirty pages list can be ignored.
3369 Find LSN higher or equal to this TRANSLOG_ADDRESS, suitable for
3370 translog_read_record() functions.
3371 */
3372 start_address= checkpoint_start=
3373 translog_next_LSN(start_address, LSN_IMPOSSIBLE);
3374 tprint(tracef, "Checkpoint record start_horizon now adjusted to"
3375 " LSN " LSN_FMT "\n", LSN_IN_PARTS(start_address));
3376 if (checkpoint_start == LSN_IMPOSSIBLE)
3377 {
3378 /*
3379 There must be a problem, as our checkpoint record exists and is >= the
3380 address which is stored in its first bytes, which is >= start_address.
3381 */
3382 return LSN_ERROR;
3383 }
3384 /* now, where the REDO phase should start reading log: */
3385 tprint(tracef, "Checkpoint has min_rec_lsn of dirty pages at"
3386 " LSN " LSN_FMT "\n", LSN_IN_PARTS(minimum_rec_lsn_of_dirty_pages));
3387 set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages);
3388 DBUG_PRINT("info",
3389 ("checkpoint_start: " LSN_FMT " start_address: " LSN_FMT,
3390 LSN_IN_PARTS(checkpoint_start), LSN_IN_PARTS(start_address)));
3391 return start_address;
3392}
3393
3394
3395static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
3396 struct st_dirty_page *dirty_page)
3397{
3398 /* serves as hash key */
3399 dirty_page->file_and_page_id= (((uint64)fileid) << 40) | pageid;
3400 dirty_page->rec_lsn= rec_lsn;
3401 return my_hash_insert(&all_dirty_pages, (uchar *)dirty_page);
3402}
3403
3404
3405static int close_all_tables(void)
3406{
3407 int error= 0;
3408 uint count= 0;
3409 LIST *list_element, *next_open;
3410 MARIA_HA *info;
3411 TRANSLOG_ADDRESS addr;
3412 DBUG_ENTER("close_all_tables");
3413
3414 mysql_mutex_lock(&THR_LOCK_maria);
3415 if (maria_open_list == NULL)
3416 goto end;
3417 tprint(tracef, "Closing all tables\n");
3418 if (tracef != stdout)
3419 {
3420 if (recovery_message_printed == REC_MSG_NONE)
3421 print_preamble();
3422 for (count= 0, list_element= maria_open_list ;
3423 list_element ; count++, (list_element= list_element->next))
3424 ;
3425 fprintf(stderr, "tables to flush:");
3426 recovery_message_printed= REC_MSG_FLUSH;
3427 }
3428 /*
3429 Since the end of end_of_redo_phase(), we may have written new records
3430 (if UNDO phase ran) and thus the state is newer than at
3431 end_of_redo_phase(), we need to bump is_of_horizon again.
3432 */
3433 addr= translog_get_horizon();
3434 for (list_element= maria_open_list ; ; list_element= next_open)
3435 {
3436 if (recovery_message_printed == REC_MSG_FLUSH)
3437 {
3438 fprintf(stderr, " %u", count--);
3439 fflush(stderr);
3440 }
3441 if (list_element == NULL)
3442 break;
3443 next_open= list_element->next;
3444 info= (MARIA_HA*)list_element->data;
3445 mysql_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */
3446 /*
3447 Tables which we see here are exactly those which were open at time of
3448 crash. They might have open_count>0 as Checkpoint maybe flushed their
3449 state while they were used. As Recovery corrected them, don't alarm the
3450 user, don't ask for a table check:
3451 */
3452 if (info->s->state.open_count != 0)
3453 {
3454 /* let maria_close() mark the table properly closed */
3455 info->s->state.open_count= 1;
3456 info->s->global_changed= 1;
3457 info->s->changed= 1;
3458 }
3459 prepare_table_for_close(info, addr);
3460 error|= maria_close(info);
3461 mysql_mutex_lock(&THR_LOCK_maria);
3462
3463 /* Force a crash to test recovery of recovery */
3464 if (maria_recovery_force_crash_counter)
3465 {
3466 DBUG_ASSERT(--maria_recovery_force_crash_counter > 0);
3467 }
3468 }
3469end:
3470 mysql_mutex_unlock(&THR_LOCK_maria);
3471 DBUG_RETURN(error);
3472}
3473
3474
3475/**
3476 @brief Close all table instances with a certain name which are present in
3477 all_tables.
3478
3479 @param name Name of table
3480 @param addr Log address passed to prepare_table_for_close()
3481*/
3482
3483static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr)
3484{
3485 my_bool res= 0;
3486 /* There are no other threads using the tables, so we don't need any locks */
3487 struct st_table_for_recovery *internal_table, *end;
3488 for (internal_table= all_tables, end= internal_table + SHARE_ID_MAX + 1;
3489 internal_table < end ;
3490 internal_table++)
3491 {
3492 MARIA_HA *info= internal_table->info;
3493 if ((info != NULL) && !strcmp(info->s->open_file_name.str, name))
3494 {
3495 prepare_table_for_close(info, addr);
3496 if (maria_close(info))
3497 res= 1;
3498 internal_table->info= NULL;
3499 }
3500 }
3501 return res;
3502}
3503
3504
3505/**
3506 Temporarily disables logging for this table.
3507
3508 If that makes the log incomplete, writes a LOGREC_INCOMPLETE_LOG to the log
3509 to warn log readers.
3510
3511 @param info table
3512 @param log_incomplete if that disabling makes the log incomplete
3513
3514 @note for example in the REDO phase we disable logging but that does not
3515 make the log incomplete.
3516*/
3517
3518void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
3519 my_bool log_incomplete)
3520{
3521 MARIA_SHARE *share= info->s;
3522 DBUG_ENTER("_ma_tmp_disable_logging_for_table");
3523
3524 /*
3525 We have to ensure that bitmap is flushed, as it's checking
3526 that share->now_transactional is set
3527 */
3528 if (share->now_transactional && share->data_file_type == BLOCK_RECORD)
3529 _ma_bitmap_flush_all(share);
3530
3531 if (log_incomplete)
3532 {
3533 uchar log_data[FILEID_STORE_SIZE];
3534 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
3535 LSN lsn;
3536 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
3537 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
3538 translog_write_record(&lsn, LOGREC_INCOMPLETE_LOG,
3539 &dummy_transaction_object, info,
3540 (translog_size_t) sizeof(log_data),
3541 TRANSLOG_INTERNAL_PARTS + 1, log_array,
3542 log_data, NULL);
3543 }
3544
3545 /* if we disabled before writing the record, record wouldn't reach log */
3546 share->now_transactional= FALSE;
3547
3548 /*
3549 Reset state pointers. This is needed as in ALTER table we may do
3550 commit followed by _ma_renable_logging_for_table and then
3551 info->state may point to a state that was deleted by
3552 _ma_trnman_end_trans_hook()
3553 */
3554 share->state.common= *info->state;
3555 info->state= &share->state.common;
3556 info->switched_transactional= TRUE;
3557
3558 /*
3559 Some code in ma_blockrec.c assumes a trn even if !now_transactional but in
3560 this case it only reads trn->rec_lsn, which has to be LSN_IMPOSSIBLE and
3561 should be now. info->trn may be NULL in maria_chk.
3562 */
3563 if (info->trn == NULL)
3564 info->trn= &dummy_transaction_object;
3565 DBUG_ASSERT(info->trn->rec_lsn == LSN_IMPOSSIBLE);
3566 share->page_type= PAGECACHE_PLAIN_PAGE;
3567 /* Functions below will pick up now_transactional and change callbacks */
3568 _ma_set_data_pagecache_callbacks(&info->dfile, share);
3569 _ma_set_index_pagecache_callbacks(&share->kfile, share);
3570 _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
3571 DBUG_VOID_RETURN;
3572}
3573
3574
3575/**
3576 Re-enables logging for a table which had it temporarily disabled.
3577
3578 Only the thread which disabled logging is allowed to reenable it. Indeed,
3579 re-enabling logging affects all open instances, one must have exclusive
3580 access to the table to do that. In practice, the one which disables has
3581 such access.
3582
3583 @param info table
3584 @param flush_pages if function needs to flush pages first
3585*/
3586
3587my_bool _ma_reenable_logging_for_table(MARIA_HA *info, my_bool flush_pages)
3588{
3589 MARIA_SHARE *share= info->s;
3590 DBUG_ENTER("_ma_reenable_logging_for_table");
3591
3592 if (share->now_transactional == share->base.born_transactional ||
3593 !info->switched_transactional)
3594 {
3595 info->switched_transactional= FALSE;
3596 DBUG_RETURN(0);
3597 }
3598 info->switched_transactional= FALSE;
3599
3600 if ((share->now_transactional= share->base.born_transactional))
3601 {
3602 share->page_type= PAGECACHE_LSN_PAGE;
3603
3604 /*
3605 Copy state information that where updated while the table was used
3606 in not transactional mode
3607 */
3608 _ma_copy_nontrans_state_information(info);
3609 _ma_reset_history(info->s);
3610
3611 if (flush_pages)
3612 {
3613 /* Ensure that recover is not executing any redo before this */
3614 if (!maria_in_recovery)
3615 share->state.is_of_horizon= share->state.create_rename_lsn=
3616 share->state.skip_redo_lsn= translog_get_horizon();
3617 /*
3618 We are going to change callbacks; if a page is flushed at this moment
3619 this can cause race conditions, that's one reason to flush pages
3620 now. Other reasons: a checkpoint could be running and miss pages; the
3621 pages have type PAGECACHE_PLAIN_PAGE which should not remain. As
3622 there are no REDOs for pages, them, bitmaps and the state also have to
3623 be flushed and synced.
3624 */
3625 if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
3626 FLUSH_RELEASE, FLUSH_RELEASE) ||
3627 _ma_state_info_write(share,
3628 MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
3629 MA_STATE_INFO_WRITE_LOCK) ||
3630 _ma_sync_table_files(info))
3631 DBUG_RETURN(1);
3632 }
3633 else if (!maria_in_recovery)
3634 {
3635 /*
3636 Except in Recovery, we mustn't leave dirty pages (see comments above).
3637 Note that this does not verify that the state was flushed, but hey.
3638 */
3639 pagecache_file_no_dirty_page(share->pagecache, &info->dfile);
3640 pagecache_file_no_dirty_page(share->pagecache, &share->kfile);
3641 }
3642 _ma_set_data_pagecache_callbacks(&info->dfile, share);
3643 _ma_set_index_pagecache_callbacks(&share->kfile, share);
3644 _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
3645 /*
3646 info->trn was not changed in the disable/enable combo, so that it's
3647 still usable in this kind of combination:
3648 external_lock;
3649 start_bulk_insert; # table is empty, disables logging
3650 end_bulk_insert; # enables logging
3651 start_bulk_insert; # table is not empty, logging stays
3652 # so rows insertion needs the real trn.
3653 as happens during row-based replication on the slave.
3654 */
3655 }
3656 DBUG_RETURN(0);
3657}
3658
3659
3660static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
3661{
3662 static uint end_logno= FILENO_IMPOSSIBLE, percentage_printed= 0;
3663 static ulong end_offset;
3664 static ulonglong initial_remainder= ~(ulonglong) 0;
3665
3666 uint cur_logno;
3667 ulong cur_offset;
3668 ulonglong local_remainder;
3669 uint percentage_done;
3670
3671 if (tracef == stdout)
3672 return;
3673 if (recovery_message_printed == REC_MSG_NONE)
3674 {
3675 print_preamble();
3676 fprintf(stderr, "recovered pages: 0%%");
3677 fflush(stderr);
3678 procent_printed= 1;
3679 recovery_message_printed= REC_MSG_REDO;
3680 }
3681 if (end_logno == FILENO_IMPOSSIBLE)
3682 {
3683 LSN end_addr= translog_get_horizon();
3684 end_logno= LSN_FILE_NO(end_addr);
3685 end_offset= LSN_OFFSET(end_addr);
3686 }
3687 cur_logno= LSN_FILE_NO(addr);
3688 cur_offset= LSN_OFFSET(addr);
3689 local_remainder= (cur_logno == end_logno) ? (end_offset - cur_offset) :
3690 (((longlong)log_file_size) - cur_offset +
3691 MY_MAX(end_logno - cur_logno - 1, 0) * ((longlong)log_file_size) +
3692 end_offset);
3693 if (initial_remainder == (ulonglong)(-1))
3694 initial_remainder= local_remainder;
3695 percentage_done= (uint) ((initial_remainder - local_remainder) * 100ULL /
3696 initial_remainder);
3697 if ((percentage_done - percentage_printed) >= 10)
3698 {
3699 percentage_printed= percentage_done;
3700 fprintf(stderr, " %u%%", percentage_done);
3701 fflush(stderr);
3702 procent_printed= 1;
3703 }
3704}
3705
3706
3707#ifdef MARIA_EXTERNAL_LOCKING
3708#error Marias Checkpoint and Recovery are really not ready for it
3709#endif
3710
3711/*
3712Recovery of the state : how it works
3713=====================================
3714
3715Here we ignore Checkpoints for a start.
3716
3717The state (MARIA_HA::MARIA_SHARE::MARIA_STATE_INFO) is updated in
3718memory frequently (at least at every row write/update/delete) but goes
3719to disk at few moments: maria_close() when closing the last open
3720instance, and a few rare places like CHECK/REPAIR/ALTER
3721(non-transactional tables also do it at maria_lock_database() but we
3722needn't cover them here).
3723
3724In case of crash, state on disk is likely to be older than what it was
3725in memory, the REDO phase needs to recreate the state as it was in
3726memory at the time of crash. When we say Recovery here we will always
3727mean "REDO phase".
3728
3729For example MARIA_STATUS_INFO::records (count of records). It is updated at
3730the end of every row write/update/delete/delete_all. When Recovery sees the
3731sign of such row operation (UNDO or REDO), it may need to update the records'
3732count if that count does not reflect that operation (is older). How to know
3733the age of the state compared to the log record: every time the state
3734goes to disk at runtime, its member "is_of_horizon" is updated to the
3735current end-of-log horizon. So Recovery just needs to compare is_of_horizon
3736and the record's LSN to know if it should modify "records".
3737
3738Other operations like ALTER TABLE DISABLE KEYS update the state but
3739don't write log records, thus the REDO phase cannot repeat their
3740effect on the state in case of crash. But we make them sync the state
3741as soon as they have finished. This reduces the window for a problem.
3742
3743It looks like only one thread at a time updates the state in memory or
3744on disk. We assume that the upper level (normally MySQL) has protection
3745against issuing HA_EXTRA_(FORCE_REOPEN|PREPARE_FOR_RENAME) so that these
3746are not issued while there are any running transactions on the given table.
3747If this is not done, we may write a corrupted state to disk.
3748
3749With checkpoints
3750================
3751
3752Checkpoint module needs to read the state in memory and write it to
3753disk. This may happen while some other thread is modifying the state
3754in memory or on disk. Checkpoint thus may be reading changing data, it
3755needs a mutex to not have it corrupted, and concurrent modifiers of
3756the state need that mutex too for the same reason.
3757"records" is modified for every row write/update/delete, we don't want
3758to add a mutex lock/unlock there. So we re-use the mutex lock/unlock
3759which is already present in these moments, namely the log's mutex which is
3760taken when UNDO_ROW_INSERT|UPDATE|DELETE is written: we update "records" in
3761under-log-mutex hooks when writing these records (thus "records" is
3762not updated at the end of maria_write/update/delete() anymore).
3763Thus Checkpoint takes the log's lock and can read "records" from
memory and write it to disk and release log's lock.
3765We however want to avoid having the disk write under the log's
3766lock. So it has to be under another mutex, natural choice is
3767intern_lock (as Checkpoint needs it anyway to read MARIA_SHARE::kfile,
3768and as maria_close() takes it too). All state writes to disk are
3769changed to be protected with intern_lock.
3770So Checkpoint takes intern_lock, log's lock, reads "records" from
3771memory, releases log's lock, updates is_of_horizon and writes "records" to
3772disk, release intern_lock.
3773In practice, not only "records" needs to be written but the full
3774state. So, Checkpoint reads the full state from memory. Some other
3775thread may at this moment be modifying in memory some pieces of the
state which are not protected by the log's lock (see ma_extra.c
3777HA_EXTRA_NO_KEYS), and Checkpoint would be reading a corrupted state
3778from memory; to guard against that we extend the intern_lock-zone to
3779changes done to the state in memory by HA_EXTRA_NO_KEYS et al, and
3780also any change made in memory to create_rename_lsn/state_is_of_horizon.
3781Last, we don't want in Checkpoint to do
3782 log lock; read state from memory; release log lock;
3783for each table, it may hold the log's lock too much in total.
3784So, we instead do
3785 log lock; read N states from memory; release log lock;
3786Thus, the sequence above happens outside of any intern_lock.
3787But this re-introduces the problem that some other thread may be changing the
3788state in memory and on disk under intern_lock, without log's lock, like
3789HA_EXTRA_NO_KEYS, while we read the N states. However, when Checkpoint later
3790comes to handling the table under intern_lock, which is serialized with
3791HA_EXTRA_NO_KEYS, it can see that is_of_horizon is higher then when the state
3792was read from memory under log's lock, and thus can decide to not flush the
3793obsolete state it has, knowing that the other thread flushed a more recent
3794state already. If on the other hand is_of_horizon is not higher, the read
3795state is current and can be flushed. So we have a per-table sequence:
3796 lock intern_lock; test if is_of_horizon is higher than when we read the state
3797 under log's lock; if no then flush the read state to disk.
3798*/
3799
3800/* some comments and pseudo-code which we keep for later */
3801#if 0
3802 /*
3803 MikaelR suggests: support checkpoints during REDO phase too: do checkpoint
3804 after a certain amount of log records have been executed. This helps
3805 against repeated crashes. Those checkpoints could not be user-requested
3806 (as engine is not communicating during the REDO phase), so they would be
3807 automatic: this changes the original assumption that we don't write to the
3808 log while in the REDO phase, but why not. How often should we checkpoint?
3809 */
3810
3811 /*
3812 We want to have two steps:
3813 engine->recover_with_max_memory();
3814 next_engine->recover_with_max_memory();
3815 engine->init_with_normal_memory();
3816 next_engine->init_with_normal_memory();
3817 So: in recover_with_max_memory() allocate a giant page cache, do REDO
3818 phase, then all page cache is flushed and emptied and freed (only retain
3819 small structures like TM): take full checkpoint, which is useful if
3820 next engine crashes in its recovery the next second.
3821 Destroy all shares (maria_close()), then at init_with_normal_memory() we
3822 do this:
3823 */
3824
3825 /**** UNDO PHASE *****/
3826
3827 /*
3828 Launch one or more threads to do the background rollback. Don't wait for
3829 them to complete their rollback (background rollback; for debugging, we
3830 can have an option which waits). Set a counter (total_of_rollback_threads)
    to the number of threads to launch.
3832
3833 Note that InnoDB's rollback-in-background works as long as InnoDB is the
3834 last engine to recover, otherwise MySQL will refuse new connections until
3835 the last engine has recovered so it's not "background" from the user's
3836 point of view. InnoDB is near top of sys_table_types so all others
3837 (e.g. BDB) recover after it... So it's really "online rollback" only if
3838 InnoDB is the only engine.
3839 */
3840
3841 /* wake up delete/update handler */
3842 /* tell the TM that it can now accept new transactions */
3843
3844 /*
3845 mark that checkpoint requests are now allowed.
3846 */
3847#endif
3848