1/*****************************************************************************
2
3Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2012, Facebook Inc.
5Copyright (c) 2013, 2018, MariaDB Corporation.
6
7This program is free software; you can redistribute it and/or modify it under
8the terms of the GNU General Public License as published by the Free Software
9Foundation; version 2 of the License.
10
11This program is distributed in the hope that it will be useful, but WITHOUT
12ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License along with
16this program; if not, write to the Free Software Foundation, Inc.,
1751 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
18
19*****************************************************************************/
20
21/**************************************************//**
22@file log/log0recv.cc
23Recovery
24
25Created 9/20/1997 Heikki Tuuri
26*******************************************************/
27
28#include "ha_prototypes.h"
29
30#include <vector>
31#include <map>
32#include <string>
33#include <my_service_manager.h>
34
35#include "log0recv.h"
36
37#ifdef HAVE_MY_AES_H
38#include <my_aes.h>
39#endif
40
41#include "log0crypt.h"
42#include "mem0mem.h"
43#include "buf0buf.h"
44#include "buf0flu.h"
45#include "mtr0mtr.h"
46#include "mtr0log.h"
47#include "page0cur.h"
48#include "page0zip.h"
49#include "btr0btr.h"
50#include "btr0cur.h"
51#include "ibuf0ibuf.h"
52#include "trx0undo.h"
53#include "trx0rec.h"
54#include "fil0fil.h"
55#include "fsp0sysspace.h"
56#include "ut0new.h"
57#include "row0trunc.h"
58#include "buf0rea.h"
59#include "srv0srv.h"
60#include "srv0start.h"
61#include "trx0roll.h"
62#include "row0merge.h"
63
/** Log records are stored in the hash table in chunks at most of this size;
this must be less than srv_page_size as it is stored in the buffer pool.
The limit is MEM_MAX_ALLOC_IN_BUF minus the size of one recv_data_t. */
#define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t))

/** Read-ahead area in applying log records to file pages */
#define RECV_READ_AHEAD_AREA 32

/** The recovery system; allocated by recv_sys_init() and released and
reset to NULL by recv_sys_close() */
recv_sys_t* recv_sys;
/** TRUE when applying redo log records during crash recovery; FALSE
otherwise. Note that this is FALSE while a background thread is
rolling back incomplete transactions.
recv_writer_thread() leaves its loop once it reads FALSE here. */
volatile bool recv_recovery_on;

/** TRUE when recv_init_crash_recovery() has been called.
Reset to FALSE by recv_sys_var_init(). */
bool recv_needed_recovery;
#ifdef UNIV_DEBUG
/** TRUE if writing to the redo log (mtr_commit) is forbidden.
Protected by log_sys.mutex. */
bool recv_no_log_write = false;
#endif /* UNIV_DEBUG */

/** TRUE if buf_page_is_corrupted() should check if the log sequence
number (FIL_PAGE_LSN) is in the future. Initially FALSE, and set by
recv_recovery_from_checkpoint_start(). */
bool recv_lsn_checks_on;

/** If the following is TRUE, the buffer pool file pages must be invalidated
after recovery and no ibuf operations are allowed; this becomes TRUE if
the log record hash table becomes too full, and log records must be merged
to file pages already before the recovery is finished: in this case no
ibuf operations are allowed, as they could modify the pages read in the
buffer pool before the pages have been recovered to the up-to-date state.

TRUE means that recovery is running and no operations on the log files
are allowed yet: the variable name is misleading. */
bool recv_no_ibuf_operations;

/** The type of the previous parsed redo log record;
recv_sys_var_init() resets it to MLOG_SINGLE_REC_FLAG */
static mlog_id_t recv_previous_parsed_rec_type;
/** The offset of the previous parsed redo log record */
static ulint recv_previous_parsed_rec_offset;
/** The 'multi' flag of the previous parsed redo log record */
static ulint recv_previous_parsed_rec_is_multi;

/** This many frames must be left free in the buffer pool when we scan
the log and store the scanned log records in the buffer pool: we will
use these free frames to read in pages when we start applying the
log records to the database.
recv_sys_var_init() sets the default value of 256. recv_sys_init()
raises it to 512 when the buffer pool size is at least 10 MB. */
ulint recv_n_pool_free_frames;

/** The maximum lsn we see for a page during the recovery process. If this
is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt. */
static lsn_t recv_max_page_lsn;

#ifdef UNIV_PFS_THREAD
/* Performance schema keys for the threads related to recovery;
recv_writer_thread() registers itself with recv_writer_thread_key. */
mysql_pfs_key_t trx_rollback_clean_thread_key;
mysql_pfs_key_t recv_writer_thread_key;
#endif /* UNIV_PFS_THREAD */

/** Is recv_writer_thread active? The thread itself clears this flag
just before it exits. */
bool recv_writer_thread_active;

#ifndef DBUG_OFF
/** Return string name of the redo log record type.
@param[in] type redo log record type
@return name of the redo log record type */
const char*
get_mlog_string(mlog_id_t type);
#endif /* !DBUG_OFF */
137
138/** Tablespace item during recovery */
139struct file_name_t {
140 /** Tablespace file name (MLOG_FILE_NAME) */
141 std::string name;
142 /** Tablespace object (NULL if not valid or not found) */
143 fil_space_t* space;
144
145 /** Tablespace status. */
146 enum fil_status {
147 /** Normal tablespace */
148 NORMAL,
149 /** Deleted tablespace */
150 DELETED,
151 /** Missing tablespace */
152 MISSING
153 };
154
155 /** Status of the tablespace */
156 fil_status status;
157
158 /** Constructor */
159 file_name_t(std::string name_, bool deleted) :
160 name(name_), space(NULL), status(deleted ? DELETED: NORMAL) {}
161};
162
/** Map of dirty tablespaces during recovery, keyed by tablespace ID */
typedef std::map<
 ulint,
 file_name_t,
 std::less<ulint>,
 ut_allocator<std::pair<const ulint, file_name_t> > > recv_spaces_t;

/** The tablespaces collected so far; fil_name_process() inserts into
this map and recv_sys_close() empties it */
static recv_spaces_t recv_spaces;

/** Backup function checks whether the space id belongs to
the skip table list given in the mariabackup option.
This file only defines the pointer; it is not assigned in the part of
the file visible here. */
bool(*check_if_backup_includes)(ulint space_id);
175
176/** Process a file name from a MLOG_FILE_* record.
177@param[in,out] name file name
178@param[in] len length of the file name
179@param[in] space_id the tablespace ID
180@param[in] deleted whether this is a MLOG_FILE_DELETE record
181@retval true if able to process file successfully.
182@retval false if unable to process the file */
183static
184bool
185fil_name_process(
186 char* name,
187 ulint len,
188 ulint space_id,
189 bool deleted)
190{
191 if (srv_operation == SRV_OPERATION_BACKUP) {
192 return true;
193 }
194
195 ut_ad(srv_operation == SRV_OPERATION_NORMAL
196 || srv_operation == SRV_OPERATION_RESTORE
197 || srv_operation == SRV_OPERATION_RESTORE_EXPORT);
198
199 bool processed = true;
200
201 /* We will also insert space=NULL into the map, so that
202 further checks can ensure that a MLOG_FILE_NAME record was
203 scanned before applying any page records for the space_id. */
204
205 os_normalize_path(name);
206 file_name_t fname(std::string(name, len - 1), deleted);
207 std::pair<recv_spaces_t::iterator,bool> p = recv_spaces.insert(
208 std::make_pair(space_id, fname));
209 ut_ad(p.first->first == space_id);
210
211 file_name_t& f = p.first->second;
212
213 if (deleted) {
214 /* Got MLOG_FILE_DELETE */
215
216 if (!p.second && f.status != file_name_t::DELETED) {
217 f.status = file_name_t::DELETED;
218 if (f.space != NULL) {
219 fil_space_free(space_id, false);
220 f.space = NULL;
221 }
222 }
223
224 ut_ad(f.space == NULL);
225 } else if (p.second // the first MLOG_FILE_NAME or MLOG_FILE_RENAME2
226 || f.name != fname.name) {
227 fil_space_t* space;
228
229 /* Check if the tablespace file exists and contains
230 the space_id. If not, ignore the file after displaying
231 a note. Abort if there are multiple files with the
232 same space_id. */
233 switch (fil_ibd_load(space_id, name, space)) {
234 case FIL_LOAD_OK:
235 ut_ad(space != NULL);
236
237 if (f.space == NULL || f.space == space) {
238 f.name = fname.name;
239 f.space = space;
240 f.status = file_name_t::NORMAL;
241 } else {
242 ib::error() << "Tablespace " << space_id
243 << " has been found in two places: '"
244 << f.name << "' and '" << name << "'."
245 " You must delete one of them.";
246 recv_sys->found_corrupt_fs = true;
247 processed = false;
248 }
249 break;
250
251 case FIL_LOAD_ID_CHANGED:
252 ut_ad(space == NULL);
253 break;
254
255 case FIL_LOAD_NOT_FOUND:
256 /* No matching tablespace was found; maybe it
257 was renamed, and we will find a subsequent
258 MLOG_FILE_* record. */
259 ut_ad(space == NULL);
260
261 if (srv_force_recovery) {
262 /* Without innodb_force_recovery,
263 missing tablespaces will only be
264 reported in
265 recv_init_crash_recovery_spaces().
266 Enable some more diagnostics when
267 forcing recovery. */
268
269 ib::info()
270 << "At LSN: " << recv_sys->recovered_lsn
271 << ": unable to open file " << name
272 << " for tablespace " << space_id;
273 }
274 break;
275
276 case FIL_LOAD_INVALID:
277 ut_ad(space == NULL);
278 if (srv_force_recovery == 0) {
279 ib::warn() << "We do not continue the crash"
280 " recovery, because the table may"
281 " become corrupt if we cannot apply"
282 " the log records in the InnoDB log to"
283 " it. To fix the problem and start"
284 " mysqld:";
285 ib::info() << "1) If there is a permission"
286 " problem in the file and mysqld"
287 " cannot open the file, you should"
288 " modify the permissions.";
289 ib::info() << "2) If the tablespace is not"
290 " needed, or you can restore an older"
291 " version from a backup, then you can"
292 " remove the .ibd file, and use"
293 " --innodb_force_recovery=1 to force"
294 " startup without this file.";
295 ib::info() << "3) If the file system or the"
296 " disk is broken, and you cannot"
297 " remove the .ibd file, you can set"
298 " --innodb_force_recovery.";
299 recv_sys->found_corrupt_fs = true;
300 processed = false;
301 break;
302 }
303
304 ib::info() << "innodb_force_recovery was set to "
305 << srv_force_recovery << ". Continuing crash"
306 " recovery even though we cannot access the"
307 " files for tablespace " << space_id << ".";
308 break;
309 }
310 }
311 return(processed);
312}
313
314/** Parse or process a MLOG_FILE_* record.
315@param[in] ptr redo log record
316@param[in] end end of the redo log buffer
317@param[in] space_id the tablespace ID
318@param[in] first_page_no first page number in the file
319@param[in] type MLOG_FILE_NAME or MLOG_FILE_DELETE
320or MLOG_FILE_CREATE2 or MLOG_FILE_RENAME2
321@param[in] apply whether to apply the record
322@return pointer to next redo log record
323@retval NULL if this log record was truncated */
324static
325byte*
326fil_name_parse(
327 byte* ptr,
328 const byte* end,
329 ulint space_id,
330 ulint first_page_no,
331 mlog_id_t type,
332 bool apply)
333{
334 if (type == MLOG_FILE_CREATE2) {
335 if (end < ptr + 4) {
336 return(NULL);
337 }
338 ptr += 4;
339 }
340
341 if (end < ptr + 2) {
342 return(NULL);
343 }
344
345 ulint len = mach_read_from_2(ptr);
346 ptr += 2;
347 if (end < ptr + len) {
348 return(NULL);
349 }
350
351 /* MLOG_FILE_* records should only be written for
352 user-created tablespaces. The name must be long enough
353 and end in .ibd. */
354 bool corrupt = is_predefined_tablespace(space_id)
355 || first_page_no != 0 // TODO: multi-file user tablespaces
356 || len < sizeof "/a.ibd\0"
357 || memcmp(ptr + len - 5, DOT_IBD, 5) != 0
358 || memchr(ptr, OS_PATH_SEPARATOR, len) == NULL;
359
360 byte* end_ptr = ptr + len;
361
362 switch (type) {
363 default:
364 ut_ad(0); // the caller checked this
365 case MLOG_FILE_NAME:
366 if (corrupt) {
367 ib::error() << "MLOG_FILE_NAME incorrect:" << ptr;
368 recv_sys->found_corrupt_log = true;
369 break;
370 }
371
372 fil_name_process(
373 reinterpret_cast<char*>(ptr), len, space_id, false);
374 break;
375 case MLOG_FILE_DELETE:
376 if (corrupt) {
377 ib::error() << "MLOG_FILE_DELETE incorrect:" << ptr;
378 recv_sys->found_corrupt_log = true;
379 break;
380 }
381
382 fil_name_process(
383 reinterpret_cast<char*>(ptr), len, space_id, true);
384
385 break;
386 case MLOG_FILE_CREATE2:
387 break;
388 case MLOG_FILE_RENAME2:
389 if (corrupt) {
390 ib::error() << "MLOG_FILE_RENAME2 incorrect:" << ptr;
391 recv_sys->found_corrupt_log = true;
392 }
393
394 /* The new name follows the old name. */
395 byte* new_name = end_ptr + 2;
396 if (end < new_name) {
397 return(NULL);
398 }
399
400 ulint new_len = mach_read_from_2(end_ptr);
401
402 if (end < end_ptr + 2 + new_len) {
403 return(NULL);
404 }
405
406 end_ptr += 2 + new_len;
407
408 corrupt = corrupt
409 || new_len < sizeof "/a.ibd\0"
410 || memcmp(new_name + new_len - 5, DOT_IBD, 5) != 0
411 || !memchr(new_name, OS_PATH_SEPARATOR, new_len);
412
413 if (corrupt) {
414 ib::error() << "MLOG_FILE_RENAME2 new_name incorrect:" << ptr
415 << " new_name: " << new_name;
416 recv_sys->found_corrupt_log = true;
417 break;
418 }
419
420 fil_name_process(
421 reinterpret_cast<char*>(ptr), len,
422 space_id, false);
423 fil_name_process(
424 reinterpret_cast<char*>(new_name), new_len,
425 space_id, false);
426
427 if (!apply) {
428 break;
429 }
430 if (!fil_op_replay_rename(
431 space_id, first_page_no,
432 reinterpret_cast<const char*>(ptr),
433 reinterpret_cast<const char*>(new_name))) {
434 recv_sys->found_corrupt_fs = true;
435 }
436 }
437
438 return(end_ptr);
439}
440
441/** Clean up after recv_sys_init() */
442void
443recv_sys_close()
444{
445 if (recv_sys != NULL) {
446 recv_sys->dblwr.pages.clear();
447
448 if (recv_sys->addr_hash != NULL) {
449 hash_table_free(recv_sys->addr_hash);
450 }
451
452 if (recv_sys->heap != NULL) {
453 mem_heap_free(recv_sys->heap);
454 }
455
456 if (recv_sys->flush_start != NULL) {
457 os_event_destroy(recv_sys->flush_start);
458 }
459
460 if (recv_sys->flush_end != NULL) {
461 os_event_destroy(recv_sys->flush_end);
462 }
463
464 if (recv_sys->buf != NULL) {
465 ut_free_dodump(recv_sys->buf, recv_sys->buf_size);
466 }
467
468 ut_ad(!recv_writer_thread_active);
469 mutex_free(&recv_sys->writer_mutex);
470
471 mutex_free(&recv_sys->mutex);
472
473 ut_free(recv_sys);
474 recv_sys = NULL;
475 }
476
477 recv_spaces.clear();
478}
479
480/************************************************************
481Reset the state of the recovery system variables. */
482void
483recv_sys_var_init(void)
484/*===================*/
485{
486 recv_recovery_on = false;
487 recv_needed_recovery = false;
488 recv_lsn_checks_on = false;
489 recv_no_ibuf_operations = false;
490 recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
491 recv_previous_parsed_rec_offset = 0;
492 recv_previous_parsed_rec_is_multi = 0;
493 recv_n_pool_free_frames = 256;
494 recv_max_page_lsn = 0;
495}
496
497/******************************************************************//**
498recv_writer thread tasked with flushing dirty pages from the buffer
499pools.
500@return a dummy parameter */
501extern "C"
502os_thread_ret_t
503DECLARE_THREAD(recv_writer_thread)(
504/*===============================*/
505 void* arg MY_ATTRIBUTE((unused)))
506 /*!< in: a dummy parameter required by
507 os_thread_create */
508{
509 my_thread_init();
510 ut_ad(!srv_read_only_mode);
511
512#ifdef UNIV_PFS_THREAD
513 pfs_register_thread(recv_writer_thread_key);
514#endif /* UNIV_PFS_THREAD */
515
516#ifdef UNIV_DEBUG_THREAD_CREATION
517 ib::info() << "recv_writer thread running, id "
518 << os_thread_pf(os_thread_get_curr_id());
519#endif /* UNIV_DEBUG_THREAD_CREATION */
520
521 while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
522
523 /* Wait till we get a signal to clean the LRU list.
524 Bounded by max wait time of 100ms. */
525 int64_t sig_count = os_event_reset(buf_flush_event);
526 os_event_wait_time_low(buf_flush_event, 100000, sig_count);
527
528 mutex_enter(&recv_sys->writer_mutex);
529
530 if (!recv_recovery_on) {
531 mutex_exit(&recv_sys->writer_mutex);
532 break;
533 }
534
535 /* Flush pages from end of LRU if required */
536 os_event_reset(recv_sys->flush_end);
537 recv_sys->flush_type = BUF_FLUSH_LRU;
538 os_event_set(recv_sys->flush_start);
539 os_event_wait(recv_sys->flush_end);
540
541 mutex_exit(&recv_sys->writer_mutex);
542 }
543
544 recv_writer_thread_active = false;
545
546 my_thread_end();
547 /* We count the number of threads in os_thread_exit().
548 A created thread should always use that to exit and not
549 use return() to exit. */
550 os_thread_exit();
551
552 OS_THREAD_DUMMY_RETURN;
553}
554
555/** Initialize the redo log recovery subsystem. */
556void
557recv_sys_init()
558{
559 ut_ad(recv_sys == NULL);
560
561 recv_sys = static_cast<recv_sys_t*>(ut_zalloc_nokey(sizeof(*recv_sys)));
562
563 mutex_create(LATCH_ID_RECV_SYS, &recv_sys->mutex);
564 mutex_create(LATCH_ID_RECV_WRITER, &recv_sys->writer_mutex);
565
566 recv_sys->heap = mem_heap_create_typed(256, MEM_HEAP_FOR_RECV_SYS);
567
568 if (!srv_read_only_mode) {
569 recv_sys->flush_start = os_event_create(0);
570 recv_sys->flush_end = os_event_create(0);
571 }
572
573 ulint size = buf_pool_get_curr_size();
574 /* Set appropriate value of recv_n_pool_free_frames. */
575 if (size >= 10 << 20) {
576 /* Buffer pool of size greater than 10 MB. */
577 recv_n_pool_free_frames = 512;
578 }
579
580 recv_sys->buf = static_cast<byte*>(
581 ut_malloc_dontdump(RECV_PARSING_BUF_SIZE));
582 recv_sys->buf_size = RECV_PARSING_BUF_SIZE;
583
584 recv_sys->addr_hash = hash_create(size / 512);
585 recv_sys->progress_time = ut_time();
586 recv_max_page_lsn = 0;
587
588 /* Call the constructor for recv_sys_t::dblwr member */
589 new (&recv_sys->dblwr) recv_dblwr_t();
590}
591
592/** Empty a fully processed hash table. */
593static
594void
595recv_sys_empty_hash()
596{
597 ut_ad(mutex_own(&(recv_sys->mutex)));
598 ut_a(recv_sys->n_addrs == 0);
599
600 hash_table_free(recv_sys->addr_hash);
601 mem_heap_empty(recv_sys->heap);
602
603 recv_sys->addr_hash = hash_create(buf_pool_get_curr_size() / 512);
604}
605
606/********************************************************//**
607Frees the recovery system. */
608void
609recv_sys_debug_free(void)
610/*=====================*/
611{
612 mutex_enter(&(recv_sys->mutex));
613
614 hash_table_free(recv_sys->addr_hash);
615 mem_heap_free(recv_sys->heap);
616 ut_free_dodump(recv_sys->buf, recv_sys->buf_size);
617
618 recv_sys->buf_size = 0;
619 recv_sys->buf = NULL;
620 recv_sys->heap = NULL;
621 recv_sys->addr_hash = NULL;
622
623 /* wake page cleaner up to progress */
624 if (!srv_read_only_mode) {
625 ut_ad(!recv_recovery_on);
626 ut_ad(!recv_writer_thread_active);
627 os_event_reset(buf_flush_event);
628 os_event_set(recv_sys->flush_start);
629 }
630
631 mutex_exit(&(recv_sys->mutex));
632}
633
634/** Read a log segment to log_sys.buf.
635@param[in,out] start_lsn in: read area start,
636out: the last read valid lsn
637@param[in] end_lsn read area end
638@return whether no invalid blocks (e.g checksum mismatch) were found */
639bool log_t::files::read_log_seg(lsn_t* start_lsn, lsn_t end_lsn)
640{
641 ulint len;
642 bool success = true;
643 ut_ad(log_sys.mutex.is_owned());
644 ut_ad(!(*start_lsn % OS_FILE_LOG_BLOCK_SIZE));
645 ut_ad(!(end_lsn % OS_FILE_LOG_BLOCK_SIZE));
646 byte* buf = log_sys.buf;
647loop:
648 lsn_t source_offset = calc_lsn_offset(*start_lsn);
649
650 ut_a(end_lsn - *start_lsn <= ULINT_MAX);
651 len = (ulint) (end_lsn - *start_lsn);
652
653 ut_ad(len != 0);
654
655 const bool at_eof = (source_offset % file_size) + len > file_size;
656 if (at_eof) {
657 /* If the above condition is true then len (which is ulint)
658 is > the expression below, so the typecast is ok */
659 len = ulint(file_size - (source_offset % file_size));
660 }
661
662 log_sys.n_log_ios++;
663
664 MONITOR_INC(MONITOR_LOG_IO);
665
666 ut_a((source_offset >> srv_page_size_shift) <= ULINT_MAX);
667
668 const ulint page_no = ulint(source_offset >> srv_page_size_shift);
669
670 fil_io(IORequestLogRead, true,
671 page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
672 univ_page_size,
673 ulint(source_offset & (srv_page_size - 1)),
674 len, buf, NULL);
675
676 for (ulint l = 0; l < len; l += OS_FILE_LOG_BLOCK_SIZE,
677 buf += OS_FILE_LOG_BLOCK_SIZE,
678 (*start_lsn) += OS_FILE_LOG_BLOCK_SIZE) {
679 const ulint block_number = log_block_get_hdr_no(buf);
680
681 if (block_number != log_block_convert_lsn_to_no(*start_lsn)) {
682 /* Garbage or an incompletely written log block.
683 We will not report any error, because this can
684 happen when InnoDB was killed while it was
685 writing redo log. We simply treat this as an
686 abrupt end of the redo log. */
687 end_lsn = *start_lsn;
688 break;
689 }
690
691 if (innodb_log_checksums || is_encrypted()) {
692 ulint crc = log_block_calc_checksum_crc32(buf);
693 ulint cksum = log_block_get_checksum(buf);
694
695 DBUG_EXECUTE_IF("log_intermittent_checksum_mismatch", {
696 static int block_counter;
697 if (block_counter++ == 0) {
698 cksum = crc + 1;
699 }
700 });
701
702 if (crc != cksum) {
703 ib::error() << "Invalid log block checksum."
704 << " block: " << block_number
705 << " checkpoint no: "
706 << log_block_get_checkpoint_no(buf)
707 << " expected: " << crc
708 << " found: " << cksum;
709 end_lsn = *start_lsn;
710 success = false;
711 break;
712 }
713
714 if (is_encrypted()) {
715 log_crypt(buf, *start_lsn,
716 OS_FILE_LOG_BLOCK_SIZE, true);
717 }
718 }
719 }
720
721 if (recv_sys->report(ut_time())) {
722 ib::info() << "Read redo log up to LSN=" << *start_lsn;
723 service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
724 "Read redo log up to LSN=" LSN_PF,
725 *start_lsn);
726 }
727
728 if (*start_lsn != end_lsn) {
729 goto loop;
730 }
731
732 return(success);
733}
734
735
736
737/********************************************************//**
738Copies a log segment from the most up-to-date log group to the other log
739groups, so that they all contain the latest log data. Also writes the info
740about the latest checkpoint to the groups, and inits the fields in the group
741memory structs to up-to-date values. */
742static
743void
744recv_synchronize_groups()
745{
746 const lsn_t recovered_lsn = recv_sys->recovered_lsn;
747
748 /* Read the last recovered log block to the recovery system buffer:
749 the block is always incomplete */
750
751 lsn_t start_lsn = ut_uint64_align_down(recovered_lsn,
752 OS_FILE_LOG_BLOCK_SIZE);
753 log_sys.log.read_log_seg(&start_lsn,
754 start_lsn + OS_FILE_LOG_BLOCK_SIZE);
755 log_sys.log.set_fields(recovered_lsn);
756
757 /* Copy the checkpoint info to the log; remember that we have
758 incremented checkpoint_no by one, and the info will not be written
759 over the max checkpoint info, thus making the preservation of max
760 checkpoint info on disk certain */
761
762 if (!srv_read_only_mode) {
763 log_write_checkpoint_info(true, 0);
764 log_mutex_enter();
765 }
766}
767
768/** Check the consistency of a log header block.
769@param[in] log header block
770@return true if ok */
771static
772bool
773recv_check_log_header_checksum(
774 const byte* buf)
775{
776 return(log_block_get_checksum(buf)
777 == log_block_calc_checksum_crc32(buf));
778}
779
780/** Find the latest checkpoint in the format-0 log header.
781@param[out] max_field LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
782@return error code or DB_SUCCESS */
783static MY_ATTRIBUTE((warn_unused_result))
784dberr_t
785recv_find_max_checkpoint_0(ulint* max_field)
786{
787 ib_uint64_t max_no = 0;
788 ib_uint64_t checkpoint_no;
789 byte* buf = log_sys.checkpoint_buf;
790
791 ut_ad(log_sys.log.format == 0);
792
793 /** Offset of the first checkpoint checksum */
794 static const uint CHECKSUM_1 = 288;
795 /** Offset of the second checkpoint checksum */
796 static const uint CHECKSUM_2 = CHECKSUM_1 + 4;
797 /** Most significant bits of the checkpoint offset */
798 static const uint OFFSET_HIGH32 = CHECKSUM_2 + 12;
799 /** Least significant bits of the checkpoint offset */
800 static const uint OFFSET_LOW32 = 16;
801
802 bool found = false;
803
804 for (ulint field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
805 field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
806 log_header_read(field);
807
808 if (static_cast<uint32_t>(ut_fold_binary(buf, CHECKSUM_1))
809 != mach_read_from_4(buf + CHECKSUM_1)
810 || static_cast<uint32_t>(
811 ut_fold_binary(buf + LOG_CHECKPOINT_LSN,
812 CHECKSUM_2 - LOG_CHECKPOINT_LSN))
813 != mach_read_from_4(buf + CHECKSUM_2)) {
814 DBUG_LOG("ib_log",
815 "invalid pre-10.2.2 checkpoint " << field);
816 continue;
817 }
818
819 checkpoint_no = mach_read_from_8(
820 buf + LOG_CHECKPOINT_NO);
821
822 if (!log_crypt_101_read_checkpoint(buf)) {
823 ib::error() << "Decrypting checkpoint failed";
824 continue;
825 }
826
827 DBUG_PRINT("ib_log",
828 ("checkpoint " UINT64PF " at " LSN_PF " found",
829 checkpoint_no,
830 mach_read_from_8(buf + LOG_CHECKPOINT_LSN)));
831
832 if (checkpoint_no >= max_no) {
833 found = true;
834 *max_field = field;
835 max_no = checkpoint_no;
836
837 log_sys.log.state = LOG_GROUP_OK;
838
839 log_sys.log.lsn = mach_read_from_8(
840 buf + LOG_CHECKPOINT_LSN);
841 log_sys.log.lsn_offset = static_cast<ib_uint64_t>(
842 mach_read_from_4(buf + OFFSET_HIGH32)) << 32
843 | mach_read_from_4(buf + OFFSET_LOW32);
844 }
845 }
846
847 if (found) {
848 return(DB_SUCCESS);
849 }
850
851 ib::error() << "Upgrade after a crash is not supported."
852 " This redo log was created before MariaDB 10.2.2,"
853 " and we did not find a valid checkpoint."
854 " Please follow the instructions at"
855 " https://mariadb.com/kb/en/library/upgrading/";
856 return(DB_ERROR);
857}
858
859/** Determine if a pre-MySQL 5.7.9/MariaDB 10.2.2 redo log is clean.
860@param[in] lsn checkpoint LSN
861@return error code
862@retval DB_SUCCESS if the redo log is clean
863@retval DB_ERROR if the redo log is corrupted or dirty */
864static
865dberr_t
866recv_log_format_0_recover(lsn_t lsn)
867{
868 log_mutex_enter();
869 const lsn_t source_offset = log_sys.log.calc_lsn_offset(lsn);
870 log_mutex_exit();
871 const ulint page_no = ulint(source_offset >> srv_page_size_shift);
872 byte* buf = log_sys.buf;
873
874 static const char* NO_UPGRADE_RECOVERY_MSG =
875 "Upgrade after a crash is not supported."
876 " This redo log was created before MariaDB 10.2.2";
877
878 fil_io(IORequestLogRead, true,
879 page_id_t(SRV_LOG_SPACE_FIRST_ID, page_no),
880 univ_page_size,
881 ulint((source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1))
882 & (srv_page_size - 1)),
883 OS_FILE_LOG_BLOCK_SIZE, buf, NULL);
884
885 if (log_block_calc_checksum_format_0(buf)
886 != log_block_get_checksum(buf)
887 && !log_crypt_101_read_block(buf)) {
888 ib::error() << NO_UPGRADE_RECOVERY_MSG
889 << ", and it appears corrupted.";
890 return(DB_CORRUPTION);
891 }
892
893 if (log_block_get_data_len(buf)
894 != (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
895 ib::error() << NO_UPGRADE_RECOVERY_MSG << ".";
896 return(DB_ERROR);
897 }
898
899 /* Mark the redo log for upgrading. */
900 srv_log_file_size = 0;
901 recv_sys->parse_start_lsn = recv_sys->recovered_lsn
902 = recv_sys->scanned_lsn
903 = recv_sys->mlog_checkpoint_lsn = lsn;
904 log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn
905 = log_sys.lsn = log_sys.write_lsn
906 = log_sys.current_flush_lsn = log_sys.flushed_to_disk_lsn
907 = lsn;
908 log_sys.next_checkpoint_no = 0;
909 return(DB_SUCCESS);
910}
911
912/** Find the latest checkpoint in the log header.
913@param[out] max_field LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
914@return error code or DB_SUCCESS */
915dberr_t
916recv_find_max_checkpoint(ulint* max_field)
917{
918 ib_uint64_t max_no;
919 ib_uint64_t checkpoint_no;
920 ulint field;
921 byte* buf;
922
923 max_no = 0;
924 *max_field = 0;
925
926 buf = log_sys.checkpoint_buf;
927
928 log_sys.log.state = LOG_GROUP_CORRUPTED;
929
930 log_header_read(0);
931 /* Check the header page checksum. There was no
932 checksum in the first redo log format (version 0). */
933 log_sys.log.format = mach_read_from_4(buf + LOG_HEADER_FORMAT);
934 if (log_sys.log.format != LOG_HEADER_FORMAT_3_23
935 && !recv_check_log_header_checksum(buf)) {
936 ib::error() << "Invalid redo log header checksum.";
937 return(DB_CORRUPTION);
938 }
939
940 char creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR + 1];
941
942 memcpy(creator, buf + LOG_HEADER_CREATOR, sizeof creator);
943 /* Ensure that the string is NUL-terminated. */
944 creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR] = 0;
945
946 switch (log_sys.log.format) {
947 case LOG_HEADER_FORMAT_3_23:
948 return(recv_find_max_checkpoint_0(max_field));
949 case LOG_HEADER_FORMAT_10_2:
950 case LOG_HEADER_FORMAT_10_2 | LOG_HEADER_FORMAT_ENCRYPTED:
951 case LOG_HEADER_FORMAT_CURRENT:
952 case LOG_HEADER_FORMAT_CURRENT | LOG_HEADER_FORMAT_ENCRYPTED:
953 break;
954 default:
955 ib::error() << "Unsupported redo log format."
956 " The redo log was created with " << creator << ".";
957 return(DB_ERROR);
958 }
959
960 for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
961 field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
962
963 log_header_read(field);
964
965 const ulint crc32 = log_block_calc_checksum_crc32(buf);
966 const ulint cksum = log_block_get_checksum(buf);
967
968 if (crc32 != cksum) {
969 DBUG_PRINT("ib_log",
970 ("invalid checkpoint,"
971 " at " ULINTPF
972 ", checksum " ULINTPFx
973 " expected " ULINTPFx,
974 field, cksum, crc32));
975 continue;
976 }
977
978 if (log_sys.is_encrypted()
979 && !log_crypt_read_checkpoint_buf(buf)) {
980 ib::error() << "Reading checkpoint"
981 " encryption info failed.";
982 continue;
983 }
984
985 checkpoint_no = mach_read_from_8(
986 buf + LOG_CHECKPOINT_NO);
987
988 DBUG_PRINT("ib_log",
989 ("checkpoint " UINT64PF " at " LSN_PF " found",
990 checkpoint_no, mach_read_from_8(
991 buf + LOG_CHECKPOINT_LSN)));
992
993 if (checkpoint_no >= max_no) {
994 *max_field = field;
995 max_no = checkpoint_no;
996 log_sys.log.state = LOG_GROUP_OK;
997 log_sys.log.lsn = mach_read_from_8(
998 buf + LOG_CHECKPOINT_LSN);
999 log_sys.log.lsn_offset = mach_read_from_8(
1000 buf + LOG_CHECKPOINT_OFFSET);
1001 log_sys.next_checkpoint_no = checkpoint_no;
1002 }
1003 }
1004
1005 if (*max_field == 0) {
1006 /* Before 10.2.2, we could get here during database
1007 initialization if we created an ib_logfile0 file that
1008 was filled with zeroes, and were killed. After
1009 10.2.2, we would reject such a file already earlier,
1010 when checking the file header. */
1011 ib::error() << "No valid checkpoint found"
1012 " (corrupted redo log)."
1013 " You can try --innodb-force-recovery=6"
1014 " as a last resort.";
1015 return(DB_ERROR);
1016 }
1017
1018 return(DB_SUCCESS);
1019}
1020
/** Try to parse a single redo log record body, and also apply it to a
page if a buffer block is supplied.
The MLOG_FILE_* records, MLOG_INDEX_LOAD and MLOG_TRUNCATE are handled
first and return immediately. Every other record type is dispatched to
the parser of that record type; those parsers receive the page frame
(or the block) only when a block was passed in, and NULL otherwise.
If mlog_parse_index() produced an index object, it and its table object
are freed before returning.
@param[in]	type		redo log entry type
@param[in]	ptr		redo log record body
@param[in]	end_ptr		end of buffer
@param[in]	space_id	tablespace identifier
@param[in]	page_no		page number
@param[in]	apply		whether to apply the record
@param[in,out]	block		buffer block, or NULL if
a page log record should not be applied
or if it is a MLOG_FILE_ operation
@param[in,out]	mtr		mini-transaction, or NULL if
a page log record should not be applied
@return log record end, NULL if not a complete record; NULL is also
returned (with recv_sys->found_corrupt_log set) for an unknown record
type or when no file name record has been seen for the tablespace */
static
byte*
recv_parse_or_apply_log_rec_body(
	mlog_id_t	type,
	byte*		ptr,
	byte*		end_ptr,
	ulint		space_id,
	ulint		page_no,
	bool		apply,
	buf_block_t*	block,
	mtr_t*		mtr)
{
	/* block and mtr must be either both NULL or both non-NULL. */
	ut_ad(!block == !mtr);
	ut_ad(!apply || recv_sys->mlog_checkpoint_lsn != 0);

	/* Record types that are not dispatched through the page-level
	parsers below. */
	switch (type) {
	case MLOG_FILE_NAME:
	case MLOG_FILE_DELETE:
	case MLOG_FILE_CREATE2:
	case MLOG_FILE_RENAME2:
		ut_ad(block == NULL);
		/* Collect the file names when parsing the log,
		before applying any log records. */
		return(fil_name_parse(ptr, end_ptr, space_id, page_no, type,
				      apply));
	case MLOG_INDEX_LOAD:
		/* The body is a fixed 8 bytes; it is only skipped here. */
		if (end_ptr < ptr + 8) {
			return(NULL);
		}
		return(ptr + 8);
	case MLOG_TRUNCATE:
		return(truncate_t::parse_redo_entry(ptr, end_ptr, space_id));

	default:
		break;
	}

	/* Set by mlog_parse_index() for the record types that carry
	index information; freed at the end of this function. */
	dict_index_t*	index	= NULL;
	page_t*		page;
	page_zip_des_t*	page_zip;
#ifdef UNIV_DEBUG
	/* Only used by the debug assertions below. */
	ulint		page_type;
#endif /* UNIV_DEBUG */

	if (block) {
		/* Applying a page log record. */
		ut_ad(apply);
		page = block->frame;
		page_zip = buf_block_get_page_zip(block);
		ut_d(page_type = fil_page_get_type(page));
	} else if (apply
		   && !is_predefined_tablespace(space_id)
		   && recv_spaces.find(space_id) == recv_spaces.end()) {
		/* The tablespace is not predefined and has no entry
		in recv_spaces. */
		if (recv_sys->recovered_lsn < recv_sys->mlog_checkpoint_lsn) {
			/* We have not seen all records between the
			checkpoint and MLOG_CHECKPOINT. There should be
			a MLOG_FILE_DELETE for this tablespace later. */
			recv_spaces.insert(
				std::make_pair(space_id,
					       file_name_t("", false)));
			goto parse_log;
		}

		ib::error() << "Missing MLOG_FILE_NAME or MLOG_FILE_DELETE"
			" for redo log record " << type << " (page "
			<< space_id << ":" << page_no << ") at "
			<< recv_sys->recovered_lsn << ".";
		recv_sys->found_corrupt_log = true;
		return(NULL);
	} else {
parse_log:
		/* Parsing a page log record. */
		page = NULL;
		page_zip = NULL;
		ut_d(page_type = FIL_PAGE_TYPE_ALLOCATED);
	}

	/* Remember the start of the body: MLOG_4BYTES needs to re-read
	the page offset after mlog_parse_nbytes() has advanced ptr. */
	const byte*	old_ptr = ptr;

	switch (type) {
#ifdef UNIV_LOG_LSN_DEBUG
	case MLOG_LSN:
		/* The LSN is checked in recv_parse_log_rec(). */
		break;
#endif /* UNIV_LOG_LSN_DEBUG */
	case MLOG_1BYTE: case MLOG_2BYTES: case MLOG_4BYTES: case MLOG_8BYTES:
#ifdef UNIV_DEBUG
		if (page && page_type == FIL_PAGE_TYPE_ALLOCATED
		    && end_ptr >= ptr + 2) {
			/* It is OK to set FIL_PAGE_TYPE and certain
			list node fields on an empty page.  Any other
			write is not OK. */

			/* NOTE: There may be bogus assertion failures for
			dict_hdr_create(), trx_rseg_header_create(),
			trx_sys_create_doublewrite_buf(), and
			trx_sysf_create().
			These are only called during database creation. */
			ulint	offs = mach_read_from_2(ptr);

			switch (type) {
			default:
				/* MLOG_1BYTE and MLOG_8BYTES end up here. */
				ut_error;
			case MLOG_2BYTES:
				/* Note that this can fail when the
				redo log has been written with something
				older than InnoDB Plugin 1.0.4. */
				ut_ad(offs == FIL_PAGE_TYPE
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + FIL_ADDR_SIZE
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_OFFSET
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + 0 /*FLST_PREV*/
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_BYTE
				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
				break;
			case MLOG_4BYTES:
				/* Note that this can fail when the
				redo log has been written with something
				older than InnoDB Plugin 1.0.4. */
				ut_ad(0
				      /* fil_crypt_rotate_page() writes this */
				      || offs == FIL_PAGE_SPACE_ID
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_SPACE
				      || offs == IBUF_TREE_SEG_HEADER
				      + IBUF_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER/* flst_init */
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      || offs == PAGE_BTR_IBUF_FREE_LIST
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + FIL_ADDR_SIZE
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_SEG_LEAF
				      + PAGE_HEADER + FSEG_HDR_SPACE
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_PAGE_NO
				      || offs == PAGE_BTR_SEG_TOP
				      + PAGE_HEADER + FSEG_HDR_SPACE
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + 0 /*FLST_PREV*/
				      || offs == PAGE_BTR_IBUF_FREE_LIST_NODE
				      + PAGE_HEADER + FIL_ADDR_PAGE
				      + FIL_ADDR_SIZE /*FLST_NEXT*/);
				break;
			}
		}
#endif /* UNIV_DEBUG */
		ptr = mlog_parse_nbytes(type, ptr, end_ptr, page, page_zip);
		/* When a 4-byte write was applied to page 0, copy selected
		tablespace header fields from the page to the in-memory
		fil_space_t object. */
		if (ptr != NULL && page != NULL
		    && page_no == 0 && type == MLOG_4BYTES) {
			ulint	offs = mach_read_from_2(old_ptr);
			switch (offs) {
				fil_space_t*	space;
				ulint		val;
			default:
				break;
			case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
			case FSP_HEADER_OFFSET + FSP_SIZE:
			case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
			case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
				space = fil_space_get(space_id);
				ut_a(space != NULL);
				/* Read back the value that was just
				written to the page. */
				val = mach_read_from_4(page + offs);

				switch (offs) {
				case FSP_HEADER_OFFSET + FSP_SPACE_FLAGS:
					space->flags = val;
					break;
				case FSP_HEADER_OFFSET + FSP_SIZE:
					space->size_in_header = val;
					break;
				case FSP_HEADER_OFFSET + FSP_FREE_LIMIT:
					space->free_limit = val;
					break;
				case FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN:
					space->free_len = val;
					ut_ad(val == flst_get_len(
						      page + offs));
					break;
				}
			}
		}
		break;
	case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_INSERT,
				     &index))) {
			/* The row format of the page must match the one
			described by the parsed index information. */
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_cur_parse_insert_rec(FALSE, ptr, end_ptr,
							block, index, mtr);
		}
		break;
	case MLOG_REC_CLUST_DELETE_MARK: case MLOG_COMP_REC_CLUST_DELETE_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_CLUST_DELETE_MARK,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_cur_parse_del_mark_set_clust_rec(
				ptr, end_ptr, page, page_zip, index);
		}
		break;
	case MLOG_REC_SEC_DELETE_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = btr_cur_parse_del_mark_set_sec_rec(ptr, end_ptr,
							 page, page_zip);
		break;
	case MLOG_REC_UPDATE_IN_PLACE: case MLOG_COMP_REC_UPDATE_IN_PLACE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_UPDATE_IN_PLACE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_cur_parse_update_in_place(ptr, end_ptr, page,
							    page_zip, index);
		}
		break;
	case MLOG_LIST_END_DELETE: case MLOG_COMP_LIST_END_DELETE:
	case MLOG_LIST_START_DELETE: case MLOG_COMP_LIST_START_DELETE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_LIST_END_DELETE
				     || type == MLOG_COMP_LIST_START_DELETE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_parse_delete_rec_list(type, ptr, end_ptr,
							 block, index, mtr);
		}
		break;
	case MLOG_LIST_END_COPY_CREATED: case MLOG_COMP_LIST_END_COPY_CREATED:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_LIST_END_COPY_CREATED,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_parse_copy_rec_list_to_created_page(
				ptr, end_ptr, block, index, mtr);
		}
		break;
	case MLOG_PAGE_REORGANIZE:
	case MLOG_COMP_PAGE_REORGANIZE:
	case MLOG_ZIP_PAGE_REORGANIZE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		/* Every type except MLOG_PAGE_REORGANIZE is parsed as
		the compact format. */
		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type != MLOG_PAGE_REORGANIZE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = btr_parse_page_reorganize(
				ptr, end_ptr, index,
				type == MLOG_ZIP_PAGE_REORGANIZE,
				block, mtr);
		}
		break;
	case MLOG_PAGE_CREATE: case MLOG_COMP_PAGE_CREATE:
		/* Allow anything in page_type when creating a page. */
		ut_a(!page_zip);
		/* No record body is consumed here: ptr is left as is. */
		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE, false);
		break;
	case MLOG_PAGE_CREATE_RTREE: case MLOG_COMP_PAGE_CREATE_RTREE:
		/* As above, but with the last argument set to true;
		ptr is left as is. */
		page_parse_create(block, type == MLOG_COMP_PAGE_CREATE_RTREE,
				  true);
		break;
	case MLOG_UNDO_INSERT:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_add_undo_rec(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_ERASE_END:
		/* No record body is consumed here: ptr is left as is. */
		if (page) {
			ut_ad(page_type == FIL_PAGE_UNDO_LOG);
			trx_undo_erase_page_end(page);
		}
		break;
	case MLOG_UNDO_INIT:
		/* Allow anything in page_type when creating a page. */
		ptr = trx_undo_parse_page_init(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_HDR_REUSE:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_page_header_reuse(ptr, end_ptr, page);
		break;
	case MLOG_UNDO_HDR_CREATE:
		ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
		ptr = trx_undo_parse_page_header(ptr, end_ptr, page, mtr);
		break;
	case MLOG_REC_MIN_MARK: case MLOG_COMP_REC_MIN_MARK:
		ut_ad(!page || fil_page_type_is_index(page_type));
		/* On a compressed page, MLOG_COMP_REC_MIN_MARK
		will be followed by MLOG_COMP_REC_DELETE
		or MLOG_ZIP_WRITE_HEADER(FIL_PAGE_PREV, FIL_NULL)
		in the same mini-transaction. */
		ut_a(type == MLOG_COMP_REC_MIN_MARK || !page_zip);
		ptr = btr_parse_set_min_rec_mark(
			ptr, end_ptr, type == MLOG_COMP_REC_MIN_MARK,
			page, mtr);
		break;
	case MLOG_REC_DELETE: case MLOG_COMP_REC_DELETE:
		ut_ad(!page || fil_page_type_is_index(page_type));

		if (NULL != (ptr = mlog_parse_index(
				     ptr, end_ptr,
				     type == MLOG_COMP_REC_DELETE,
				     &index))) {
			ut_a(!page
			     || (ibool)!!page_is_comp(page)
			     == dict_table_is_comp(index->table));
			ptr = page_cur_parse_delete_rec(ptr, end_ptr,
							block, index, mtr);
		}
		break;
	case MLOG_IBUF_BITMAP_INIT:
		/* Allow anything in page_type when creating a page. */
		ptr = ibuf_parse_bitmap_init(ptr, end_ptr, block, mtr);
		break;
	case MLOG_INIT_FILE_PAGE2:
		/* Allow anything in page_type when creating a page. */
		ptr = fsp_parse_init_file_page(ptr, end_ptr, block);
		break;
	case MLOG_WRITE_STRING:
		ptr = mlog_parse_string(ptr, end_ptr, page, page_zip);
		break;
	case MLOG_ZIP_WRITE_NODE_PTR:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_node_ptr(ptr, end_ptr,
						    page, page_zip);
		break;
	case MLOG_ZIP_WRITE_BLOB_PTR:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_blob_ptr(ptr, end_ptr,
						    page, page_zip);
		break;
	case MLOG_ZIP_WRITE_HEADER:
		ut_ad(!page || fil_page_type_is_index(page_type));
		ptr = page_zip_parse_write_header(ptr, end_ptr,
						  page, page_zip);
		break;
	case MLOG_ZIP_PAGE_COMPRESS:
		/* Allow anything in page_type when creating a page. */
		ptr = page_zip_parse_compress(ptr, end_ptr,
					      page, page_zip);
		break;
	case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
		if (NULL != (ptr = mlog_parse_index(
				ptr, end_ptr, TRUE, &index))) {

			ut_a(!page || ((ibool)!!page_is_comp(page)
				== dict_table_is_comp(index->table)));
			ptr = page_zip_parse_compress_no_data(
				ptr, end_ptr, page, page_zip, index);
		}
		break;
	case MLOG_ZIP_WRITE_TRX_ID:
		/* This must be a clustered index leaf page. */
		ut_ad(!page || page_type == FIL_PAGE_INDEX);
		ptr = page_zip_parse_write_trx_id(ptr, end_ptr,
						  page, page_zip);
		break;
	case MLOG_FILE_WRITE_CRYPT_DATA:
		/* NOTE(review): err is read below without being
		initialized here; this relies on
		fil_parse_write_crypt_data() always assigning it -
		confirm against that function. */
		dberr_t err;
		ptr = const_cast<byte*>(fil_parse_write_crypt_data(ptr, end_ptr, &err));

		if (err != DB_SUCCESS) {
			recv_sys->found_corrupt_log = TRUE;
		}
		break;
	default:
		/* Unknown record type: report it and flag the log as
		corrupted. */
		ptr = NULL;
		ib::error() << "Incorrect log record type:" << type;

		recv_sys->found_corrupt_log = true;
	}

	/* Release the index and table objects that
	mlog_parse_index() may have handed back. */
	if (index) {
		dict_table_t*	table = index->table;

		dict_mem_index_free(index);
		dict_mem_table_free(table);
	}

	return(ptr);
}
1455
1456/*********************************************************************//**
1457Calculates the fold value of a page file address: used in inserting or
1458searching for a log record in the hash table.
1459@return folded value */
1460UNIV_INLINE
1461ulint
1462recv_fold(
1463/*======*/
1464 ulint space, /*!< in: space */
1465 ulint page_no)/*!< in: page number */
1466{
1467 return(ut_fold_ulint_pair(space, page_no));
1468}
1469
1470/*********************************************************************//**
1471Calculates the hash value of a page file address: used in inserting or
1472searching for a log record in the hash table.
1473@return folded value */
1474UNIV_INLINE
1475ulint
1476recv_hash(
1477/*======*/
1478 ulint space, /*!< in: space */
1479 ulint page_no)/*!< in: page number */
1480{
1481 return(hash_calc_hash(recv_fold(space, page_no), recv_sys->addr_hash));
1482}
1483
1484/*********************************************************************//**
1485Gets the hashed file address struct for a page.
1486@return file address struct, NULL if not found from the hash table */
1487static
1488recv_addr_t*
1489recv_get_fil_addr_struct(
1490/*=====================*/
1491 ulint space, /*!< in: space id */
1492 ulint page_no)/*!< in: page number */
1493{
1494 recv_addr_t* recv_addr;
1495
1496 for (recv_addr = static_cast<recv_addr_t*>(
1497 HASH_GET_FIRST(recv_sys->addr_hash,
1498 recv_hash(space, page_no)));
1499 recv_addr != 0;
1500 recv_addr = static_cast<recv_addr_t*>(
1501 HASH_GET_NEXT(addr_hash, recv_addr))) {
1502
1503 if (recv_addr->space == space
1504 && recv_addr->page_no == page_no) {
1505
1506 return(recv_addr);
1507 }
1508 }
1509
1510 return(NULL);
1511}
1512
1513/*******************************************************************//**
1514Adds a new log record to the hash table of log records. */
1515static
1516void
1517recv_add_to_hash_table(
1518/*===================*/
1519 mlog_id_t type, /*!< in: log record type */
1520 ulint space, /*!< in: space id */
1521 ulint page_no, /*!< in: page number */
1522 byte* body, /*!< in: log record body */
1523 byte* rec_end, /*!< in: log record end */
1524 lsn_t start_lsn, /*!< in: start lsn of the mtr */
1525 lsn_t end_lsn) /*!< in: end lsn of the mtr */
1526{
1527 recv_t* recv;
1528 ulint len;
1529 recv_data_t* recv_data;
1530 recv_data_t** prev_field;
1531 recv_addr_t* recv_addr;
1532
1533 ut_ad(type != MLOG_FILE_DELETE);
1534 ut_ad(type != MLOG_FILE_CREATE2);
1535 ut_ad(type != MLOG_FILE_RENAME2);
1536 ut_ad(type != MLOG_FILE_NAME);
1537 ut_ad(type != MLOG_DUMMY_RECORD);
1538 ut_ad(type != MLOG_CHECKPOINT);
1539 ut_ad(type != MLOG_INDEX_LOAD);
1540 ut_ad(type != MLOG_TRUNCATE);
1541
1542 len = ulint(rec_end - body);
1543
1544 recv = static_cast<recv_t*>(
1545 mem_heap_alloc(recv_sys->heap, sizeof(recv_t)));
1546
1547 recv->type = type;
1548 recv->len = ulint(rec_end - body);
1549 recv->start_lsn = start_lsn;
1550 recv->end_lsn = end_lsn;
1551
1552 recv_addr = recv_get_fil_addr_struct(space, page_no);
1553
1554 if (recv_addr == NULL) {
1555 recv_addr = static_cast<recv_addr_t*>(
1556 mem_heap_alloc(recv_sys->heap, sizeof(recv_addr_t)));
1557
1558 recv_addr->space = space;
1559 recv_addr->page_no = page_no;
1560 recv_addr->state = RECV_NOT_PROCESSED;
1561
1562 UT_LIST_INIT(recv_addr->rec_list, &recv_t::rec_list);
1563
1564 HASH_INSERT(recv_addr_t, addr_hash, recv_sys->addr_hash,
1565 recv_fold(space, page_no), recv_addr);
1566 recv_sys->n_addrs++;
1567#if 0
1568 fprintf(stderr, "Inserting log rec for space %lu, page %lu\n",
1569 space, page_no);
1570#endif
1571 }
1572
1573 UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
1574
1575 prev_field = &(recv->data);
1576
1577 /* Store the log record body in chunks of less than srv_page_size:
1578 recv_sys->heap grows into the buffer pool, and bigger chunks could not
1579 be allocated */
1580
1581 while (rec_end > body) {
1582
1583 len = ulint(rec_end - body);
1584
1585 if (len > RECV_DATA_BLOCK_SIZE) {
1586 len = RECV_DATA_BLOCK_SIZE;
1587 }
1588
1589 recv_data = static_cast<recv_data_t*>(
1590 mem_heap_alloc(recv_sys->heap,
1591 sizeof(recv_data_t) + len));
1592
1593 *prev_field = recv_data;
1594
1595 memcpy(recv_data + 1, body, len);
1596
1597 prev_field = &(recv_data->next);
1598
1599 body += len;
1600 }
1601
1602 *prev_field = NULL;
1603}
1604
1605/*********************************************************************//**
1606Copies the log record body from recv to buf. */
1607static
1608void
1609recv_data_copy_to_buf(
1610/*==================*/
1611 byte* buf, /*!< in: buffer of length at least recv->len */
1612 recv_t* recv) /*!< in: log record */
1613{
1614 recv_data_t* recv_data;
1615 ulint part_len;
1616 ulint len;
1617
1618 len = recv->len;
1619 recv_data = recv->data;
1620
1621 while (len > 0) {
1622 if (len > RECV_DATA_BLOCK_SIZE) {
1623 part_len = RECV_DATA_BLOCK_SIZE;
1624 } else {
1625 part_len = len;
1626 }
1627
1628 ut_memcpy(buf, ((byte*) recv_data) + sizeof(recv_data_t),
1629 part_len);
1630 buf += part_len;
1631 len -= part_len;
1632
1633 recv_data = recv_data->next;
1634 }
1635}
1636
/** Apply the hashed log records to the page, if the page lsn is less than the
lsn of a log record.
Nothing is done if recv_sys->apply_log_recs is not set, if no records
are hashed for the page, or if the page address is already in state
RECV_BEING_PROCESSED or RECV_PROCESSED. Otherwise the records are applied
in list order inside a mini-transaction in MTR_LOG_NONE mode, the LSN
fields of the page are updated after each applied record, and finally the
page address is marked RECV_PROCESSED and recv_sys->n_addrs is decremented.
@param just_read_in	whether the page recently arrived to the I/O handler
@param block		the page in the buffer pool */
void
recv_recover_page(bool just_read_in, buf_block_t* block)
{
	page_t*		page;
	page_zip_des_t*	page_zip;
	recv_addr_t*	recv_addr;
	recv_t*		recv;
	byte*		buf;
	lsn_t		start_lsn;
	lsn_t		end_lsn;
	lsn_t		page_lsn;
	lsn_t		page_newest_lsn;
	ibool		modification_to_page;
	mtr_t		mtr;

	mutex_enter(&(recv_sys->mutex));

	if (recv_sys->apply_log_recs == FALSE) {

		/* Log records should not be applied now */

		mutex_exit(&(recv_sys->mutex));

		return;
	}

	recv_addr = recv_get_fil_addr_struct(block->page.id.space(),
					     block->page.id.page_no());

	/* Nothing to do if there are no records for the page, or if
	the page is being handled or has already been handled. */
	if ((recv_addr == NULL)
	    || (recv_addr->state == RECV_BEING_PROCESSED)
	    || (recv_addr->state == RECV_PROCESSED)) {
		ut_ad(recv_addr == NULL || recv_needed_recovery);

		mutex_exit(&(recv_sys->mutex));

		return;
	}

	ut_ad(recv_needed_recovery);

	DBUG_LOG("ib_log", "Applying log to page " << block->page.id);

	/* Claim the page address while still holding recv_sys->mutex;
	the records are applied after the mutex has been released. */
	recv_addr->state = RECV_BEING_PROCESSED;

	mutex_exit(&(recv_sys->mutex));

	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, MTR_LOG_NONE);

	page = block->frame;
	page_zip = buf_block_get_page_zip(block);

	if (just_read_in) {
		/* Move the ownership of the x-latch on the page to
		this OS thread, so that we can acquire a second
		x-latch on it.  This is needed for the operations to
		the page to pass the debug checks. */

		rw_lock_x_lock_move_ownership(&block->lock);
	}

	ibool	success = buf_page_get_known_nowait(
		RW_X_LATCH, block, BUF_KEEP_OLD,
		__FILE__, __LINE__, &mtr);
	ut_a(success);

	buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);

	/* Read the newest modification lsn from the page */
	page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);

	/* It may be that the page has been modified in the buffer
	pool: read the newest modification lsn there */

	page_newest_lsn = buf_page_get_newest_modification(&block->page);

	if (page_newest_lsn) {

		page_lsn = page_newest_lsn;
	}

	modification_to_page = FALSE;
	start_lsn = end_lsn = 0;

	recv = UT_LIST_GET_FIRST(recv_addr->rec_list);
	/* NOTE(review): the result of fil_space_acquire() is
	dereferenced below without a NULL check - confirm that it cannot
	fail for a page that is being recovered. */
	fil_space_t*	space = fil_space_acquire(block->page.id.space());

	while (recv) {
		/* Function-level end_lsn: after the loop it holds the
		end_lsn of the last record in the list, whether or not
		that record was applied. */
		end_lsn = recv->end_lsn;

		ut_ad(end_lsn <= log_sys.log.scanned_lsn);

		if (recv->len > RECV_DATA_BLOCK_SIZE) {
			/* We have to copy the record body to a separate
			buffer */

			buf = static_cast<byte*>(ut_malloc_nokey(recv->len));

			recv_data_copy_to_buf(buf, recv);
		} else {
			/* The body fits in one chunk: use it in place,
			skipping the chunk header. */
			buf = ((byte*)(recv->data)) + sizeof(recv_data_t);
		}

		/* If per-table tablespace was truncated and there exist REDO
		records before truncate that are to be applied as part of
		recovery (checkpoint didn't happen since truncate was done)
		skip such records using lsn check as they may not stand valid
		post truncate.
		LSN at start of truncate is recorded and any redo record
		with LSN less than recorded LSN is skipped.
		Note: We can't skip complete recv_addr as same page may have
		valid REDO records post truncate those needs to be applied. */

		/* Ignore applying the redo logs for tablespace that is
		truncated. Post recovery there is fixup action that will
		restore the tablespace back to normal state.
		Applying redo at this stage can result in error given that
		redo will have action recorded on page before tablespace
		was re-inited and that would lead to an error while applying
		such action. */
		if (recv->start_lsn >= page_lsn
		    && !srv_is_tablespace_truncated(space->id)
		    && !(srv_was_tablespace_truncated(space)
			 && recv->start_lsn
			 < truncate_t::get_truncated_tablespace_init_lsn(
				 space->id))) {

			/* This declaration shadows the function-level
			end_lsn. The value computed here is only written
			to the LSN fields of the page; the outer end_lsn
			(recv->end_lsn) is what is passed to
			buf_flush_recv_note_modification() below. */
			lsn_t	end_lsn;

			if (!modification_to_page) {

				/* Remember the start LSN of the first
				record that is applied. */
				modification_to_page = TRUE;
				start_lsn = recv->start_lsn;
			}

			DBUG_LOG("ib_log", "apply " << recv->start_lsn << ": "
				 << get_mlog_string(recv->type)
				 << " len " << recv->len
				 << " page " << block->page.id);

			/* The return value (end of the parsed record)
			is not checked here. */
			recv_parse_or_apply_log_rec_body(
				recv->type, buf, buf + recv->len,
				block->page.id.space(),
				block->page.id.page_no(),
				true, block, &mtr);

			/* Stamp the page header and trailer (and the
			compressed page, if any) with the new LSN. */
			end_lsn = recv->start_lsn + recv->len;
			mach_write_to_8(FIL_PAGE_LSN + page, end_lsn);
			mach_write_to_8(srv_page_size
					- FIL_PAGE_END_LSN_OLD_CHKSUM
					+ page, end_lsn);

			if (page_zip) {
				mach_write_to_8(FIL_PAGE_LSN
						+ page_zip->data, end_lsn);
			}
		}

		/* Free the temporary buffer if one was allocated above. */
		if (recv->len > RECV_DATA_BLOCK_SIZE) {
			ut_free(buf);
		}

		recv = UT_LIST_GET_NEXT(rec_list, recv);
	}

	space->release();

#ifdef UNIV_ZIP_DEBUG
	if (fil_page_index_page_check(page)) {
		page_zip_des_t*	page_zip = buf_block_get_page_zip(block);

		ut_a(!page_zip
		     || page_zip_validate_low(page_zip, page, NULL, FALSE));
	}
#endif /* UNIV_ZIP_DEBUG */

	if (modification_to_page) {
		/* block has already been dereferenced above, so this
		assertion cannot fail at this point. */
		ut_a(block);

		log_flush_order_mutex_enter();
		buf_flush_recv_note_modification(block, start_lsn, end_lsn);
		log_flush_order_mutex_exit();
	}

	/* Make sure that committing mtr does not change the modification
	lsn values of page */

	mtr.discard_modifications();

	mtr_commit(&mtr);

	/* Take the timestamp before acquiring recv_sys->mutex. */
	ib_time_t time = ut_time();

	mutex_enter(&recv_sys->mutex);

	/* page_lsn is the LSN that the page had before any record was
	applied in this call. */
	if (recv_max_page_lsn < page_lsn) {
		recv_max_page_lsn = page_lsn;
	}

	recv_addr->state = RECV_PROCESSED;

	ut_a(recv_sys->n_addrs > 0);
	/* Report progress while pages remain, when recv_sys->report()
	allows it. */
	if (ulint n = --recv_sys->n_addrs) {
		if (recv_sys->report(time)) {
			ib::info() << "To recover: " << n << " pages from log";
			service_manager_extend_timeout(
				INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n);
		}
	}

	mutex_exit(&recv_sys->mutex);
}
1854
1855/** Reads in pages which have hashed log records, from an area around a given
1856page number.
1857@param[in] page_id page id
1858@return number of pages found */
1859static
1860ulint
1861recv_read_in_area(
1862 const page_id_t& page_id)
1863{
1864 recv_addr_t* recv_addr;
1865 ulint page_nos[RECV_READ_AHEAD_AREA];
1866 ulint low_limit;
1867 ulint n;
1868
1869 low_limit = page_id.page_no()
1870 - (page_id.page_no() % RECV_READ_AHEAD_AREA);
1871
1872 n = 0;
1873
1874 for (ulint page_no = low_limit;
1875 page_no < low_limit + RECV_READ_AHEAD_AREA;
1876 page_no++) {
1877
1878 recv_addr = recv_get_fil_addr_struct(page_id.space(), page_no);
1879
1880 const page_id_t cur_page_id(page_id.space(), page_no);
1881
1882 if (recv_addr && !buf_page_peek(cur_page_id)) {
1883
1884 mutex_enter(&(recv_sys->mutex));
1885
1886 if (recv_addr->state == RECV_NOT_PROCESSED) {
1887 recv_addr->state = RECV_BEING_READ;
1888
1889 page_nos[n] = page_no;
1890
1891 n++;
1892 }
1893
1894 mutex_exit(&(recv_sys->mutex));
1895 }
1896 }
1897
1898 buf_read_recv_pages(FALSE, page_id.space(), page_nos, n);
1899 return(n);
1900}
1901
/** Apply the hash table of stored log records to persistent data pages.
The function first waits until no other batch is running, then walks
every cell of recv_sys->addr_hash: page addresses in state
RECV_NOT_PROCESSED are either recovered directly (if buf_page_peek()
holds for the page) or scheduled for reading via recv_read_in_area().
It then waits until recv_sys->n_addrs drops to zero. If this is not
the last batch, the buffer pool is flushed and invalidated. Finally the
hash table is emptied.
If recv_sys->found_corrupt_log is observed during either wait, the
function returns early with recv_sys->mutex released; in the second wait
this leaves recv_sys->apply_batch_on set.
By the debug assertion below, the caller owns the log mutex exactly when
last_batch is false.
@param[in]	last_batch	whether the change buffer merge will be
				performed as part of the operation */
void
recv_apply_hashed_log_recs(bool last_batch)
{
	ut_ad(srv_operation == SRV_OPERATION_NORMAL
	      || srv_operation == SRV_OPERATION_RESTORE
	      || srv_operation == SRV_OPERATION_RESTORE_EXPORT);

	mutex_enter(&recv_sys->mutex);

	/* Wait for a batch that is already in progress to finish,
	polling with recv_sys->mutex released while sleeping. */
	while (recv_sys->apply_batch_on) {
		bool abort = recv_sys->found_corrupt_log;
		mutex_exit(&recv_sys->mutex);

		if (abort) {
			return;
		}

		os_thread_sleep(500000);
		mutex_enter(&recv_sys->mutex);
	}

	ut_ad(!last_batch == log_mutex_own());

	/* The flag is set unless this is the last batch of a
	SRV_OPERATION_NORMAL run. */
	recv_no_ibuf_operations = !last_batch
		|| srv_operation == SRV_OPERATION_RESTORE
		|| srv_operation == SRV_OPERATION_RESTORE_EXPORT;

	ut_d(recv_no_log_write = recv_no_ibuf_operations);

	if (ulint n = recv_sys->n_addrs) {
		const char* msg = last_batch
			? "Starting final batch to recover "
			: "Starting a batch to recover ";
		ib::info() << msg << n << " pages from redo log.";
		sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log",
			   msg, n);
	}
	recv_sys->apply_log_recs = TRUE;
	recv_sys->apply_batch_on = TRUE;

	/* Walk every hash cell and every address entry chained to it. */
	for (ulint i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) {
		for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
			     HASH_GET_FIRST(recv_sys->addr_hash, i));
		     recv_addr;
		     recv_addr = static_cast<recv_addr_t*>(
				HASH_GET_NEXT(addr_hash, recv_addr))) {

			if (srv_is_tablespace_truncated(recv_addr->space)) {
				/* Avoid applying REDO log for the tablespace
				that is scheduled for TRUNCATE. */
				ut_a(recv_sys->n_addrs);
				recv_addr->state = RECV_DISCARDED;
				recv_sys->n_addrs--;
				continue;
			}

			/* Entries that were already discarded only need
			to be removed from the count of pending pages. */
			if (recv_addr->state == RECV_DISCARDED) {
				ut_a(recv_sys->n_addrs);
				recv_sys->n_addrs--;
				continue;
			}

			const page_id_t		page_id(recv_addr->space,
							recv_addr->page_no);
			bool			found;
			const page_size_t&	page_size
				= fil_space_get_page_size(recv_addr->space,
							  &found);

			ut_ad(found);

			if (recv_addr->state == RECV_NOT_PROCESSED) {
				/* recv_sys->mutex is released while the
				page is recovered or scheduled for reading,
				and re-acquired before continuing the walk. */
				mutex_exit(&recv_sys->mutex);

				if (buf_page_peek(page_id)) {
					/* Fetch the page with an x-latch
					and apply the records right away. */
					mtr_t	mtr;
					mtr.start();

					buf_block_t*	block = buf_page_get(
						page_id, page_size,
						RW_X_LATCH, &mtr);

					buf_block_dbg_add_level(
						block, SYNC_NO_ORDER_CHECK);

					recv_recover_page(FALSE, block);
					mtr.commit();
				} else {
					recv_read_in_area(page_id);
				}

				mutex_enter(&recv_sys->mutex);
			}
		}
	}

	/* Wait until all the pages have been processed */

	while (recv_sys->n_addrs != 0) {
		bool abort = recv_sys->found_corrupt_log;

		mutex_exit(&(recv_sys->mutex));

		if (abort) {
			return;
		}

		os_thread_sleep(500000);

		mutex_enter(&(recv_sys->mutex));
	}

	if (!last_batch) {
		/* Flush all the file pages to disk and invalidate them in
		the buffer pool */

		mutex_exit(&(recv_sys->mutex));
		log_mutex_exit();

		/* Stop the recv_writer thread from issuing any LRU
		flush batches. */
		mutex_enter(&recv_sys->writer_mutex);

		/* Wait for any currently run batch to end. */
		buf_flush_wait_LRU_batch_end();

		/* Request a BUF_FLUSH_LIST flush through the flush_start
		event and wait for the flush_end event. */
		os_event_reset(recv_sys->flush_end);
		recv_sys->flush_type = BUF_FLUSH_LIST;
		os_event_set(recv_sys->flush_start);
		os_event_wait(recv_sys->flush_end);

		buf_pool_invalidate();

		/* Allow batches from recv_writer thread. */
		mutex_exit(&recv_sys->writer_mutex);

		/* Re-acquire the mutexes in the order log mutex,
		recv_sys->mutex. */
		log_mutex_enter();
		mutex_enter(&(recv_sys->mutex));
	}

	/* The batch is complete. */
	recv_sys->apply_log_recs = FALSE;
	recv_sys->apply_batch_on = FALSE;

	recv_sys_empty_hash();

	mutex_exit(&recv_sys->mutex);
}
2052
2053/** Tries to parse a single log record.
2054@param[out] type log record type
2055@param[in] ptr pointer to a buffer
2056@param[in] end_ptr end of the buffer
2057@param[out] space_id tablespace identifier
2058@param[out] page_no page number
2059@param[in] apply whether to apply MLOG_FILE_* records
2060@param[out] body start of log record body
2061@return length of the record, or 0 if the record was not complete */
2062static
2063ulint
2064recv_parse_log_rec(
2065 mlog_id_t* type,
2066 byte* ptr,
2067 byte* end_ptr,
2068 ulint* space,
2069 ulint* page_no,
2070 bool apply,
2071 byte** body)
2072{
2073 byte* new_ptr;
2074
2075 *body = NULL;
2076
2077 UNIV_MEM_INVALID(type, sizeof *type);
2078 UNIV_MEM_INVALID(space, sizeof *space);
2079 UNIV_MEM_INVALID(page_no, sizeof *page_no);
2080 UNIV_MEM_INVALID(body, sizeof *body);
2081
2082 if (ptr == end_ptr) {
2083
2084 return(0);
2085 }
2086
2087 switch (*ptr) {
2088#ifdef UNIV_LOG_LSN_DEBUG
2089 case MLOG_LSN | MLOG_SINGLE_REC_FLAG:
2090 case MLOG_LSN:
2091 new_ptr = mlog_parse_initial_log_record(
2092 ptr, end_ptr, type, space, page_no);
2093 if (new_ptr != NULL) {
2094 const lsn_t lsn = static_cast<lsn_t>(
2095 *space) << 32 | *page_no;
2096 ut_a(lsn == recv_sys->recovered_lsn);
2097 }
2098
2099 *type = MLOG_LSN;
2100 return(new_ptr - ptr);
2101#endif /* UNIV_LOG_LSN_DEBUG */
2102 case MLOG_MULTI_REC_END:
2103 case MLOG_DUMMY_RECORD:
2104 *type = static_cast<mlog_id_t>(*ptr);
2105 return(1);
2106 case MLOG_CHECKPOINT:
2107 if (end_ptr < ptr + SIZE_OF_MLOG_CHECKPOINT) {
2108 return(0);
2109 }
2110 *type = static_cast<mlog_id_t>(*ptr);
2111 return(SIZE_OF_MLOG_CHECKPOINT);
2112 case MLOG_MULTI_REC_END | MLOG_SINGLE_REC_FLAG:
2113 case MLOG_DUMMY_RECORD | MLOG_SINGLE_REC_FLAG:
2114 case MLOG_CHECKPOINT | MLOG_SINGLE_REC_FLAG:
2115 ib::error() << "Incorrect log record type:" << *ptr;
2116 recv_sys->found_corrupt_log = true;
2117 return(0);
2118 }
2119
2120 new_ptr = mlog_parse_initial_log_record(ptr, end_ptr, type, space,
2121 page_no);
2122 *body = new_ptr;
2123
2124 if (UNIV_UNLIKELY(!new_ptr)) {
2125
2126 return(0);
2127 }
2128
2129 const byte* old_ptr = new_ptr;
2130 new_ptr = recv_parse_or_apply_log_rec_body(
2131 *type, new_ptr, end_ptr, *space, *page_no, apply, NULL, NULL);
2132
2133 if (UNIV_UNLIKELY(new_ptr == NULL)) {
2134
2135 return(0);
2136 }
2137
2138 if (*page_no == 0 && *type == MLOG_4BYTES
2139 && mach_read_from_2(old_ptr) == FSP_HEADER_OFFSET + FSP_SIZE) {
2140 old_ptr += 2;
2141 fil_space_set_recv_size(*space,
2142 mach_parse_compressed(&old_ptr,
2143 end_ptr));
2144 }
2145
2146 return ulint(new_ptr - ptr);
2147}
2148
2149/*******************************************************//**
2150Calculates the new value for lsn when more data is added to the log. */
2151static
2152lsn_t
2153recv_calc_lsn_on_data_add(
2154/*======================*/
2155 lsn_t lsn, /*!< in: old lsn */
2156 ib_uint64_t len) /*!< in: this many bytes of data is
2157 added, log block headers not included */
2158{
2159 ulint frag_len;
2160 ib_uint64_t lsn_len;
2161
2162 frag_len = (lsn % OS_FILE_LOG_BLOCK_SIZE) - LOG_BLOCK_HDR_SIZE;
2163 ut_ad(frag_len < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
2164 - LOG_BLOCK_TRL_SIZE);
2165 lsn_len = len;
2166 lsn_len += (lsn_len + frag_len)
2167 / (OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE
2168 - LOG_BLOCK_TRL_SIZE)
2169 * (LOG_BLOCK_HDR_SIZE + LOG_BLOCK_TRL_SIZE);
2170
2171 return(lsn + lsn_len);
2172}
2173
2174/** Prints diagnostic info of corrupt log.
2175@param[in] ptr pointer to corrupt log record
2176@param[in] type type of the log record (could be garbage)
2177@param[in] space tablespace ID (could be garbage)
2178@param[in] page_no page number (could be garbage)
2179@return whether processing should continue */
2180static
2181bool
2182recv_report_corrupt_log(
2183 const byte* ptr,
2184 int type,
2185 ulint space,
2186 ulint page_no)
2187{
2188 ib::error() <<
2189 "############### CORRUPT LOG RECORD FOUND ##################";
2190
2191 ib::info() << "Log record type " << type << ", page " << space << ":"
2192 << page_no << ". Log parsing proceeded successfully up to "
2193 << recv_sys->recovered_lsn << ". Previous log record type "
2194 << recv_previous_parsed_rec_type << ", is multi "
2195 << recv_previous_parsed_rec_is_multi << " Recv offset "
2196 << (ptr - recv_sys->buf) << ", prev "
2197 << recv_previous_parsed_rec_offset;
2198
2199 ut_ad(ptr <= recv_sys->buf + recv_sys->len);
2200
2201 const ulint limit = 100;
2202 const ulint before
2203 = std::min(recv_previous_parsed_rec_offset, limit);
2204 const ulint after
2205 = std::min(recv_sys->len - ulint(ptr - recv_sys->buf), limit);
2206
2207 ib::info() << "Hex dump starting " << before << " bytes before and"
2208 " ending " << after << " bytes after the corrupted record:";
2209
2210 ut_print_buf(stderr,
2211 recv_sys->buf
2212 + recv_previous_parsed_rec_offset - before,
2213 ulint(ptr - recv_sys->buf) + before + after
2214 - recv_previous_parsed_rec_offset);
2215 putc('\n', stderr);
2216
2217 if (!srv_force_recovery) {
2218 ib::info() << "Set innodb_force_recovery to ignore this error.";
2219 return(false);
2220 }
2221
2222 ib::warn() << "The log file may have been corrupt and it is possible"
2223 " that the log scan did not proceed far enough in recovery!"
2224 " Please run CHECK TABLE on your InnoDB tables to check"
2225 " that they are ok! If mysqld crashes after this recovery; "
2226 << FORCE_RECOVERY_MSG;
2227 return(true);
2228}
2229
2230/** Parse log records from a buffer and optionally store them to a
2231hash table to wait merging to file pages.
2232@param[in] checkpoint_lsn the LSN of the latest checkpoint
2233@param[in] store whether to store page operations
2234@param[in] apply whether to apply the records
2235@return whether MLOG_CHECKPOINT record was seen the first time,
2236or corruption was noticed */
2237bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply)
2238{
2239 byte* ptr;
2240 byte* end_ptr;
2241 bool single_rec;
2242 ulint len;
2243 lsn_t new_recovered_lsn;
2244 lsn_t old_lsn;
2245 mlog_id_t type;
2246 ulint space;
2247 ulint page_no;
2248 byte* body;
2249
2250 ut_ad(log_mutex_own());
2251 ut_ad(recv_sys->parse_start_lsn != 0);
2252loop:
2253 ptr = recv_sys->buf + recv_sys->recovered_offset;
2254
2255 end_ptr = recv_sys->buf + recv_sys->len;
2256
2257 if (ptr == end_ptr) {
2258
2259 return(false);
2260 }
2261
2262 switch (*ptr) {
2263 case MLOG_CHECKPOINT:
2264#ifdef UNIV_LOG_LSN_DEBUG
2265 case MLOG_LSN:
2266#endif /* UNIV_LOG_LSN_DEBUG */
2267 case MLOG_DUMMY_RECORD:
2268 single_rec = true;
2269 break;
2270 default:
2271 single_rec = !!(*ptr & MLOG_SINGLE_REC_FLAG);
2272 }
2273
2274 if (single_rec) {
2275 /* The mtr did not modify multiple pages */
2276
2277 old_lsn = recv_sys->recovered_lsn;
2278
2279 /* Try to parse a log record, fetching its type, space id,
2280 page no, and a pointer to the body of the log record */
2281
2282 len = recv_parse_log_rec(&type, ptr, end_ptr, &space,
2283 &page_no, apply, &body);
2284
2285 if (len == 0) {
2286 return(false);
2287 }
2288
2289 if (recv_sys->found_corrupt_log) {
2290 recv_report_corrupt_log(
2291 ptr, type, space, page_no);
2292 return(true);
2293 }
2294
2295 if (recv_sys->found_corrupt_fs) {
2296 return(true);
2297 }
2298
2299 new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);
2300
2301 if (new_recovered_lsn > recv_sys->scanned_lsn) {
2302 /* The log record filled a log block, and we require
2303 that also the next log block should have been scanned
2304 in */
2305
2306 return(false);
2307 }
2308
2309 recv_previous_parsed_rec_type = type;
2310 recv_previous_parsed_rec_offset = recv_sys->recovered_offset;
2311 recv_previous_parsed_rec_is_multi = 0;
2312
2313 recv_sys->recovered_offset += len;
2314 recv_sys->recovered_lsn = new_recovered_lsn;
2315
2316 switch (type) {
2317 lsn_t lsn;
2318 case MLOG_DUMMY_RECORD:
2319 /* Do nothing */
2320 break;
2321 case MLOG_CHECKPOINT:
2322 compile_time_assert(SIZE_OF_MLOG_CHECKPOINT == 1 + 8);
2323 lsn = mach_read_from_8(ptr + 1);
2324
2325 DBUG_PRINT("ib_log",
2326 ("MLOG_CHECKPOINT(" LSN_PF ") %s at "
2327 LSN_PF,
2328 lsn,
2329 lsn != checkpoint_lsn ? "ignored"
2330 : recv_sys->mlog_checkpoint_lsn
2331 ? "reread" : "read",
2332 recv_sys->recovered_lsn));
2333
2334 if (lsn == checkpoint_lsn) {
2335 if (recv_sys->mlog_checkpoint_lsn) {
2336 /* At recv_reset_logs() we may
2337 write a duplicate MLOG_CHECKPOINT
2338 for the same checkpoint LSN. Thus
2339 recv_sys->mlog_checkpoint_lsn
2340 can differ from the current LSN. */
2341 ut_ad(recv_sys->mlog_checkpoint_lsn
2342 <= recv_sys->recovered_lsn);
2343 break;
2344 }
2345 recv_sys->mlog_checkpoint_lsn
2346 = recv_sys->recovered_lsn;
2347 return(true);
2348 }
2349 break;
2350#ifdef UNIV_LOG_LSN_DEBUG
2351 case MLOG_LSN:
2352 /* Do not add these records to the hash table.
2353 The page number and space id fields are misused
2354 for something else. */
2355 break;
2356#endif /* UNIV_LOG_LSN_DEBUG */
2357 default:
2358 switch (store) {
2359 case STORE_NO:
2360 break;
2361 case STORE_IF_EXISTS:
2362 if (fil_space_get_flags(space)
2363 == ULINT_UNDEFINED) {
2364 break;
2365 }
2366 /* fall through */
2367 case STORE_YES:
2368 recv_add_to_hash_table(
2369 type, space, page_no, body,
2370 ptr + len, old_lsn,
2371 recv_sys->recovered_lsn);
2372 }
2373 /* fall through */
2374 case MLOG_INDEX_LOAD:
2375 if (type == MLOG_INDEX_LOAD) {
2376 if (check_if_backup_includes
2377 && !check_if_backup_includes(space)) {
2378 ut_ad(srv_operation
2379 == SRV_OPERATION_BACKUP);
2380 return true;
2381 }
2382 }
2383 /* fall through */
2384 case MLOG_FILE_NAME:
2385 case MLOG_FILE_DELETE:
2386 case MLOG_FILE_CREATE2:
2387 case MLOG_FILE_RENAME2:
2388 case MLOG_TRUNCATE:
2389 /* These were already handled by
2390 recv_parse_log_rec() and
2391 recv_parse_or_apply_log_rec_body(). */
2392 DBUG_PRINT("ib_log",
2393 ("scan " LSN_PF ": log rec %s"
2394 " len " ULINTPF
2395 " page " ULINTPF ":" ULINTPF,
2396 old_lsn, get_mlog_string(type),
2397 len, space, page_no));
2398 }
2399 } else {
2400 /* Check that all the records associated with the single mtr
2401 are included within the buffer */
2402
2403 ulint total_len = 0;
2404 ulint n_recs = 0;
2405 bool only_mlog_file = true;
2406 ulint mlog_rec_len = 0;
2407
2408 for (;;) {
2409 len = recv_parse_log_rec(
2410 &type, ptr, end_ptr, &space, &page_no,
2411 false, &body);
2412
2413 if (len == 0) {
2414 return(false);
2415 }
2416
2417 if (recv_sys->found_corrupt_log
2418 || type == MLOG_CHECKPOINT
2419 || (*ptr & MLOG_SINGLE_REC_FLAG)) {
2420 recv_sys->found_corrupt_log = true;
2421 recv_report_corrupt_log(
2422 ptr, type, space, page_no);
2423 return(true);
2424 }
2425
2426 if (recv_sys->found_corrupt_fs) {
2427 return(true);
2428 }
2429
2430 recv_previous_parsed_rec_type = type;
2431 recv_previous_parsed_rec_offset
2432 = recv_sys->recovered_offset + total_len;
2433 recv_previous_parsed_rec_is_multi = 1;
2434
2435 /* MLOG_FILE_NAME redo log records doesn't make changes
2436 to persistent data. If only MLOG_FILE_NAME redo
2437 log record exists then reset the parsing buffer pointer
2438 by changing recovered_lsn and recovered_offset. */
2439 if (type != MLOG_FILE_NAME && only_mlog_file == true) {
2440 only_mlog_file = false;
2441 }
2442
2443 if (only_mlog_file) {
2444 new_recovered_lsn = recv_calc_lsn_on_data_add(
2445 recv_sys->recovered_lsn, len);
2446 mlog_rec_len += len;
2447 recv_sys->recovered_offset += len;
2448 recv_sys->recovered_lsn = new_recovered_lsn;
2449 }
2450
2451 total_len += len;
2452 n_recs++;
2453
2454 ptr += len;
2455
2456 if (type == MLOG_MULTI_REC_END) {
2457 DBUG_PRINT("ib_log",
2458 ("scan " LSN_PF
2459 ": multi-log end"
2460 " total_len " ULINTPF
2461 " n=" ULINTPF,
2462 recv_sys->recovered_lsn,
2463 total_len, n_recs));
2464 total_len -= mlog_rec_len;
2465 break;
2466 }
2467
2468 DBUG_PRINT("ib_log",
2469 ("scan " LSN_PF ": multi-log rec %s"
2470 " len " ULINTPF
2471 " page " ULINTPF ":" ULINTPF,
2472 recv_sys->recovered_lsn,
2473 get_mlog_string(type), len, space, page_no));
2474 }
2475
2476 new_recovered_lsn = recv_calc_lsn_on_data_add(
2477 recv_sys->recovered_lsn, total_len);
2478
2479 if (new_recovered_lsn > recv_sys->scanned_lsn) {
2480 /* The log record filled a log block, and we require
2481 that also the next log block should have been scanned
2482 in */
2483
2484 return(false);
2485 }
2486
2487 /* Add all the records to the hash table */
2488
2489 ptr = recv_sys->buf + recv_sys->recovered_offset;
2490
2491 for (;;) {
2492 old_lsn = recv_sys->recovered_lsn;
2493 /* This will apply MLOG_FILE_ records. We
2494 had to skip them in the first scan, because we
2495 did not know if the mini-transaction was
2496 completely recovered (until MLOG_MULTI_REC_END). */
2497 len = recv_parse_log_rec(
2498 &type, ptr, end_ptr, &space, &page_no,
2499 apply, &body);
2500
2501 if (recv_sys->found_corrupt_log
2502 && !recv_report_corrupt_log(
2503 ptr, type, space, page_no)) {
2504 return(true);
2505 }
2506
2507 if (recv_sys->found_corrupt_fs) {
2508 return(true);
2509 }
2510
2511 ut_a(len != 0);
2512 ut_a(!(*ptr & MLOG_SINGLE_REC_FLAG));
2513
2514 recv_sys->recovered_offset += len;
2515 recv_sys->recovered_lsn
2516 = recv_calc_lsn_on_data_add(old_lsn, len);
2517
2518 switch (type) {
2519 case MLOG_MULTI_REC_END:
2520 /* Found the end mark for the records */
2521 goto loop;
2522#ifdef UNIV_LOG_LSN_DEBUG
2523 case MLOG_LSN:
2524 /* Do not add these records to the hash table.
2525 The page number and space id fields are misused
2526 for something else. */
2527 break;
2528#endif /* UNIV_LOG_LSN_DEBUG */
2529 case MLOG_INDEX_LOAD:
2530 /* Mariabackup FIXME: Report an error
2531 when encountering MLOG_INDEX_LOAD on
2532 --prepare or already on --backup. */
2533 ut_a(srv_operation == SRV_OPERATION_NORMAL);
2534 break;
2535 case MLOG_FILE_NAME:
2536 case MLOG_FILE_DELETE:
2537 case MLOG_FILE_CREATE2:
2538 case MLOG_FILE_RENAME2:
2539 case MLOG_TRUNCATE:
2540 /* These were already handled by
2541 recv_parse_log_rec() and
2542 recv_parse_or_apply_log_rec_body(). */
2543 break;
2544 default:
2545 switch (store) {
2546 case STORE_NO:
2547 break;
2548 case STORE_IF_EXISTS:
2549 if (fil_space_get_flags(space)
2550 == ULINT_UNDEFINED) {
2551 break;
2552 }
2553 /* fall through */
2554 case STORE_YES:
2555 recv_add_to_hash_table(
2556 type, space, page_no,
2557 body, ptr + len,
2558 old_lsn,
2559 new_recovered_lsn);
2560 }
2561 }
2562
2563 ptr += len;
2564 }
2565 }
2566
2567 goto loop;
2568}
2569
2570/** Adds data from a new log block to the parsing buffer of recv_sys if
2571recv_sys->parse_start_lsn is non-zero.
2572@param[in] log_block log block to add
2573@param[in] scanned_lsn lsn of how far we were able to find
2574 data in this log block
2575@return true if more data added */
2576bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn)
2577{
2578 ulint more_len;
2579 ulint data_len;
2580 ulint start_offset;
2581 ulint end_offset;
2582
2583 ut_ad(scanned_lsn >= recv_sys->scanned_lsn);
2584
2585 if (!recv_sys->parse_start_lsn) {
2586 /* Cannot start parsing yet because no start point for
2587 it found */
2588
2589 return(false);
2590 }
2591
2592 data_len = log_block_get_data_len(log_block);
2593
2594 if (recv_sys->parse_start_lsn >= scanned_lsn) {
2595
2596 return(false);
2597
2598 } else if (recv_sys->scanned_lsn >= scanned_lsn) {
2599
2600 return(false);
2601
2602 } else if (recv_sys->parse_start_lsn > recv_sys->scanned_lsn) {
2603 more_len = (ulint) (scanned_lsn - recv_sys->parse_start_lsn);
2604 } else {
2605 more_len = (ulint) (scanned_lsn - recv_sys->scanned_lsn);
2606 }
2607
2608 if (more_len == 0) {
2609
2610 return(false);
2611 }
2612
2613 ut_ad(data_len >= more_len);
2614
2615 start_offset = data_len - more_len;
2616
2617 if (start_offset < LOG_BLOCK_HDR_SIZE) {
2618 start_offset = LOG_BLOCK_HDR_SIZE;
2619 }
2620
2621 end_offset = data_len;
2622
2623 if (end_offset > OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE) {
2624 end_offset = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE;
2625 }
2626
2627 ut_ad(start_offset <= end_offset);
2628
2629 if (start_offset < end_offset) {
2630 ut_memcpy(recv_sys->buf + recv_sys->len,
2631 log_block + start_offset, end_offset - start_offset);
2632
2633 recv_sys->len += end_offset - start_offset;
2634
2635 ut_a(recv_sys->len <= RECV_PARSING_BUF_SIZE);
2636 }
2637
2638 return(true);
2639}
2640
2641/** Moves the parsing buffer data left to the buffer start. */
2642void recv_sys_justify_left_parsing_buf()
2643{
2644 ut_memmove(recv_sys->buf, recv_sys->buf + recv_sys->recovered_offset,
2645 recv_sys->len - recv_sys->recovered_offset);
2646
2647 recv_sys->len -= recv_sys->recovered_offset;
2648
2649 recv_sys->recovered_offset = 0;
2650}
2651
2652/** Scan redo log from a buffer and stores new log data to the parsing buffer.
2653Parse and hash the log records if new data found.
2654Apply log records automatically when the hash table becomes full.
2655@return true if not able to scan any more in this log group */
2656static
2657bool
2658recv_scan_log_recs(
2659/*===============*/
2660 ulint available_memory,/*!< in: we let the hash table of recs
2661 to grow to this size, at the maximum */
2662 store_t* store_to_hash, /*!< in,out: whether the records should be
2663 stored to the hash table; this is reset
2664 if just debug checking is needed, or
2665 when the available_memory runs out */
2666 const byte* log_block, /*!< in: log segment */
2667 lsn_t checkpoint_lsn, /*!< in: latest checkpoint LSN */
2668 lsn_t start_lsn, /*!< in: buffer start LSN */
2669 lsn_t end_lsn, /*!< in: buffer end LSN */
2670 lsn_t* contiguous_lsn, /*!< in/out: it is known that all log
2671 groups contain contiguous log data up
2672 to this lsn */
2673 lsn_t* group_scanned_lsn)/*!< out: scanning succeeded up to
2674 this lsn */
2675{
2676 lsn_t scanned_lsn = start_lsn;
2677 bool finished = false;
2678 ulint data_len;
2679 bool more_data = false;
2680 bool apply = recv_sys->mlog_checkpoint_lsn != 0;
2681 ulint recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
2682
2683 ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
2684 ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
2685 ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE);
2686
2687 const byte* const log_end = log_block
2688 + ulint(end_lsn - start_lsn);
2689
2690 do {
2691 ut_ad(!finished);
2692
2693 if (log_block_get_flush_bit(log_block)) {
2694 /* This block was a start of a log flush operation:
2695 we know that the previous flush operation must have
2696 been completed for all log groups before this block
2697 can have been flushed to any of the groups. Therefore,
2698 we know that log data is contiguous up to scanned_lsn
2699 in all non-corrupt log groups. */
2700
2701 if (scanned_lsn > *contiguous_lsn) {
2702 *contiguous_lsn = scanned_lsn;
2703 }
2704 }
2705
2706 data_len = log_block_get_data_len(log_block);
2707
2708 if (scanned_lsn + data_len > recv_sys->scanned_lsn
2709 && log_block_get_checkpoint_no(log_block)
2710 < recv_sys->scanned_checkpoint_no
2711 && (recv_sys->scanned_checkpoint_no
2712 - log_block_get_checkpoint_no(log_block)
2713 > 0x80000000UL)) {
2714
2715 /* Garbage from a log buffer flush which was made
2716 before the most recent database recovery */
2717 finished = true;
2718 break;
2719 }
2720
2721 if (!recv_sys->parse_start_lsn
2722 && (log_block_get_first_rec_group(log_block) > 0)) {
2723
2724 /* We found a point from which to start the parsing
2725 of log records */
2726
2727 recv_sys->parse_start_lsn = scanned_lsn
2728 + log_block_get_first_rec_group(log_block);
2729 recv_sys->scanned_lsn = recv_sys->parse_start_lsn;
2730 recv_sys->recovered_lsn = recv_sys->parse_start_lsn;
2731 }
2732
2733 scanned_lsn += data_len;
2734
2735 if (data_len == LOG_BLOCK_HDR_SIZE + SIZE_OF_MLOG_CHECKPOINT
2736 && scanned_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
2737 && log_block[LOG_BLOCK_HDR_SIZE] == MLOG_CHECKPOINT
2738 && checkpoint_lsn == mach_read_from_8(LOG_BLOCK_HDR_SIZE
2739 + 1 + log_block)) {
2740 /* The redo log is logically empty. */
2741 ut_ad(recv_sys->mlog_checkpoint_lsn == 0
2742 || recv_sys->mlog_checkpoint_lsn
2743 == checkpoint_lsn);
2744 recv_sys->mlog_checkpoint_lsn = checkpoint_lsn;
2745 DBUG_PRINT("ib_log", ("found empty log; LSN=" LSN_PF,
2746 scanned_lsn));
2747 finished = true;
2748 break;
2749 }
2750
2751 if (scanned_lsn > recv_sys->scanned_lsn) {
2752 ut_ad(!srv_log_files_created);
2753 if (!recv_needed_recovery) {
2754 recv_needed_recovery = true;
2755
2756 if (srv_read_only_mode) {
2757 ib::warn() << "innodb_read_only"
2758 " prevents crash recovery";
2759 return(true);
2760 }
2761
2762 ib::info() << "Starting crash recovery from"
2763 " checkpoint LSN="
2764 << recv_sys->scanned_lsn;
2765 }
2766
2767 /* We were able to find more log data: add it to the
2768 parsing buffer if parse_start_lsn is already
2769 non-zero */
2770
2771 DBUG_EXECUTE_IF(
2772 "reduce_recv_parsing_buf",
2773 recv_parsing_buf_size
2774 = (70 * 1024);
2775 );
2776
2777 if (recv_sys->len + 4 * OS_FILE_LOG_BLOCK_SIZE
2778 >= recv_parsing_buf_size) {
2779 ib::error() << "Log parsing buffer overflow."
2780 " Recovery may have failed!";
2781
2782 recv_sys->found_corrupt_log = true;
2783
2784 if (!srv_force_recovery) {
2785 ib::error()
2786 << "Set innodb_force_recovery"
2787 " to ignore this error.";
2788 return(true);
2789 }
2790 } else if (!recv_sys->found_corrupt_log) {
2791 more_data = recv_sys_add_to_parsing_buf(
2792 log_block, scanned_lsn);
2793 }
2794
2795 recv_sys->scanned_lsn = scanned_lsn;
2796 recv_sys->scanned_checkpoint_no
2797 = log_block_get_checkpoint_no(log_block);
2798 }
2799
2800 if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
2801 /* Log data for this group ends here */
2802 finished = true;
2803 break;
2804 } else {
2805 log_block += OS_FILE_LOG_BLOCK_SIZE;
2806 }
2807 } while (log_block < log_end);
2808
2809 *group_scanned_lsn = scanned_lsn;
2810
2811 if (more_data && !recv_sys->found_corrupt_log) {
2812 /* Try to parse more log records */
2813
2814 if (recv_parse_log_recs(checkpoint_lsn,
2815 *store_to_hash, apply)) {
2816 ut_ad(recv_sys->found_corrupt_log
2817 || recv_sys->found_corrupt_fs
2818 || recv_sys->mlog_checkpoint_lsn
2819 == recv_sys->recovered_lsn);
2820 return(true);
2821 }
2822
2823 if (*store_to_hash != STORE_NO
2824 && mem_heap_get_size(recv_sys->heap) > available_memory) {
2825
2826 DBUG_PRINT("ib_log", ("Ran out of memory and last "
2827 "stored lsn " LSN_PF,
2828 recv_sys->recovered_lsn));
2829
2830 recv_sys->last_stored_lsn = recv_sys->recovered_lsn;
2831 *store_to_hash = STORE_NO;
2832 }
2833
2834 if (recv_sys->recovered_offset > recv_parsing_buf_size / 4) {
2835 /* Move parsing buffer data to the buffer start */
2836
2837 recv_sys_justify_left_parsing_buf();
2838 }
2839 }
2840
2841 return(finished);
2842}
2843
2844/** Scans log from a buffer and stores new log data to the parsing buffer.
2845Parses and hashes the log records if new data found.
2846@param[in] checkpoint_lsn latest checkpoint log sequence number
2847@param[in,out] contiguous_lsn log sequence number
2848until which all redo log has been scanned
2849@param[in] last_phase whether changes
2850can be applied to the tablespaces
2851@return whether rescan is needed (not everything was stored) */
2852static
2853bool
2854recv_group_scan_log_recs(
2855 lsn_t checkpoint_lsn,
2856 lsn_t* contiguous_lsn,
2857 bool last_phase)
2858{
2859 DBUG_ENTER("recv_group_scan_log_recs");
2860 DBUG_ASSERT(!last_phase || recv_sys->mlog_checkpoint_lsn > 0);
2861
2862 mutex_enter(&recv_sys->mutex);
2863 recv_sys->len = 0;
2864 recv_sys->recovered_offset = 0;
2865 recv_sys->n_addrs = 0;
2866 recv_sys_empty_hash();
2867 srv_start_lsn = *contiguous_lsn;
2868 recv_sys->parse_start_lsn = *contiguous_lsn;
2869 recv_sys->scanned_lsn = *contiguous_lsn;
2870 recv_sys->recovered_lsn = *contiguous_lsn;
2871 recv_sys->scanned_checkpoint_no = 0;
2872 recv_previous_parsed_rec_type = MLOG_SINGLE_REC_FLAG;
2873 recv_previous_parsed_rec_offset = 0;
2874 recv_previous_parsed_rec_is_multi = 0;
2875 ut_ad(recv_max_page_lsn == 0);
2876 ut_ad(last_phase || !recv_writer_thread_active);
2877 mutex_exit(&recv_sys->mutex);
2878
2879 lsn_t start_lsn;
2880 lsn_t end_lsn;
2881 store_t store_to_hash = recv_sys->mlog_checkpoint_lsn == 0
2882 ? STORE_NO : (last_phase ? STORE_IF_EXISTS : STORE_YES);
2883 ulint available_mem = srv_page_size
2884 * (buf_pool_get_n_pages()
2885 - (recv_n_pool_free_frames * srv_buf_pool_instances));
2886
2887 log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn =
2888 ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
2889
2890 do {
2891 if (last_phase && store_to_hash == STORE_NO) {
2892 store_to_hash = STORE_IF_EXISTS;
2893 /* We must not allow change buffer
2894 merge here, because it would generate
2895 redo log records before we have
2896 finished the redo log scan. */
2897 recv_apply_hashed_log_recs(false);
2898 }
2899
2900 start_lsn = ut_uint64_align_down(end_lsn,
2901 OS_FILE_LOG_BLOCK_SIZE);
2902 end_lsn = start_lsn;
2903 log_sys.log.read_log_seg(&end_lsn, start_lsn + RECV_SCAN_SIZE);
2904 } while (end_lsn != start_lsn
2905 && !recv_scan_log_recs(
2906 available_mem, &store_to_hash, log_sys.buf,
2907 checkpoint_lsn,
2908 start_lsn, end_lsn,
2909 contiguous_lsn, &log_sys.log.scanned_lsn));
2910
2911 if (recv_sys->found_corrupt_log || recv_sys->found_corrupt_fs) {
2912 DBUG_RETURN(false);
2913 }
2914
2915 DBUG_PRINT("ib_log", ("%s " LSN_PF " completed",
2916 last_phase ? "rescan" : "scan",
2917 log_sys.log.scanned_lsn));
2918
2919 DBUG_RETURN(store_to_hash == STORE_NO);
2920}
2921
2922/** Report a missing tablespace for which page-redo log exists.
2923@param[in] err previous error code
2924@param[in] i tablespace descriptor
2925@return new error code */
2926static
2927dberr_t
2928recv_init_missing_space(dberr_t err, const recv_spaces_t::const_iterator& i)
2929{
2930 if (srv_operation == SRV_OPERATION_RESTORE
2931 || srv_operation == SRV_OPERATION_RESTORE_EXPORT) {
2932 ib::warn() << "Tablespace " << i->first << " was not"
2933 " found at " << i->second.name << " when"
2934 " restoring a (partial?) backup. All redo log"
2935 " for this file will be ignored!";
2936 return(err);
2937 }
2938
2939 if (srv_force_recovery == 0) {
2940 ib::error() << "Tablespace " << i->first << " was not"
2941 " found at " << i->second.name << ".";
2942
2943 if (err == DB_SUCCESS) {
2944 ib::error() << "Set innodb_force_recovery=1 to"
2945 " ignore this and to permanently lose"
2946 " all changes to the tablespace.";
2947 err = DB_TABLESPACE_NOT_FOUND;
2948 }
2949 } else {
2950 ib::warn() << "Tablespace " << i->first << " was not"
2951 " found at " << i->second.name << ", and"
2952 " innodb_force_recovery was set. All redo log"
2953 " for this tablespace will be ignored!";
2954 }
2955
2956 return(err);
2957}
2958
2959/** Report the missing tablespace and discard the redo logs for the deleted
2960tablespace.
2961@param[in] rescan rescan of redo logs is needed
2962 if hash table ran out of memory
2963@param[out] missing_tablespace missing tablespace exists or not
2964@return error code or DB_SUCCESS. */
2965static MY_ATTRIBUTE((warn_unused_result))
2966dberr_t
2967recv_validate_tablespace(bool rescan, bool& missing_tablespace)
2968{
2969 dberr_t err = DB_SUCCESS;
2970
2971 for (ulint h = 0; h < hash_get_n_cells(recv_sys->addr_hash); h++) {
2972 for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
2973 HASH_GET_FIRST(recv_sys->addr_hash, h));
2974 recv_addr != 0;
2975 recv_addr = static_cast<recv_addr_t*>(
2976 HASH_GET_NEXT(addr_hash, recv_addr))) {
2977
2978 const ulint space = recv_addr->space;
2979
2980 if (is_predefined_tablespace(space)) {
2981 continue;
2982 }
2983
2984 recv_spaces_t::iterator i = recv_spaces.find(space);
2985 ut_ad(i != recv_spaces.end());
2986
2987 switch (i->second.status) {
2988 case file_name_t::MISSING:
2989 err = recv_init_missing_space(err, i);
2990 i->second.status = file_name_t::DELETED;
2991 /* fall through */
2992 case file_name_t::DELETED:
2993 recv_addr->state = RECV_DISCARDED;
2994 /* fall through */
2995 case file_name_t::NORMAL:
2996 continue;
2997 }
2998 ut_ad(0);
2999 }
3000 }
3001
3002 if (err != DB_SUCCESS) {
3003 return(err);
3004 }
3005
3006 /* When rescan is not needed then recv_sys->addr_hash will have
3007 all space id belongs to redo log. If rescan is needed and
3008 innodb_force_recovery > 0 then InnoDB can ignore missing tablespace. */
3009 for (recv_spaces_t::iterator i = recv_spaces.begin();
3010 i != recv_spaces.end(); i++) {
3011
3012 if (i->second.status != file_name_t::MISSING) {
3013 continue;
3014 }
3015
3016 missing_tablespace = true;
3017
3018 if (srv_force_recovery > 0) {
3019 ib::warn() << "Tablespace " << i->first
3020 <<" was not found at " << i->second.name
3021 <<", and innodb_force_recovery was set."
3022 <<" All redo log for this tablespace"
3023 <<" will be ignored!";
3024 continue;
3025 }
3026
3027 if (!rescan) {
3028 ib::info() << "Tablespace " << i->first
3029 << " was not found at '"
3030 << i->second.name << "', but there"
3031 <<" were no modifications either.";
3032 }
3033 }
3034
3035 if (!rescan || srv_force_recovery > 0) {
3036 missing_tablespace = false;
3037 }
3038
3039 return DB_SUCCESS;
3040}
3041
3042/** Check if all tablespaces were found for crash recovery.
3043@param[in] rescan rescan of redo logs is needed
3044@param[out] missing_tablespace missing table exists
3045@return error code or DB_SUCCESS */
3046static MY_ATTRIBUTE((warn_unused_result))
3047dberr_t
3048recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace)
3049{
3050 bool flag_deleted = false;
3051
3052 ut_ad(!srv_read_only_mode);
3053 ut_ad(recv_needed_recovery);
3054
3055 for (recv_spaces_t::iterator i = recv_spaces.begin();
3056 i != recv_spaces.end(); i++) {
3057 ut_ad(!is_predefined_tablespace(i->first));
3058 ut_ad(i->second.status != file_name_t::DELETED || !i->second.space);
3059
3060 if (i->second.status == file_name_t::DELETED) {
3061 /* The tablespace was deleted,
3062 so we can ignore any redo log for it. */
3063 flag_deleted = true;
3064 } else if (i->second.space != NULL) {
3065 /* The tablespace was found, and there
3066 are some redo log records for it. */
3067 fil_names_dirty(i->second.space);
3068 } else if (i->second.name == "") {
3069 ib::error() << "Missing MLOG_FILE_NAME"
3070 " or MLOG_FILE_DELETE"
3071 " before MLOG_CHECKPOINT for tablespace "
3072 << i->first;
3073 recv_sys->found_corrupt_log = true;
3074 return(DB_CORRUPTION);
3075 } else {
3076 i->second.status = file_name_t::MISSING;
3077 flag_deleted = true;
3078 }
3079
3080 ut_ad(i->second.status == file_name_t::DELETED || i->second.name != "");
3081 }
3082
3083 if (flag_deleted) {
3084 return recv_validate_tablespace(rescan, missing_tablespace);
3085 }
3086
3087 return DB_SUCCESS;
3088}
3089
3090/** Start recovering from a redo log checkpoint.
3091@see recv_recovery_from_checkpoint_finish
3092@param[in] flush_lsn FIL_PAGE_FILE_FLUSH_LSN
3093of first system tablespace page
3094@return error code or DB_SUCCESS */
3095dberr_t
3096recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
3097{
3098 ulint max_cp_field;
3099 lsn_t checkpoint_lsn;
3100 bool rescan;
3101 ib_uint64_t checkpoint_no;
3102 lsn_t contiguous_lsn;
3103 byte* buf;
3104 dberr_t err = DB_SUCCESS;
3105
3106 ut_ad(srv_operation == SRV_OPERATION_NORMAL
3107 || srv_operation == SRV_OPERATION_RESTORE
3108 || srv_operation == SRV_OPERATION_RESTORE_EXPORT);
3109
3110 /* Initialize red-black tree for fast insertions into the
3111 flush_list during recovery process. */
3112 buf_flush_init_flush_rbt();
3113
3114 if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) {
3115
3116 ib::info() << "innodb_force_recovery=6 skips redo log apply";
3117
3118 return(DB_SUCCESS);
3119 }
3120
3121 recv_recovery_on = true;
3122
3123 log_mutex_enter();
3124
3125 err = recv_find_max_checkpoint(&max_cp_field);
3126
3127 if (err != DB_SUCCESS) {
3128
3129 srv_start_lsn = recv_sys->recovered_lsn = log_sys.lsn;
3130 log_mutex_exit();
3131 return(err);
3132 }
3133
3134 log_header_read(max_cp_field);
3135
3136 buf = log_sys.checkpoint_buf;
3137
3138 checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
3139 checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
3140
3141 /* Start reading the log from the checkpoint lsn. The variable
3142 contiguous_lsn contains an lsn up to which the log is known to
3143 be contiguously written. */
3144 recv_sys->mlog_checkpoint_lsn = 0;
3145
3146 ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size);
3147
3148 const lsn_t end_lsn = mach_read_from_8(
3149 buf + LOG_CHECKPOINT_END_LSN);
3150
3151 ut_ad(recv_sys->n_addrs == 0);
3152 contiguous_lsn = checkpoint_lsn;
3153 switch (log_sys.log.format) {
3154 case 0:
3155 log_mutex_exit();
3156 return(recv_log_format_0_recover(checkpoint_lsn));
3157 default:
3158 if (end_lsn == 0) {
3159 break;
3160 }
3161 if (end_lsn >= checkpoint_lsn) {
3162 contiguous_lsn = end_lsn;
3163 break;
3164 }
3165 recv_sys->found_corrupt_log = true;
3166 log_mutex_exit();
3167 return(DB_ERROR);
3168 }
3169
3170 /* Look for MLOG_CHECKPOINT. */
3171 recv_group_scan_log_recs(checkpoint_lsn, &contiguous_lsn, false);
3172 /* The first scan should not have stored or applied any records. */
3173 ut_ad(recv_sys->n_addrs == 0);
3174 ut_ad(!recv_sys->found_corrupt_fs);
3175
3176 if (srv_read_only_mode && recv_needed_recovery) {
3177 log_mutex_exit();
3178 return(DB_READ_ONLY);
3179 }
3180
3181 if (recv_sys->found_corrupt_log && !srv_force_recovery) {
3182 log_mutex_exit();
3183 ib::warn() << "Log scan aborted at LSN " << contiguous_lsn;
3184 return(DB_ERROR);
3185 }
3186
3187 if (recv_sys->mlog_checkpoint_lsn == 0) {
3188 lsn_t scan_lsn = log_sys.log.scanned_lsn;
3189 if (!srv_read_only_mode && scan_lsn != checkpoint_lsn) {
3190 log_mutex_exit();
3191 ib::error err;
3192 err << "Missing MLOG_CHECKPOINT";
3193 if (end_lsn) {
3194 err << " at " << end_lsn;
3195 }
3196 err << " between the checkpoint " << checkpoint_lsn
3197 << " and the end " << scan_lsn << ".";
3198 return(DB_ERROR);
3199 }
3200
3201 log_sys.log.scanned_lsn = checkpoint_lsn;
3202 rescan = false;
3203 } else {
3204 contiguous_lsn = checkpoint_lsn;
3205 rescan = recv_group_scan_log_recs(
3206 checkpoint_lsn, &contiguous_lsn, false);
3207
3208 if ((recv_sys->found_corrupt_log && !srv_force_recovery)
3209 || recv_sys->found_corrupt_fs) {
3210 log_mutex_exit();
3211 return(DB_ERROR);
3212 }
3213 }
3214
3215 /* NOTE: we always do a 'recovery' at startup, but only if
3216 there is something wrong we will print a message to the
3217 user about recovery: */
3218
3219 if (flush_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
3220 && recv_sys->mlog_checkpoint_lsn == checkpoint_lsn) {
3221 /* The redo log is logically empty. */
3222 } else if (checkpoint_lsn != flush_lsn) {
3223 ut_ad(!srv_log_files_created);
3224
3225 if (checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT < flush_lsn) {
3226 ib::warn() << "Are you sure you are using the"
3227 " right ib_logfiles to start up the database?"
3228 " Log sequence number in the ib_logfiles is "
3229 << checkpoint_lsn << ", less than the"
3230 " log sequence number in the first system"
3231 " tablespace file header, " << flush_lsn << ".";
3232 }
3233
3234 if (!recv_needed_recovery) {
3235
3236 ib::info() << "The log sequence number " << flush_lsn
3237 << " in the system tablespace does not match"
3238 " the log sequence number " << checkpoint_lsn
3239 << " in the ib_logfiles!";
3240
3241 if (srv_read_only_mode) {
3242 ib::error() << "innodb_read_only"
3243 " prevents crash recovery";
3244 log_mutex_exit();
3245 return(DB_READ_ONLY);
3246 }
3247
3248 recv_needed_recovery = true;
3249 }
3250 }
3251
3252 log_sys.lsn = recv_sys->recovered_lsn;
3253
3254 if (recv_needed_recovery) {
3255 bool missing_tablespace = false;
3256
3257 err = recv_init_crash_recovery_spaces(
3258 rescan, missing_tablespace);
3259
3260 if (err != DB_SUCCESS) {
3261 log_mutex_exit();
3262 return(err);
3263 }
3264
		/* If there is any missing tablespace and a rescan is needed,
		then there is a possibility that the hash table will not
		contain the redo log records of all tablespace ids. Rescan
		the remaining unstored redo log records in order to validate
		the missing tablespace. */
3269 while (missing_tablespace) {
3270 DBUG_PRINT("ib_log", ("Rescan of redo log to validate "
3271 "the missing tablespace. Scan "
3272 "from last stored LSN " LSN_PF,
3273 recv_sys->last_stored_lsn));
3274
3275 lsn_t recent_stored_lsn = recv_sys->last_stored_lsn;
3276 rescan = recv_group_scan_log_recs(
3277 checkpoint_lsn, &recent_stored_lsn, false);
3278
3279 ut_ad(!recv_sys->found_corrupt_fs);
3280
3281 missing_tablespace = false;
3282
3283 err = recv_sys->found_corrupt_log
3284 ? DB_ERROR
3285 : recv_validate_tablespace(
3286 rescan, missing_tablespace);
3287
3288 if (err != DB_SUCCESS) {
3289 log_mutex_exit();
3290 return err;
3291 }
3292 }
3293
3294 if (srv_operation == SRV_OPERATION_NORMAL) {
3295 buf_dblwr_process();
3296 }
3297
3298 ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);
3299
3300 /* Spawn the background thread to flush dirty pages
3301 from the buffer pools. */
3302 recv_writer_thread_active = true;
3303 os_thread_create(recv_writer_thread, 0, 0);
3304
3305 if (rescan) {
3306 contiguous_lsn = checkpoint_lsn;
3307
3308 recv_group_scan_log_recs(
3309 checkpoint_lsn, &contiguous_lsn, true);
3310
3311 if ((recv_sys->found_corrupt_log
3312 && !srv_force_recovery)
3313 || recv_sys->found_corrupt_fs) {
3314 log_mutex_exit();
3315 return(DB_ERROR);
3316 }
3317 }
3318 } else {
3319 ut_ad(!rescan || recv_sys->n_addrs == 0);
3320 }
3321
3322 if (log_sys.log.scanned_lsn < checkpoint_lsn
3323 || log_sys.log.scanned_lsn < recv_max_page_lsn) {
3324
3325 ib::error() << "We scanned the log up to "
3326 << log_sys.log.scanned_lsn
3327 << ". A checkpoint was at " << checkpoint_lsn << " and"
3328 " the maximum LSN on a database page was "
3329 << recv_max_page_lsn << ". It is possible that the"
3330 " database is now corrupt!";
3331 }
3332
3333 if (recv_sys->recovered_lsn < checkpoint_lsn) {
3334 log_mutex_exit();
3335
3336 ib::error() << "Recovered only to lsn:"
3337 << recv_sys->recovered_lsn << " checkpoint_lsn: " << checkpoint_lsn;
3338
3339 return(DB_ERROR);
3340 }
3341
3342 log_sys.next_checkpoint_lsn = checkpoint_lsn;
3343 log_sys.next_checkpoint_no = checkpoint_no + 1;
3344
3345 recv_synchronize_groups();
3346
3347 if (!recv_needed_recovery) {
3348 ut_a(checkpoint_lsn == recv_sys->recovered_lsn);
3349 } else {
3350 srv_start_lsn = recv_sys->recovered_lsn;
3351 }
3352
3353 log_sys.buf_free = ulong(log_sys.lsn % OS_FILE_LOG_BLOCK_SIZE);
3354 log_sys.buf_next_to_write = log_sys.buf_free;
3355 log_sys.write_lsn = log_sys.lsn;
3356
3357 log_sys.last_checkpoint_lsn = checkpoint_lsn;
3358
3359 if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL) {
3360 /* Write a MLOG_CHECKPOINT marker as the first thing,
3361 before generating any other redo log. This ensures
3362 that subsequent crash recovery will be possible even
3363 if the server were killed soon after this. */
3364 fil_names_clear(log_sys.last_checkpoint_lsn, true);
3365 }
3366
3367 MONITOR_SET(MONITOR_LSN_CHECKPOINT_AGE,
3368 log_sys.lsn - log_sys.last_checkpoint_lsn);
3369
3370 log_sys.next_checkpoint_no = ++checkpoint_no;
3371
3372 mutex_enter(&recv_sys->mutex);
3373
3374 recv_sys->apply_log_recs = TRUE;
3375
3376 mutex_exit(&recv_sys->mutex);
3377
3378 log_mutex_exit();
3379
3380 recv_lsn_checks_on = true;
3381
3382 /* The database is now ready to start almost normal processing of user
3383 transactions: transaction rollbacks and the application of the log
3384 records in the hash table can be run in background. */
3385
3386 return(DB_SUCCESS);
3387}
3388
3389/** Complete recovery from a checkpoint. */
3390void
3391recv_recovery_from_checkpoint_finish(void)
3392{
3393 /* Make sure that the recv_writer thread is done. This is
3394 required because it grabs various mutexes and we want to
3395 ensure that when we enable sync_order_checks there is no
3396 mutex currently held by any thread. */
3397 mutex_enter(&recv_sys->writer_mutex);
3398
3399 /* Free the resources of the recovery system */
3400 recv_recovery_on = false;
3401
3402 /* By acquring the mutex we ensure that the recv_writer thread
3403 won't trigger any more LRU batches. Now wait for currently
3404 in progress batches to finish. */
3405 buf_flush_wait_LRU_batch_end();
3406
3407 mutex_exit(&recv_sys->writer_mutex);
3408
3409 ulint count = 0;
3410 while (recv_writer_thread_active) {
3411 ++count;
3412 os_thread_sleep(100000);
3413 if (srv_print_verbose_log && count > 600) {
3414 ib::info() << "Waiting for recv_writer to"
3415 " finish flushing of buffer pool";
3416 count = 0;
3417 }
3418 }
3419
3420 recv_sys_debug_free();
3421
3422 /* Free up the flush_rbt. */
3423 buf_flush_free_flush_rbt();
3424}
3425
3426/********************************************************//**
3427Initiates the rollback of active transactions. */
3428void
3429recv_recovery_rollback_active(void)
3430/*===============================*/
3431{
3432 ut_ad(!recv_writer_thread_active);
3433
3434 /* Switch latching order checks on in sync0debug.cc, if
3435 --innodb-sync-debug=true (default) */
3436 ut_d(sync_check_enable());
3437
3438 /* We can't start any (DDL) transactions if UNDO logging
3439 has been disabled, additionally disable ROLLBACK of recovered
3440 user transactions. */
3441 if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO
3442 && !srv_read_only_mode) {
3443
3444 /* Drop partially created indexes. */
3445 row_merge_drop_temp_indexes();
3446 /* Drop garbage tables. */
3447 row_mysql_drop_garbage_tables();
3448
3449 /* Drop any auxiliary tables that were not dropped when the
3450 parent table was dropped. This can happen if the parent table
3451 was dropped but the server crashed before the auxiliary tables
3452 were dropped. */
3453 fts_drop_orphaned_tables();
3454
3455 /* Rollback the uncommitted transactions which have no user
3456 session */
3457
3458 trx_rollback_is_active = true;
3459 os_thread_create(trx_rollback_all_recovered, 0, 0);
3460 }
3461}
3462
3463/******************************************************//**
3464Resets the logs. The contents of log files will be lost! */
3465void
3466recv_reset_logs(
3467/*============*/
3468 lsn_t lsn) /*!< in: reset to this lsn
3469 rounded up to be divisible by
3470 OS_FILE_LOG_BLOCK_SIZE, after
3471 which we add
3472 LOG_BLOCK_HDR_SIZE */
3473{
3474 ut_ad(log_mutex_own());
3475
3476 log_sys.lsn = ut_uint64_align_up(lsn, OS_FILE_LOG_BLOCK_SIZE);
3477
3478 log_sys.log.lsn = log_sys.lsn;
3479 log_sys.log.lsn_offset = LOG_FILE_HDR_SIZE;
3480
3481 log_sys.buf_next_to_write = 0;
3482 log_sys.write_lsn = log_sys.lsn;
3483
3484 log_sys.next_checkpoint_no = 0;
3485 log_sys.last_checkpoint_lsn = 0;
3486
3487 memset(log_sys.buf, 0, srv_log_buffer_size);
3488 log_block_init(log_sys.buf, log_sys.lsn);
3489 log_block_set_first_rec_group(log_sys.buf, LOG_BLOCK_HDR_SIZE);
3490
3491 log_sys.buf_free = LOG_BLOCK_HDR_SIZE;
3492 log_sys.lsn += LOG_BLOCK_HDR_SIZE;
3493
3494 MONITOR_SET(MONITOR_LSN_CHECKPOINT_AGE,
3495 (log_sys.lsn - log_sys.last_checkpoint_lsn));
3496
3497 log_mutex_exit();
3498
3499 /* Reset the checkpoint fields in logs */
3500
3501 log_make_checkpoint_at(LSN_MAX, TRUE);
3502
3503 log_mutex_enter();
3504}
3505
3506/** Find a doublewrite copy of a page.
3507@param[in] space_id tablespace identifier
3508@param[in] page_no page number
3509@return page frame
3510@retval NULL if no page was found */
3511
3512const byte*
3513recv_dblwr_t::find_page(ulint space_id, ulint page_no)
3514{
3515 typedef std::vector<const byte*, ut_allocator<const byte*> >
3516 matches_t;
3517
3518 matches_t matches;
3519 const byte* result = 0;
3520
3521 for (list::iterator i = pages.begin(); i != pages.end(); ++i) {
3522 if (page_get_space_id(*i) == space_id
3523 && page_get_page_no(*i) == page_no) {
3524 matches.push_back(*i);
3525 }
3526 }
3527
3528 if (matches.size() == 1) {
3529 result = matches[0];
3530 } else if (matches.size() > 1) {
3531
3532 lsn_t max_lsn = 0;
3533 lsn_t page_lsn = 0;
3534
3535 for (matches_t::iterator i = matches.begin();
3536 i != matches.end();
3537 ++i) {
3538
3539 page_lsn = mach_read_from_8(*i + FIL_PAGE_LSN);
3540
3541 if (page_lsn > max_lsn) {
3542 max_lsn = page_lsn;
3543 result = *i;
3544 }
3545 }
3546 }
3547
3548 return(result);
3549}
3550
3551#ifndef DBUG_OFF
3552/** Return string name of the redo log record type.
3553@param[in] type record log record enum
3554@return string name of record log record */
3555const char*
3556get_mlog_string(mlog_id_t type)
3557{
3558 switch (type) {
3559 case MLOG_SINGLE_REC_FLAG:
3560 return("MLOG_SINGLE_REC_FLAG");
3561
3562 case MLOG_1BYTE:
3563 return("MLOG_1BYTE");
3564
3565 case MLOG_2BYTES:
3566 return("MLOG_2BYTES");
3567
3568 case MLOG_4BYTES:
3569 return("MLOG_4BYTES");
3570
3571 case MLOG_8BYTES:
3572 return("MLOG_8BYTES");
3573
3574 case MLOG_REC_INSERT:
3575 return("MLOG_REC_INSERT");
3576
3577 case MLOG_REC_CLUST_DELETE_MARK:
3578 return("MLOG_REC_CLUST_DELETE_MARK");
3579
3580 case MLOG_REC_SEC_DELETE_MARK:
3581 return("MLOG_REC_SEC_DELETE_MARK");
3582
3583 case MLOG_REC_UPDATE_IN_PLACE:
3584 return("MLOG_REC_UPDATE_IN_PLACE");
3585
3586 case MLOG_REC_DELETE:
3587 return("MLOG_REC_DELETE");
3588
3589 case MLOG_LIST_END_DELETE:
3590 return("MLOG_LIST_END_DELETE");
3591
3592 case MLOG_LIST_START_DELETE:
3593 return("MLOG_LIST_START_DELETE");
3594
3595 case MLOG_LIST_END_COPY_CREATED:
3596 return("MLOG_LIST_END_COPY_CREATED");
3597
3598 case MLOG_PAGE_REORGANIZE:
3599 return("MLOG_PAGE_REORGANIZE");
3600
3601 case MLOG_PAGE_CREATE:
3602 return("MLOG_PAGE_CREATE");
3603
3604 case MLOG_UNDO_INSERT:
3605 return("MLOG_UNDO_INSERT");
3606
3607 case MLOG_UNDO_ERASE_END:
3608 return("MLOG_UNDO_ERASE_END");
3609
3610 case MLOG_UNDO_INIT:
3611 return("MLOG_UNDO_INIT");
3612
3613 case MLOG_UNDO_HDR_REUSE:
3614 return("MLOG_UNDO_HDR_REUSE");
3615
3616 case MLOG_UNDO_HDR_CREATE:
3617 return("MLOG_UNDO_HDR_CREATE");
3618
3619 case MLOG_REC_MIN_MARK:
3620 return("MLOG_REC_MIN_MARK");
3621
3622 case MLOG_IBUF_BITMAP_INIT:
3623 return("MLOG_IBUF_BITMAP_INIT");
3624
3625#ifdef UNIV_LOG_LSN_DEBUG
3626 case MLOG_LSN:
3627 return("MLOG_LSN");
3628#endif /* UNIV_LOG_LSN_DEBUG */
3629
3630 case MLOG_WRITE_STRING:
3631 return("MLOG_WRITE_STRING");
3632
3633 case MLOG_MULTI_REC_END:
3634 return("MLOG_MULTI_REC_END");
3635
3636 case MLOG_DUMMY_RECORD:
3637 return("MLOG_DUMMY_RECORD");
3638
3639 case MLOG_FILE_DELETE:
3640 return("MLOG_FILE_DELETE");
3641
3642 case MLOG_COMP_REC_MIN_MARK:
3643 return("MLOG_COMP_REC_MIN_MARK");
3644
3645 case MLOG_COMP_PAGE_CREATE:
3646 return("MLOG_COMP_PAGE_CREATE");
3647
3648 case MLOG_COMP_REC_INSERT:
3649 return("MLOG_COMP_REC_INSERT");
3650
3651 case MLOG_COMP_REC_CLUST_DELETE_MARK:
3652 return("MLOG_COMP_REC_CLUST_DELETE_MARK");
3653
3654 case MLOG_COMP_REC_UPDATE_IN_PLACE:
3655 return("MLOG_COMP_REC_UPDATE_IN_PLACE");
3656
3657 case MLOG_COMP_REC_DELETE:
3658 return("MLOG_COMP_REC_DELETE");
3659
3660 case MLOG_COMP_LIST_END_DELETE:
3661 return("MLOG_COMP_LIST_END_DELETE");
3662
3663 case MLOG_COMP_LIST_START_DELETE:
3664 return("MLOG_COMP_LIST_START_DELETE");
3665
3666 case MLOG_COMP_LIST_END_COPY_CREATED:
3667 return("MLOG_COMP_LIST_END_COPY_CREATED");
3668
3669 case MLOG_COMP_PAGE_REORGANIZE:
3670 return("MLOG_COMP_PAGE_REORGANIZE");
3671
3672 case MLOG_FILE_CREATE2:
3673 return("MLOG_FILE_CREATE2");
3674
3675 case MLOG_ZIP_WRITE_NODE_PTR:
3676 return("MLOG_ZIP_WRITE_NODE_PTR");
3677
3678 case MLOG_ZIP_WRITE_BLOB_PTR:
3679 return("MLOG_ZIP_WRITE_BLOB_PTR");
3680
3681 case MLOG_ZIP_WRITE_HEADER:
3682 return("MLOG_ZIP_WRITE_HEADER");
3683
3684 case MLOG_ZIP_PAGE_COMPRESS:
3685 return("MLOG_ZIP_PAGE_COMPRESS");
3686
3687 case MLOG_ZIP_PAGE_COMPRESS_NO_DATA:
3688 return("MLOG_ZIP_PAGE_COMPRESS_NO_DATA");
3689
3690 case MLOG_ZIP_PAGE_REORGANIZE:
3691 return("MLOG_ZIP_PAGE_REORGANIZE");
3692
3693 case MLOG_ZIP_WRITE_TRX_ID:
3694 return("MLOG_ZIP_WRITE_TRX_ID");
3695
3696 case MLOG_FILE_RENAME2:
3697 return("MLOG_FILE_RENAME2");
3698
3699 case MLOG_FILE_NAME:
3700 return("MLOG_FILE_NAME");
3701
3702 case MLOG_CHECKPOINT:
3703 return("MLOG_CHECKPOINT");
3704
3705 case MLOG_PAGE_CREATE_RTREE:
3706 return("MLOG_PAGE_CREATE_RTREE");
3707
3708 case MLOG_COMP_PAGE_CREATE_RTREE:
3709 return("MLOG_COMP_PAGE_CREATE_RTREE");
3710
3711 case MLOG_INIT_FILE_PAGE2:
3712 return("MLOG_INIT_FILE_PAGE2");
3713
3714 case MLOG_INDEX_LOAD:
3715 return("MLOG_INDEX_LOAD");
3716
3717 case MLOG_TRUNCATE:
3718 return("MLOG_TRUNCATE");
3719
3720 case MLOG_FILE_WRITE_CRYPT_DATA:
3721 return("MLOG_FILE_WRITE_CRYPT_DATA");
3722 }
3723 DBUG_ASSERT(0);
3724 return(NULL);
3725}
3726#endif /* !DBUG_OFF */
3727