1/*****************************************************************************
2
3Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2013, 2017, MariaDB Corporation.
5
6This program is free software; you can redistribute it and/or modify it under
7the terms of the GNU General Public License as published by the Free Software
8Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful, but WITHOUT
11ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License along with
15this program; if not, write to the Free Software Foundation, Inc.,
1651 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
17
18*****************************************************************************/
19
20/**************************************************//**
21@file buf/buf0dblwr.cc
Doublewrite buffer module
23
24Created 2011/12/19
25*******************************************************/
26
27#include "ha_prototypes.h"
28#include "buf0dblwr.h"
29#include "buf0buf.h"
30#include "buf0checksum.h"
31#include "srv0start.h"
32#include "srv0srv.h"
33#include "page0zip.h"
34#include "trx0sys.h"
35#include "fil0crypt.h"
36#include "fil0pagecompress.h"
37
/** The doublewrite buffer; NULL until buf_dblwr_init() allocates it and
again after buf_dblwr_free() releases it */
buf_dblwr_t* buf_dblwr = NULL;

/** Set to TRUE when the doublewrite buffer is being created */
ibool buf_dblwr_being_created = FALSE;

/** Number of blocks in the doublewrite buffer; each block spans
TRX_SYS_DOUBLEWRITE_BLOCK_SIZE pages */
#define TRX_SYS_DOUBLEWRITE_BLOCKS 2
45
46/****************************************************************//**
47Determines if a page number is located inside the doublewrite buffer.
48@return TRUE if the location is inside the two blocks of the
49doublewrite buffer */
50ibool
51buf_dblwr_page_inside(
52/*==================*/
53 ulint page_no) /*!< in: page number */
54{
55 if (buf_dblwr == NULL) {
56
57 return(FALSE);
58 }
59
60 if (page_no >= buf_dblwr->block1
61 && page_no < buf_dblwr->block1
62 + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
63 return(TRUE);
64 }
65
66 if (page_no >= buf_dblwr->block2
67 && page_no < buf_dblwr->block2
68 + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
69 return(TRUE);
70 }
71
72 return(FALSE);
73}
74
75/****************************************************************//**
76Calls buf_page_get() on the TRX_SYS_PAGE and returns a pointer to the
77doublewrite buffer within it.
78@return pointer to the doublewrite buffer within the filespace header
79page. */
80UNIV_INLINE
81byte*
82buf_dblwr_get(
83/*==========*/
84 mtr_t* mtr) /*!< in/out: MTR to hold the page latch */
85{
86 buf_block_t* block;
87
88 block = buf_page_get(page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
89 univ_page_size, RW_X_LATCH, mtr);
90
91 buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
92
93 return(buf_block_get_frame(block) + TRX_SYS_DOUBLEWRITE);
94}
95
/********************************************************************//**
Flush a batch of writes to the datafiles that have already been
written to the dblwr buffer on disk. Takes no arguments and returns
nothing; it only invokes the two os_aio calls below, in this order. */
void
buf_dblwr_sync_datafiles()
/*======================*/
{
 /* Wake possible simulated aio thread to actually post the
 writes to the operating system */
 os_aio_simulated_wake_handler_threads();

 /* Wait until all async writes to tablespaces have been posted to
 the OS */
 os_aio_wait_until_no_pending_writes();
}
111
112/****************************************************************//**
113Creates or initialializes the doublewrite buffer at a database start. */
114static
115void
116buf_dblwr_init(
117/*===========*/
118 byte* doublewrite) /*!< in: pointer to the doublewrite buf
119 header on trx sys page */
120{
121 ulint buf_size;
122
123 buf_dblwr = static_cast<buf_dblwr_t*>(
124 ut_zalloc_nokey(sizeof(buf_dblwr_t)));
125
126 /* There are two blocks of same size in the doublewrite
127 buffer. */
128 buf_size = TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
129
130 /* There must be atleast one buffer for single page writes
131 and one buffer for batch writes. */
132 ut_a(srv_doublewrite_batch_size > 0
133 && srv_doublewrite_batch_size < buf_size);
134
135 mutex_create(LATCH_ID_BUF_DBLWR, &buf_dblwr->mutex);
136
137 buf_dblwr->b_event = os_event_create("dblwr_batch_event");
138 buf_dblwr->s_event = os_event_create("dblwr_single_event");
139 buf_dblwr->first_free = 0;
140 buf_dblwr->s_reserved = 0;
141 buf_dblwr->b_reserved = 0;
142
143 buf_dblwr->block1 = mach_read_from_4(
144 doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK1);
145 buf_dblwr->block2 = mach_read_from_4(
146 doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK2);
147
148 buf_dblwr->in_use = static_cast<bool*>(
149 ut_zalloc_nokey(buf_size * sizeof(bool)));
150
151 buf_dblwr->write_buf_unaligned = static_cast<byte*>(
152 ut_malloc_nokey((1 + buf_size) << srv_page_size_shift));
153
154 buf_dblwr->write_buf = static_cast<byte*>(
155 ut_align(buf_dblwr->write_buf_unaligned,
156 srv_page_size));
157
158 buf_dblwr->buf_block_arr = static_cast<buf_page_t**>(
159 ut_zalloc_nokey(buf_size * sizeof(void*)));
160}
161
162/** Create the doublewrite buffer if the doublewrite buffer header
163is not present in the TRX_SYS page.
164@return whether the operation succeeded
165@retval true if the doublewrite buffer exists or was created
166@retval false if the creation failed (too small first data file) */
167bool
168buf_dblwr_create()
169{
170 buf_block_t* block2;
171 buf_block_t* new_block;
172 byte* doublewrite;
173 byte* fseg_header;
174 ulint page_no;
175 ulint prev_page_no;
176 ulint i;
177 mtr_t mtr;
178
179 if (buf_dblwr) {
180 /* Already inited */
181 return(true);
182 }
183
184start_again:
185 mtr.start();
186 buf_dblwr_being_created = TRUE;
187
188 doublewrite = buf_dblwr_get(&mtr);
189
190 if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
191 == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
192 /* The doublewrite buffer has already been created:
193 just read in some numbers */
194
195 buf_dblwr_init(doublewrite);
196
197 mtr.commit();
198 buf_dblwr_being_created = FALSE;
199 return(true);
200 } else {
201 if (UT_LIST_GET_FIRST(fil_system.sys_space->chain)->size
202 < 3 * FSP_EXTENT_SIZE) {
203 goto too_small;
204 }
205 }
206
207 block2 = fseg_create(fil_system.sys_space, TRX_SYS_PAGE_NO,
208 TRX_SYS_DOUBLEWRITE
209 + TRX_SYS_DOUBLEWRITE_FSEG, &mtr);
210
211 if (block2 == NULL) {
212too_small:
213 ib::error()
214 << "Cannot create doublewrite buffer: "
215 "the first file in innodb_data_file_path"
216 " must be at least "
217 << (3 * (FSP_EXTENT_SIZE
218 >> (20U - srv_page_size_shift)))
219 << "M.";
220 mtr.commit();
221 return(false);
222 }
223
224 ib::info() << "Doublewrite buffer not found: creating new";
225
226 /* FIXME: After this point, the doublewrite buffer creation
227 is not atomic. The doublewrite buffer should not exist in
228 the InnoDB system tablespace file in the first place.
229 It could be located in separate optional file(s) in a
230 user-specified location. */
231
232 /* fseg_create acquires a second latch on the page,
233 therefore we must declare it: */
234
235 buf_block_dbg_add_level(block2, SYNC_NO_ORDER_CHECK);
236
237 fseg_header = doublewrite + TRX_SYS_DOUBLEWRITE_FSEG;
238 prev_page_no = 0;
239
240 for (i = 0; i < TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE
241 + FSP_EXTENT_SIZE / 2; i++) {
242 new_block = fseg_alloc_free_page(
243 fseg_header, prev_page_no + 1, FSP_UP, &mtr);
244 if (new_block == NULL) {
245 ib::error() << "Cannot create doublewrite buffer: "
246 " you must increase your tablespace size."
247 " Cannot continue operation.";
248 /* This may essentially corrupt the doublewrite
249 buffer. However, usually the doublewrite buffer
250 is created at database initialization, and it
251 should not matter (just remove all newly created
252 InnoDB files and restart). */
253 mtr.commit();
254 return(false);
255 }
256
257 /* We read the allocated pages to the buffer pool;
258 when they are written to disk in a flush, the space
259 id and page number fields are also written to the
260 pages. When we at database startup read pages
261 from the doublewrite buffer, we know that if the
262 space id and page number in them are the same as
263 the page position in the tablespace, then the page
264 has not been written to in doublewrite. */
265
266 ut_ad(rw_lock_get_x_lock_count(&new_block->lock) == 1);
267 page_no = new_block->page.id.page_no();
268
269 if (i == FSP_EXTENT_SIZE / 2) {
270 ut_a(page_no == FSP_EXTENT_SIZE);
271 mlog_write_ulint(doublewrite
272 + TRX_SYS_DOUBLEWRITE_BLOCK1,
273 page_no, MLOG_4BYTES, &mtr);
274 mlog_write_ulint(doublewrite
275 + TRX_SYS_DOUBLEWRITE_REPEAT
276 + TRX_SYS_DOUBLEWRITE_BLOCK1,
277 page_no, MLOG_4BYTES, &mtr);
278
279 } else if (i == FSP_EXTENT_SIZE / 2
280 + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
281 ut_a(page_no == 2 * FSP_EXTENT_SIZE);
282 mlog_write_ulint(doublewrite
283 + TRX_SYS_DOUBLEWRITE_BLOCK2,
284 page_no, MLOG_4BYTES, &mtr);
285 mlog_write_ulint(doublewrite
286 + TRX_SYS_DOUBLEWRITE_REPEAT
287 + TRX_SYS_DOUBLEWRITE_BLOCK2,
288 page_no, MLOG_4BYTES, &mtr);
289
290 } else if (i > FSP_EXTENT_SIZE / 2) {
291 ut_a(page_no == prev_page_no + 1);
292 }
293
294 if (((i + 1) & 15) == 0) {
295 /* rw_locks can only be recursively x-locked
296 2048 times. (on 32 bit platforms,
297 (lint) 0 - (X_LOCK_DECR * 2049)
298 is no longer a negative number, and thus
299 lock_word becomes like a shared lock).
300 For 4k page size this loop will
301 lock the fseg header too many times. Since
302 this code is not done while any other threads
303 are active, restart the MTR occasionally. */
304 mtr_commit(&mtr);
305 mtr_start(&mtr);
306 doublewrite = buf_dblwr_get(&mtr);
307 fseg_header = doublewrite
308 + TRX_SYS_DOUBLEWRITE_FSEG;
309 }
310
311 prev_page_no = page_no;
312 }
313
314 mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC,
315 TRX_SYS_DOUBLEWRITE_MAGIC_N,
316 MLOG_4BYTES, &mtr);
317 mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC
318 + TRX_SYS_DOUBLEWRITE_REPEAT,
319 TRX_SYS_DOUBLEWRITE_MAGIC_N,
320 MLOG_4BYTES, &mtr);
321
322 mlog_write_ulint(doublewrite
323 + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED,
324 TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N,
325 MLOG_4BYTES, &mtr);
326 mtr_commit(&mtr);
327
328 /* Flush the modified pages to disk and make a checkpoint */
329 log_make_checkpoint_at(LSN_MAX, TRUE);
330
331 /* Remove doublewrite pages from LRU */
332 buf_pool_invalidate();
333
334 ib::info() << "Doublewrite buffer created";
335
336 goto start_again;
337}
338
339/**
340At database startup initializes the doublewrite buffer memory structure if
341we already have a doublewrite buffer created in the data files. If we are
342upgrading to an InnoDB version which supports multiple tablespaces, then this
343function performs the necessary update operations. If we are in a crash
344recovery, this function loads the pages from double write buffer into memory.
345@param[in] file File handle
346@param[in] path Path name of file
347@return DB_SUCCESS or error code */
348dberr_t
349buf_dblwr_init_or_load_pages(
350 pfs_os_file_t file,
351 const char* path)
352{
353 byte* buf;
354 byte* page;
355 ulint block1;
356 ulint block2;
357 ulint space_id;
358 byte* read_buf;
359 byte* doublewrite;
360 byte* unaligned_read_buf;
361 ibool reset_space_ids = FALSE;
362 recv_dblwr_t& recv_dblwr = recv_sys->dblwr;
363
364 /* We do the file i/o past the buffer pool */
365
366 unaligned_read_buf = static_cast<byte*>(
367 ut_malloc_nokey(3U << srv_page_size_shift));
368
369 read_buf = static_cast<byte*>(
370 ut_align(unaligned_read_buf, srv_page_size));
371
372 /* Read the trx sys header to check if we are using the doublewrite
373 buffer */
374 dberr_t err;
375
376 IORequest read_request(IORequest::READ);
377
378 err = os_file_read(
379 read_request,
380 file, read_buf, TRX_SYS_PAGE_NO << srv_page_size_shift,
381 srv_page_size);
382
383 if (err != DB_SUCCESS) {
384
385 ib::error()
386 << "Failed to read the system tablespace header page";
387
388 ut_free(unaligned_read_buf);
389
390 return(err);
391 }
392
393 doublewrite = read_buf + TRX_SYS_DOUBLEWRITE;
394
395 /* TRX_SYS_PAGE_NO is not encrypted see fil_crypt_rotate_page() */
396
397 if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
398 == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
399 /* The doublewrite buffer has been created */
400
401 buf_dblwr_init(doublewrite);
402
403 block1 = buf_dblwr->block1;
404 block2 = buf_dblwr->block2;
405
406 buf = buf_dblwr->write_buf;
407 } else {
408 ut_free(unaligned_read_buf);
409 return(DB_SUCCESS);
410 }
411
412 if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED)
413 != TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N) {
414
415 /* We are upgrading from a version < 4.1.x to a version where
416 multiple tablespaces are supported. We must reset the space id
417 field in the pages in the doublewrite buffer because starting
418 from this version the space id is stored to
419 FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID. */
420
421 reset_space_ids = TRUE;
422
423 ib::info() << "Resetting space id's in the doublewrite buffer";
424 }
425
426 /* Read the pages from the doublewrite buffer to memory */
427 err = os_file_read(
428 read_request,
429 file, buf, block1 << srv_page_size_shift,
430 TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift);
431
432 if (err != DB_SUCCESS) {
433
434 ib::error()
435 << "Failed to read the first double write buffer "
436 "extent";
437
438 ut_free(unaligned_read_buf);
439
440 return(err);
441 }
442
443 err = os_file_read(
444 read_request,
445 file,
446 buf + (TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift),
447 block2 << srv_page_size_shift,
448 TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift);
449
450 if (err != DB_SUCCESS) {
451
452 ib::error()
453 << "Failed to read the second double write buffer "
454 "extent";
455
456 ut_free(unaligned_read_buf);
457
458 return(err);
459 }
460
461 /* Check if any of these pages is half-written in data files, in the
462 intended position */
463
464 page = buf;
465
466 for (ulint i = 0; i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * 2; i++) {
467 if (reset_space_ids) {
468 ulint source_page_no;
469
470 space_id = 0;
471 mach_write_to_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
472 space_id);
473 /* We do not need to calculate new checksums for the
474 pages because the field .._SPACE_ID does not affect
475 them. Write the page back to where we read it from. */
476
477 if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
478 source_page_no = block1 + i;
479 } else {
480 source_page_no = block2
481 + i - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
482 }
483
484 IORequest write_request(IORequest::WRITE);
485
486 err = os_file_write(
487 write_request, path, file, page,
488 source_page_no << srv_page_size_shift,
489 srv_page_size);
490 if (err != DB_SUCCESS) {
491
492 ib::error()
493 << "Failed to write to the double write"
494 " buffer";
495
496 ut_free(unaligned_read_buf);
497
498 return(err);
499 }
500
501 } else if (memcmp(field_ref_zero, page + FIL_PAGE_LSN, 8)) {
502 /* Each valid page header must contain
503 a nonzero FIL_PAGE_LSN field. */
504 recv_dblwr.add(page);
505 }
506
507 page += srv_page_size;
508 }
509
510 if (reset_space_ids) {
511 os_file_flush(file);
512 }
513
514 ut_free(unaligned_read_buf);
515
516 return(DB_SUCCESS);
517}
518
519/** Process and remove the double write buffer pages for all tablespaces. */
520void
521buf_dblwr_process()
522{
523 ulint page_no_dblwr = 0;
524 byte* read_buf;
525 byte* unaligned_read_buf;
526 recv_dblwr_t& recv_dblwr = recv_sys->dblwr;
527
528 if (!buf_dblwr) {
529 return;
530 }
531
532 unaligned_read_buf = static_cast<byte*>(
533 ut_malloc_nokey(2U << srv_page_size_shift));
534
535 read_buf = static_cast<byte*>(
536 ut_align(unaligned_read_buf, srv_page_size));
537
538 for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin();
539 i != recv_dblwr.pages.end();
540 ++i, ++page_no_dblwr) {
541 byte* page = *i;
542 ulint space_id = page_get_space_id(page);
543 fil_space_t* space = fil_space_get(space_id);
544
545 if (space == NULL) {
546 /* Maybe we have dropped the tablespace
547 and this page once belonged to it: do nothing */
548 continue;
549 }
550
551 fil_space_open_if_needed(space);
552
553 const ulint page_no = page_get_page_no(page);
554 const page_id_t page_id(space_id, page_no);
555
556 if (page_no >= space->size) {
557
558 /* Do not report the warning if the tablespace
559 is scheduled for truncation or was truncated
560 and we have parsed an MLOG_TRUNCATE record. */
561 if (!srv_is_tablespace_truncated(space_id)
562 && !srv_was_tablespace_truncated(space)) {
563 ib::warn() << "A copy of page " << page_id
564 << " in the doublewrite buffer slot "
565 << page_no_dblwr
566 << " is not within space bounds";
567 }
568 continue;
569 }
570
571 const page_size_t page_size(space->flags);
572 ut_ad(!buf_page_is_zeroes(page, page_size));
573
574 /* We want to ensure that for partial reads the
575 unread portion of the page is NUL. */
576 memset(read_buf, 0x0, page_size.physical());
577
578 IORequest request;
579
580 request.dblwr_recover();
581
582 /* Read in the actual page from the file */
583 dberr_t err = fil_io(
584 request, true,
585 page_id, page_size,
586 0, page_size.physical(), read_buf, NULL);
587
588 if (err != DB_SUCCESS) {
589 ib::warn()
590 << "Double write buffer recovery: "
591 << page_id << " read failed with "
592 << "error: " << ut_strerr(err);
593 }
594
595 const bool is_all_zero = buf_page_is_zeroes(
596 read_buf, page_size);
597
598 if (is_all_zero) {
599 /* We will check if the copy in the
600 doublewrite buffer is valid. If not, we will
601 ignore this page (there should be redo log
602 records to initialize it). */
603 } else {
604 if (fil_page_is_compressed_encrypted(read_buf) ||
605 fil_page_is_compressed(read_buf)) {
606 /* Decompress the page before
607 validating the checksum. */
608 fil_decompress_page(
609 NULL, read_buf, srv_page_size,
610 NULL, true);
611 }
612
613 if (fil_space_verify_crypt_checksum(
614 read_buf, page_size, space_id, page_no)
615 || !buf_page_is_corrupted(
616 true, read_buf, page_size, space)) {
617 /* The page is good; there is no need
618 to consult the doublewrite buffer. */
619 continue;
620 }
621
622 /* We intentionally skip this message for
623 is_all_zero pages. */
624 ib::info()
625 << "Trying to recover page " << page_id
626 << " from the doublewrite buffer.";
627 }
628
629 /* Next, validate the doublewrite page. */
630 if (fil_page_is_compressed_encrypted(page) ||
631 fil_page_is_compressed(page)) {
632 /* Decompress the page before
633 validating the checksum. */
634 fil_decompress_page(
635 NULL, page, srv_page_size, NULL, true);
636 }
637
638 if (!fil_space_verify_crypt_checksum(page, page_size,
639 space_id, page_no)
640 && buf_page_is_corrupted(true, page, page_size, space)) {
641 if (!is_all_zero) {
642 ib::warn() << "A doublewrite copy of page "
643 << page_id << " is corrupted.";
644 }
645 /* Theoretically we could have another good
646 copy for this page in the doublewrite
647 buffer. If not, we will report a fatal error
648 for a corrupted page somewhere else if that
649 page was truly needed. */
650 continue;
651 }
652
653 if (page_no == 0) {
654 /* Check the FSP_SPACE_FLAGS. */
655 ulint flags = fsp_header_get_flags(page);
656 if (!fsp_flags_is_valid(flags, space_id)
657 && fsp_flags_convert_from_101(flags)
658 == ULINT_UNDEFINED) {
659 ib::warn() << "Ignoring a doublewrite copy"
660 " of page " << page_id
661 << " due to invalid flags "
662 << ib::hex(flags);
663 continue;
664 }
665 /* The flags on the page should be converted later. */
666 }
667
668 /* Write the good page from the doublewrite buffer to
669 the intended position. */
670
671 IORequest write_request(IORequest::WRITE);
672
673 fil_io(write_request, true, page_id, page_size,
674 0, page_size.physical(),
675 const_cast<byte*>(page), NULL);
676
677 ib::info() << "Recovered page " << page_id
678 << " from the doublewrite buffer.";
679 }
680
681 recv_dblwr.pages.clear();
682
683 fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
684 ut_free(unaligned_read_buf);
685}
686
687/****************************************************************//**
688Frees doublewrite buffer. */
689void
690buf_dblwr_free()
691{
692 /* Free the double write data structures. */
693 ut_a(buf_dblwr != NULL);
694 ut_ad(buf_dblwr->s_reserved == 0);
695 ut_ad(buf_dblwr->b_reserved == 0);
696
697 os_event_destroy(buf_dblwr->b_event);
698 os_event_destroy(buf_dblwr->s_event);
699 ut_free(buf_dblwr->write_buf_unaligned);
700 buf_dblwr->write_buf_unaligned = NULL;
701
702 ut_free(buf_dblwr->buf_block_arr);
703 buf_dblwr->buf_block_arr = NULL;
704
705 ut_free(buf_dblwr->in_use);
706 buf_dblwr->in_use = NULL;
707
708 mutex_free(&buf_dblwr->mutex);
709 ut_free(buf_dblwr);
710 buf_dblwr = NULL;
711}
712
713/********************************************************************//**
714Updates the doublewrite buffer when an IO request is completed. */
715void
716buf_dblwr_update(
717/*=============*/
718 const buf_page_t* bpage, /*!< in: buffer block descriptor */
719 buf_flush_t flush_type)/*!< in: flush type */
720{
721 ut_ad(srv_use_doublewrite_buf);
722 ut_ad(buf_dblwr);
723 ut_ad(!fsp_is_system_temporary(bpage->id.space()));
724 ut_ad(!srv_read_only_mode);
725
726 switch (flush_type) {
727 case BUF_FLUSH_LIST:
728 case BUF_FLUSH_LRU:
729 mutex_enter(&buf_dblwr->mutex);
730
731 ut_ad(buf_dblwr->batch_running);
732 ut_ad(buf_dblwr->b_reserved > 0);
733 ut_ad(buf_dblwr->b_reserved <= buf_dblwr->first_free);
734
735 buf_dblwr->b_reserved--;
736
737 if (buf_dblwr->b_reserved == 0) {
738 mutex_exit(&buf_dblwr->mutex);
739 /* This will finish the batch. Sync data files
740 to the disk. */
741 fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
742 mutex_enter(&buf_dblwr->mutex);
743
744 /* We can now reuse the doublewrite memory buffer: */
745 buf_dblwr->first_free = 0;
746 buf_dblwr->batch_running = false;
747 os_event_set(buf_dblwr->b_event);
748 }
749
750 mutex_exit(&buf_dblwr->mutex);
751 break;
752 case BUF_FLUSH_SINGLE_PAGE:
753 {
754 const ulint size = TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
755 ulint i;
756 mutex_enter(&buf_dblwr->mutex);
757 for (i = srv_doublewrite_batch_size; i < size; ++i) {
758 if (buf_dblwr->buf_block_arr[i] == bpage) {
759 buf_dblwr->s_reserved--;
760 buf_dblwr->buf_block_arr[i] = NULL;
761 buf_dblwr->in_use[i] = false;
762 break;
763 }
764 }
765
766 /* The block we are looking for must exist as a
767 reserved block. */
768 ut_a(i < size);
769 }
770 os_event_set(buf_dblwr->s_event);
771 mutex_exit(&buf_dblwr->mutex);
772 break;
773 case BUF_FLUSH_N_TYPES:
774 ut_error;
775 }
776}
777
778/********************************************************************//**
779Check the LSN values on the page. */
780static
781void
782buf_dblwr_check_page_lsn(
783/*=====================*/
784 const page_t* page) /*!< in: page to check */
785{
786 ibool page_compressed = (mach_read_from_2(page+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED);
787 uint key_version = mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
788
789 /* Ignore page compressed or encrypted pages */
790 if (page_compressed || key_version) {
791 return;
792 }
793
794 if (memcmp(page + (FIL_PAGE_LSN + 4),
795 page + (srv_page_size
796 - FIL_PAGE_END_LSN_OLD_CHKSUM + 4),
797 4)) {
798
799 const ulint lsn1 = mach_read_from_4(
800 page + FIL_PAGE_LSN + 4);
801 const ulint lsn2 = mach_read_from_4(
802 page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM
803 + 4);
804
805 ib::error() << "The page to be written seems corrupt!"
806 " The low 4 bytes of LSN fields do not match"
807 " (" << lsn1 << " != " << lsn2 << ")!"
808 " Noticed in the buffer pool.";
809 }
810}
811
/********************************************************************//**
Reports a corrupt block that was found while writing out data to the
disk: prints the page frame with buf_page_print() and then emits an
ib::fatal() message that identifies the page. */
static
void
buf_dblwr_assert_on_corrupt_block(
/*==============================*/
 const buf_block_t* block) /*!< in: block to check */
{
 /* Dump the page contents before the fatal message is emitted. */
 buf_page_print(block->frame, univ_page_size);

 /* The message text states that the server is crashed on purpose. */
 ib::fatal() << "Apparent corruption of an index page "
 << block->page.id
 << " to be written to data file. We intentionally crash"
 " the server to prevent corrupt data from ending up in"
 " data files.";
}
829
830/********************************************************************//**
831Check the LSN values on the page with which this block is associated.
832Also validate the page if the option is set. */
833static
834void
835buf_dblwr_check_block(
836/*==================*/
837 const buf_block_t* block) /*!< in: block to check */
838{
839 ut_ad(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
840
841 if (block->skip_flush_check) {
842 return;
843 }
844
845 switch (fil_page_get_type(block->frame)) {
846 case FIL_PAGE_INDEX:
847 case FIL_PAGE_TYPE_INSTANT:
848 case FIL_PAGE_RTREE:
849 if (page_is_comp(block->frame)) {
850 if (page_simple_validate_new(block->frame)) {
851 return;
852 }
853 } else if (page_simple_validate_old(block->frame)) {
854 return;
855 }
856 /* While it is possible that this is not an index page
857 but just happens to have wrongly set FIL_PAGE_TYPE,
858 such pages should never be modified to without also
859 adjusting the page type during page allocation or
860 buf_flush_init_for_writing() or fil_page_reset_type(). */
861 break;
862 case FIL_PAGE_TYPE_FSP_HDR:
863 case FIL_PAGE_IBUF_BITMAP:
864 case FIL_PAGE_TYPE_UNKNOWN:
865 /* Do not complain again, we already reset this field. */
866 case FIL_PAGE_UNDO_LOG:
867 case FIL_PAGE_INODE:
868 case FIL_PAGE_IBUF_FREE_LIST:
869 case FIL_PAGE_TYPE_SYS:
870 case FIL_PAGE_TYPE_TRX_SYS:
871 case FIL_PAGE_TYPE_XDES:
872 case FIL_PAGE_TYPE_BLOB:
873 case FIL_PAGE_TYPE_ZBLOB:
874 case FIL_PAGE_TYPE_ZBLOB2:
875 /* TODO: validate also non-index pages */
876 return;
877 case FIL_PAGE_TYPE_ALLOCATED:
878 /* empty pages should never be flushed */
879 return;
880 }
881
882 buf_dblwr_assert_on_corrupt_block(block);
883}
884
885/********************************************************************//**
886Writes a page that has already been written to the doublewrite buffer
887to the datafile. It is the job of the caller to sync the datafile. */
888static
889void
890buf_dblwr_write_block_to_datafile(
891/*==============================*/
892 const buf_page_t* bpage, /*!< in: page to write */
893 bool sync) /*!< in: true if sync IO
894 is requested */
895{
896 ut_a(buf_page_in_file(bpage));
897
898 ulint type = IORequest::WRITE;
899
900 if (sync) {
901 type |= IORequest::DO_NOT_WAKE;
902 }
903
904 IORequest request(type, const_cast<buf_page_t*>(bpage));
905
906 /* We request frame here to get correct buffer in case of
907 encryption and/or page compression */
908 void * frame = buf_page_get_frame(bpage);
909
910 if (bpage->zip.data != NULL) {
911 ut_ad(bpage->size.is_compressed());
912
913 fil_io(request, sync, bpage->id, bpage->size, 0,
914 bpage->size.physical(),
915 (void*) frame,
916 (void*) bpage);
917 } else {
918 ut_ad(!bpage->size.is_compressed());
919
920 /* Our IO API is common for both reads and writes and is
921 therefore geared towards a non-const parameter. */
922
923 buf_block_t* block = reinterpret_cast<buf_block_t*>(
924 const_cast<buf_page_t*>(bpage));
925
926 ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
927 buf_dblwr_check_page_lsn(block->frame);
928
929 fil_io(request,
930 sync, bpage->id, bpage->size, 0, bpage->real_size,
931 frame, block);
932 }
933}
934
935/********************************************************************//**
936Flushes possible buffered writes from the doublewrite memory buffer to disk,
937and also wakes up the aio thread if simulated aio is used. It is very
938important to call this function after a batch of writes has been posted,
939and also when we may have to wait for a page latch! Otherwise a deadlock
940of threads can occur. */
941void
942buf_dblwr_flush_buffered_writes()
943{
944 byte* write_buf;
945 ulint first_free;
946 ulint len;
947
948 if (!srv_use_doublewrite_buf || buf_dblwr == NULL) {
949 /* Sync the writes to the disk. */
950 buf_dblwr_sync_datafiles();
951 /* Now we flush the data to disk (for example, with fsync) */
952 fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
953 return;
954 }
955
956 ut_ad(!srv_read_only_mode);
957
958try_again:
959 mutex_enter(&buf_dblwr->mutex);
960
961 /* Write first to doublewrite buffer blocks. We use synchronous
962 aio and thus know that file write has been completed when the
963 control returns. */
964
965 if (buf_dblwr->first_free == 0) {
966
967 mutex_exit(&buf_dblwr->mutex);
968
969 /* Wake possible simulated aio thread as there could be
970 system temporary tablespace pages active for flushing.
971 Note: system temporary tablespace pages are not scheduled
972 for doublewrite. */
973 os_aio_simulated_wake_handler_threads();
974
975 return;
976 }
977
978 if (buf_dblwr->batch_running) {
979 /* Another thread is running the batch right now. Wait
980 for it to finish. */
981 int64_t sig_count = os_event_reset(buf_dblwr->b_event);
982 mutex_exit(&buf_dblwr->mutex);
983
984 os_event_wait_low(buf_dblwr->b_event, sig_count);
985 goto try_again;
986 }
987
988 ut_ad(buf_dblwr->first_free == buf_dblwr->b_reserved);
989
990 /* Disallow anyone else to post to doublewrite buffer or to
991 start another batch of flushing. */
992 buf_dblwr->batch_running = true;
993 first_free = buf_dblwr->first_free;
994
995 /* Now safe to release the mutex. Note that though no other
996 thread is allowed to post to the doublewrite batch flushing
997 but any threads working on single page flushes are allowed
998 to proceed. */
999 mutex_exit(&buf_dblwr->mutex);
1000
1001 write_buf = buf_dblwr->write_buf;
1002
1003 for (ulint len2 = 0, i = 0;
1004 i < buf_dblwr->first_free;
1005 len2 += srv_page_size, i++) {
1006
1007 const buf_block_t* block;
1008
1009 block = (buf_block_t*) buf_dblwr->buf_block_arr[i];
1010
1011 if (buf_block_get_state(block) != BUF_BLOCK_FILE_PAGE
1012 || block->page.zip.data) {
1013 /* No simple validate for compressed
1014 pages exists. */
1015 continue;
1016 }
1017
1018 /* Check that the actual page in the buffer pool is
1019 not corrupt and the LSN values are sane. */
1020 buf_dblwr_check_block(block);
1021
1022 /* Check that the page as written to the doublewrite
1023 buffer has sane LSN values. */
1024 buf_dblwr_check_page_lsn(write_buf + len2);
1025 }
1026
1027 /* Write out the first block of the doublewrite buffer */
1028 len = std::min<ulint>(TRX_SYS_DOUBLEWRITE_BLOCK_SIZE,
1029 buf_dblwr->first_free) << srv_page_size_shift;
1030
1031 fil_io(IORequestWrite, true,
1032 page_id_t(TRX_SYS_SPACE, buf_dblwr->block1), univ_page_size,
1033 0, len, (void*) write_buf, NULL);
1034
1035 if (buf_dblwr->first_free <= TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
1036 /* No unwritten pages in the second block. */
1037 goto flush;
1038 }
1039
1040 /* Write out the second block of the doublewrite buffer. */
1041 len = (buf_dblwr->first_free - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE)
1042 << srv_page_size_shift;
1043
1044 write_buf = buf_dblwr->write_buf
1045 + (TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift);
1046
1047 fil_io(IORequestWrite, true,
1048 page_id_t(TRX_SYS_SPACE, buf_dblwr->block2), univ_page_size,
1049 0, len, (void*) write_buf, NULL);
1050
1051flush:
1052 /* increment the doublewrite flushed pages counter */
1053 srv_stats.dblwr_pages_written.add(buf_dblwr->first_free);
1054 srv_stats.dblwr_writes.inc();
1055
1056 /* Now flush the doublewrite buffer data to disk */
1057 fil_flush(TRX_SYS_SPACE);
1058
1059 /* We know that the writes have been flushed to disk now
1060 and in recovery we will find them in the doublewrite buffer
1061 blocks. Next do the writes to the intended positions. */
1062
1063 /* Up to this point first_free and buf_dblwr->first_free are
1064 same because we have set the buf_dblwr->batch_running flag
1065 disallowing any other thread to post any request but we
1066 can't safely access buf_dblwr->first_free in the loop below.
1067 This is so because it is possible that after we are done with
1068 the last iteration and before we terminate the loop, the batch
1069 gets finished in the IO helper thread and another thread posts
1070 a new batch setting buf_dblwr->first_free to a higher value.
1071 If this happens and we are using buf_dblwr->first_free in the
1072 loop termination condition then we'll end up dispatching
1073 the same block twice from two different threads. */
1074 ut_ad(first_free == buf_dblwr->first_free);
1075 for (ulint i = 0; i < first_free; i++) {
1076 buf_dblwr_write_block_to_datafile(
1077 buf_dblwr->buf_block_arr[i], false);
1078 }
1079
1080 /* Wake possible simulated aio thread to actually post the
1081 writes to the operating system. We don't flush the files
1082 at this point. We leave it to the IO helper thread to flush
1083 datafiles when the whole batch has been processed. */
1084 os_aio_simulated_wake_handler_threads();
1085}
1086
1087/********************************************************************//**
1088Posts a buffer page for writing. If the doublewrite memory buffer is
1089full, calls buf_dblwr_flush_buffered_writes and waits for for free
1090space to appear. */
1091void
1092buf_dblwr_add_to_batch(
1093/*====================*/
1094 buf_page_t* bpage) /*!< in: buffer block to write */
1095{
1096 ut_a(buf_page_in_file(bpage));
1097
1098try_again:
1099 mutex_enter(&buf_dblwr->mutex);
1100
1101 ut_a(buf_dblwr->first_free <= srv_doublewrite_batch_size);
1102
1103 if (buf_dblwr->batch_running) {
1104
1105 /* This not nearly as bad as it looks. There is only
1106 page_cleaner thread which does background flushing
1107 in batches therefore it is unlikely to be a contention
1108 point. The only exception is when a user thread is
1109 forced to do a flush batch because of a sync
1110 checkpoint. */
1111 int64_t sig_count = os_event_reset(buf_dblwr->b_event);
1112 mutex_exit(&buf_dblwr->mutex);
1113
1114 os_event_wait_low(buf_dblwr->b_event, sig_count);
1115 goto try_again;
1116 }
1117
1118 if (buf_dblwr->first_free == srv_doublewrite_batch_size) {
1119 mutex_exit(&(buf_dblwr->mutex));
1120
1121 buf_dblwr_flush_buffered_writes();
1122
1123 goto try_again;
1124 }
1125
1126 byte* p = buf_dblwr->write_buf
1127 + srv_page_size * buf_dblwr->first_free;
1128
1129 /* We request frame here to get correct buffer in case of
1130 encryption and/or page compression */
1131 void * frame = buf_page_get_frame(bpage);
1132
1133 if (bpage->size.is_compressed()) {
1134 UNIV_MEM_ASSERT_RW(bpage->zip.data, bpage->size.physical());
1135 /* Copy the compressed page and clear the rest. */
1136
1137 memcpy(p, frame, bpage->size.physical());
1138
1139 memset(p + bpage->size.physical(), 0x0,
1140 srv_page_size - bpage->size.physical());
1141 } else {
1142 ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
1143
1144 UNIV_MEM_ASSERT_RW(frame,
1145 bpage->size.logical());
1146
1147 memcpy(p, frame, bpage->size.logical());
1148 }
1149
1150 buf_dblwr->buf_block_arr[buf_dblwr->first_free] = bpage;
1151
1152 buf_dblwr->first_free++;
1153 buf_dblwr->b_reserved++;
1154
1155 ut_ad(!buf_dblwr->batch_running);
1156 ut_ad(buf_dblwr->first_free == buf_dblwr->b_reserved);
1157 ut_ad(buf_dblwr->b_reserved <= srv_doublewrite_batch_size);
1158
1159 if (buf_dblwr->first_free == srv_doublewrite_batch_size) {
1160 mutex_exit(&(buf_dblwr->mutex));
1161
1162 buf_dblwr_flush_buffered_writes();
1163
1164 return;
1165 }
1166
1167 mutex_exit(&(buf_dblwr->mutex));
1168}
1169
1170/********************************************************************//**
1171Writes a page to the doublewrite buffer on disk, sync it, then write
1172the page to the datafile and sync the datafile. This function is used
1173for single page flushes. If all the buffers allocated for single page
1174flushes in the doublewrite buffer are in use we wait here for one to
1175become free. We are guaranteed that a slot will become free because any
1176thread that is using a slot must also release the slot before leaving
1177this function. */
1178void
1179buf_dblwr_write_single_page(
1180/*========================*/
1181 buf_page_t* bpage, /*!< in: buffer block to write */
1182 bool sync) /*!< in: true if sync IO requested */
1183{
1184 ulint n_slots;
1185 ulint size;
1186 ulint offset;
1187 ulint i;
1188
1189 ut_a(buf_page_in_file(bpage));
1190 ut_a(srv_use_doublewrite_buf);
1191 ut_a(buf_dblwr != NULL);
1192
1193 /* total number of slots available for single page flushes
1194 starts from srv_doublewrite_batch_size to the end of the
1195 buffer. */
1196 size = TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1197 ut_a(size > srv_doublewrite_batch_size);
1198 n_slots = size - srv_doublewrite_batch_size;
1199
1200 if (buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE) {
1201
1202 /* Check that the actual page in the buffer pool is
1203 not corrupt and the LSN values are sane. */
1204 buf_dblwr_check_block((buf_block_t*) bpage);
1205
1206 /* Check that the page as written to the doublewrite
1207 buffer has sane LSN values. */
1208 if (!bpage->zip.data) {
1209 buf_dblwr_check_page_lsn(
1210 ((buf_block_t*) bpage)->frame);
1211 }
1212 }
1213
1214retry:
1215 mutex_enter(&buf_dblwr->mutex);
1216 if (buf_dblwr->s_reserved == n_slots) {
1217
1218 /* All slots are reserved. */
1219 int64_t sig_count = os_event_reset(buf_dblwr->s_event);
1220 mutex_exit(&buf_dblwr->mutex);
1221 os_event_wait_low(buf_dblwr->s_event, sig_count);
1222
1223 goto retry;
1224 }
1225
1226 for (i = srv_doublewrite_batch_size; i < size; ++i) {
1227
1228 if (!buf_dblwr->in_use[i]) {
1229 break;
1230 }
1231 }
1232
1233 /* We are guaranteed to find a slot. */
1234 ut_a(i < size);
1235 buf_dblwr->in_use[i] = true;
1236 buf_dblwr->s_reserved++;
1237 buf_dblwr->buf_block_arr[i] = bpage;
1238
1239 /* increment the doublewrite flushed pages counter */
1240 srv_stats.dblwr_pages_written.inc();
1241 srv_stats.dblwr_writes.inc();
1242
1243 mutex_exit(&buf_dblwr->mutex);
1244
1245 /* Lets see if we are going to write in the first or second
1246 block of the doublewrite buffer. */
1247 if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
1248 offset = buf_dblwr->block1 + i;
1249 } else {
1250 offset = buf_dblwr->block2 + i
1251 - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1252 }
1253
1254 /* We deal with compressed and uncompressed pages a little
1255 differently here. In case of uncompressed pages we can
1256 directly write the block to the allocated slot in the
1257 doublewrite buffer in the system tablespace and then after
1258 syncing the system table space we can proceed to write the page
1259 in the datafile.
1260 In case of compressed page we first do a memcpy of the block
1261 to the in-memory buffer of doublewrite before proceeding to
1262 write it. This is so because we want to pad the remaining
1263 bytes in the doublewrite page with zeros. */
1264
1265 /* We request frame here to get correct buffer in case of
1266 encryption and/or page compression */
1267 void * frame = buf_page_get_frame(bpage);
1268
1269 if (bpage->size.is_compressed()) {
1270 memcpy(buf_dblwr->write_buf + srv_page_size * i,
1271 frame, bpage->size.physical());
1272
1273 memset(buf_dblwr->write_buf + srv_page_size * i
1274 + bpage->size.physical(), 0x0,
1275 srv_page_size - bpage->size.physical());
1276
1277 fil_io(IORequestWrite,
1278 true,
1279 page_id_t(TRX_SYS_SPACE, offset),
1280 univ_page_size,
1281 0,
1282 srv_page_size,
1283 (void *)(buf_dblwr->write_buf + srv_page_size * i),
1284 NULL);
1285 } else {
1286 /* It is a regular page. Write it directly to the
1287 doublewrite buffer */
1288 fil_io(IORequestWrite,
1289 true,
1290 page_id_t(TRX_SYS_SPACE, offset),
1291 univ_page_size,
1292 0,
1293 srv_page_size,
1294 (void*) frame,
1295 NULL);
1296 }
1297
1298 /* Now flush the doublewrite buffer data to disk */
1299 fil_flush(TRX_SYS_SPACE);
1300
1301 /* We know that the write has been flushed to disk now
1302 and during recovery we will find it in the doublewrite buffer
1303 blocks. Next do the write to the intended position. */
1304 buf_dblwr_write_block_to_datafile(bpage, sync);
1305}
1306