1/*****************************************************************************
2
3Copyright (C) 2013, 2017, MariaDB Corporation.
4
5This program is free software; you can redistribute it and/or modify it under
6the terms of the GNU General Public License as published by the Free Software
7Foundation; version 2 of the License.
8
9This program is distributed in the hope that it will be useful, but WITHOUT
10ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12
13You should have received a copy of the GNU General Public License along with
14this program; if not, write to the Free Software Foundation, Inc.,
1551 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16
17*****************************************************************************/
18
19/******************************************************************//**
20@file fil/fil0pagecompress.cc
21Implementation for page compressed file spaces.
22
23Created 11/12/2013 Jan Lindström jan.lindstrom@mariadb.com
24Updated 14/02/2015
25***********************************************************************/
26
27#include "fil0fil.h"
28#include "fil0pagecompress.h"
29
30#include <debug_sync.h>
31#include <my_dbug.h>
32
33#include "mem0mem.h"
34#include "hash0hash.h"
35#include "os0file.h"
36#include "mach0data.h"
37#include "buf0buf.h"
38#include "buf0flu.h"
39#include "log0recv.h"
40#include "fsp0fsp.h"
41#include "srv0srv.h"
42#include "srv0start.h"
43#include "mtr0mtr.h"
44#include "mtr0log.h"
45#include "dict0dict.h"
46#include "page0page.h"
47#include "page0zip.h"
48#include "trx0sys.h"
49#include "row0mysql.h"
50#include "ha_prototypes.h" // IB_LOG_
51#include "buf0lru.h"
52#include "ibuf0ibuf.h"
53#include "sync0sync.h"
54#include "zlib.h"
55#ifdef __linux__
56#include <linux/fs.h>
57#include <sys/ioctl.h>
58#include <fcntl.h>
59#endif
60#include "row0mysql.h"
61#ifdef HAVE_LZ4
62#include "lz4.h"
63#endif
64#ifdef HAVE_LZO
65#include "lzo/lzo1x.h"
66#endif
67#ifdef HAVE_LZMA
68#include "lzma.h"
69#endif
70#ifdef HAVE_BZIP2
71#include "bzlib.h"
72#endif
73#ifdef HAVE_SNAPPY
74#include "snappy-c.h"
75#endif
76
77/* Used for debugging */
78//#define UNIV_PAGECOMPRESS_DEBUG 1
79
80/****************************************************************//**
81For page compressed pages compress the page before actual write
82operation.
83@return compressed page to be written*/
84UNIV_INTERN
85byte*
86fil_compress_page(
87/*==============*/
88 fil_space_t* space, /*!< in,out: tablespace (NULL during IMPORT) */
89 byte* buf, /*!< in: buffer from which to write; in aio
90 this must be appropriately aligned */
91 byte* out_buf, /*!< out: compressed buffer */
92 ulint len, /*!< in: length of input buffer.*/
93 ulint level, /* in: compression level */
94 ulint block_size, /*!< in: block size */
95 bool encrypted, /*!< in: is page also encrypted */
96 ulint* out_len) /*!< out: actual length of compressed
97 page */
98{
99 int err = Z_OK;
100 int comp_level = int(level);
101 ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
102 ulint write_size = 0;
103#if defined(HAVE_LZO)
104 lzo_uint write_size_lzo = write_size;
105#endif
106 /* Cache to avoid change during function execution */
107 ulint comp_method = innodb_compression_algorithm;
108 bool allocated = false;
109
110 /* page_compression does not apply to tables or tablespaces
111 that use ROW_FORMAT=COMPRESSED */
112 ut_ad(!space || !FSP_FLAGS_GET_ZIP_SSIZE(space->flags));
113
114 if (encrypted) {
115 header_len += FIL_PAGE_COMPRESSION_METHOD_SIZE;
116 }
117
118 if (!out_buf) {
119 allocated = true;
120 ulint size = srv_page_size;
121
122 /* Both snappy and lzo compression methods require that
123 output buffer used for compression is bigger than input
124 buffer. Increase the allocated buffer size accordingly. */
125#if defined(HAVE_SNAPPY)
126 if (comp_method == PAGE_SNAPPY_ALGORITHM) {
127 size = snappy_max_compressed_length(size);
128 }
129#endif
130#if defined(HAVE_LZO)
131 if (comp_method == PAGE_LZO_ALGORITHM) {
132 size += LZO1X_1_15_MEM_COMPRESS;
133 }
134#endif
135
136 out_buf = static_cast<byte *>(ut_malloc_nokey(size));
137 }
138
139 ut_ad(buf);
140 ut_ad(out_buf);
141 ut_ad(len);
142 ut_ad(out_len);
143
144 /* Let's not compress file space header or
145 extent descriptor */
146 switch (fil_page_get_type(buf)) {
147 case 0:
148 case FIL_PAGE_TYPE_FSP_HDR:
149 case FIL_PAGE_TYPE_XDES:
150 case FIL_PAGE_PAGE_COMPRESSED:
151 *out_len = len;
152 goto err_exit;
153 }
154
155 /* If no compression level was provided to this table, use system
156 default level */
157 if (comp_level == 0) {
158 comp_level = int(page_zip_level);
159 }
160
161 DBUG_LOG("compress", "Preparing for space "
162 << (space ? space->id : 0) << " '"
163 << (space ? space->name : "(import)") << "' len " << len);
164
165 write_size = srv_page_size - header_len;
166
167 switch(comp_method) {
168#ifdef HAVE_LZ4
169 case PAGE_LZ4_ALGORITHM:
170
171#ifdef HAVE_LZ4_COMPRESS_DEFAULT
172 err = LZ4_compress_default((const char *)buf,
173 (char *)out_buf+header_len, len, write_size);
174#else
175 err = LZ4_compress_limitedOutput((const char *)buf,
176 (char *)out_buf+header_len, len, write_size);
177#endif /* HAVE_LZ4_COMPRESS_DEFAULT */
178 write_size = err;
179
180 if (err == 0) {
181 goto err_exit;
182 }
183 break;
184#endif /* HAVE_LZ4 */
185#ifdef HAVE_LZO
186 case PAGE_LZO_ALGORITHM:
187 err = lzo1x_1_15_compress(
188 buf, len, out_buf+header_len, &write_size_lzo, out_buf+srv_page_size);
189
190 write_size = write_size_lzo;
191
192 if (err != LZO_E_OK || write_size > srv_page_size-header_len) {
193 goto err_exit;
194 }
195
196 break;
197#endif /* HAVE_LZO */
198#ifdef HAVE_LZMA
199 case PAGE_LZMA_ALGORITHM: {
200 size_t out_pos=0;
201
202 err = lzma_easy_buffer_encode(
203 comp_level,
204 LZMA_CHECK_NONE,
205 NULL, /* No custom allocator, use malloc/free */
206 reinterpret_cast<uint8_t*>(buf),
207 len,
208 reinterpret_cast<uint8_t*>(out_buf + header_len),
209 &out_pos,
210 (size_t)write_size);
211
212 if (err != LZMA_OK || out_pos > srv_page_size-header_len) {
213 write_size = out_pos;
214 goto err_exit;
215 }
216
217 write_size = out_pos;
218
219 break;
220 }
221#endif /* HAVE_LZMA */
222
223#ifdef HAVE_BZIP2
224 case PAGE_BZIP2_ALGORITHM: {
225
226 err = BZ2_bzBuffToBuffCompress(
227 (char *)(out_buf + header_len),
228 (unsigned int *)&write_size,
229 (char *)buf,
230 len,
231 1,
232 0,
233 0);
234
235 if (err != BZ_OK || write_size > srv_page_size-header_len) {
236 goto err_exit;
237 }
238 break;
239 }
240#endif /* HAVE_BZIP2 */
241
242#ifdef HAVE_SNAPPY
243 case PAGE_SNAPPY_ALGORITHM:
244 {
245 snappy_status cstatus;
246 write_size = snappy_max_compressed_length(srv_page_size);
247
248 cstatus = snappy_compress(
249 (const char *)buf,
250 (size_t)len,
251 (char *)(out_buf+header_len),
252 (size_t*)&write_size);
253
254 if (cstatus != SNAPPY_OK || write_size > srv_page_size-header_len) {
255 err = (int)cstatus;
256 goto err_exit;
257 }
258 break;
259 }
260#endif /* HAVE_SNAPPY */
261
262 case PAGE_ZLIB_ALGORITHM:
263 err = compress2(out_buf+header_len, (ulong*)&write_size, buf,
264 uLong(len), comp_level);
265
266 if (err != Z_OK) {
267 goto err_exit;
268 }
269 break;
270
271 case PAGE_UNCOMPRESSED:
272 *out_len = len;
273 return (buf);
274 break;
275 default:
276 ut_error;
277 break;
278 }
279
280 /* Set up the page header */
281 memcpy(out_buf, buf, FIL_PAGE_DATA);
282 /* Set up the checksum */
283 mach_write_to_4(out_buf+FIL_PAGE_SPACE_OR_CHKSUM, BUF_NO_CHECKSUM_MAGIC);
284
285 /* Set up the compression algorithm */
286 mach_write_to_8(out_buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, comp_method);
287
288 if (encrypted) {
289 /* Set up the correct page type */
290 mach_write_to_2(out_buf+FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
291 mach_write_to_2(out_buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE, comp_method);
292 } else {
293 /* Set up the correct page type */
294 mach_write_to_2(out_buf+FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED);
295 }
296
297 /* Set up the actual payload lenght */
298 mach_write_to_2(out_buf+FIL_PAGE_DATA, write_size);
299
300#ifdef UNIV_DEBUG
301 /* Verify */
302 ut_ad(fil_page_is_compressed(out_buf) || fil_page_is_compressed_encrypted(out_buf));
303 ut_ad(mach_read_from_4(out_buf+FIL_PAGE_SPACE_OR_CHKSUM) == BUF_NO_CHECKSUM_MAGIC);
304 ut_ad(mach_read_from_2(out_buf+FIL_PAGE_DATA) == write_size);
305 ut_ad(mach_read_from_8(out_buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION) == (ulint)comp_method ||
306 mach_read_from_2(out_buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE) == (ulint)comp_method);
307
308 /* Verify that page can be decompressed */
309 {
310 byte *comp_page;
311 byte *uncomp_page;
312
313 comp_page = static_cast<byte *>(ut_malloc_nokey(srv_page_size));
314 uncomp_page = static_cast<byte *>(ut_malloc_nokey(srv_page_size));
315 memcpy(comp_page, out_buf, srv_page_size);
316
317 fil_decompress_page(uncomp_page, comp_page, ulong(len), NULL);
318
319 if (buf_page_is_corrupted(false, uncomp_page, univ_page_size,
320 space)) {
321 buf_page_print(uncomp_page, univ_page_size);
322 ut_ad(0);
323 }
324
325 ut_free(comp_page);
326 ut_free(uncomp_page);
327 }
328#endif /* UNIV_DEBUG */
329
330 write_size+=header_len;
331
332 if (block_size <= 0) {
333 block_size = 512;
334 }
335
336 ut_ad(write_size > 0 && block_size > 0);
337
338 /* Actual write needs to be alligned on block size */
339 if (write_size % block_size) {
340 size_t tmp = write_size;
341 write_size = (size_t)ut_uint64_align_up((ib_uint64_t)write_size, block_size);
342 /* Clean up the end of buffer */
343 memset(out_buf+tmp, 0, write_size - tmp);
344#ifdef UNIV_DEBUG
345 ut_a(write_size > 0 && ((write_size % block_size) == 0));
346 ut_a(write_size >= tmp);
347#endif
348 }
349
350 DBUG_LOG("compress", "Succeeded for space "
351 << (space ? space->id : 0) << " '"
352 << (space ? space->name : "(import)")
353 << "' len " << len << " out_len " << write_size);
354
355 srv_stats.page_compression_saved.add((len - write_size));
356 srv_stats.pages_page_compressed.inc();
357
358 *out_len = write_size;
359
360 if (allocated) {
361 /* TODO: reduce number of memcpy's */
362 memcpy(buf, out_buf, len);
363 goto exit_free;
364 } else {
365 return(out_buf);
366 }
367
368err_exit:
369 /* If error we leave the actual page as it was */
370
371#ifndef UNIV_PAGECOMPRESS_DEBUG
372 if (space && !space->printed_compression_failure) {
373 space->printed_compression_failure = true;
374#endif
375 ib::warn() << "Compression failed for space: "
376 << space->id << " name: "
377 << space->name << " len: "
378 << len << " err: " << err << " write_size: "
379 << write_size
380 << " compression method: "
381 << fil_get_compression_alg_name(comp_method)
382 << ".";
383#ifndef UNIV_PAGECOMPRESS_DEBUG
384 }
385#endif
386 srv_stats.pages_page_compression_error.inc();
387 *out_len = len;
388
389exit_free:
390 if (allocated) {
391 ut_free(out_buf);
392 }
393
394 return (buf);
395
396}
397
398/****************************************************************//**
399For page compressed pages decompress the page after actual read
400operation. */
401UNIV_INTERN
402void
403fil_decompress_page(
404/*================*/
405 byte* page_buf, /*!< in: preallocated buffer or NULL */
406 byte* buf, /*!< out: buffer from which to read; in aio
407 this must be appropriately aligned */
408 ulong len, /*!< in: length of output buffer.*/
409 ulint* write_size, /*!< in/out: Actual payload size of
410 the compressed data. */
411 bool return_error) /*!< in: true if only an error should
412 be produced when decompression fails.
413 By default this parameter is false. */
414{
415 int err = 0;
416 ulint actual_size = 0;
417 ib_uint64_t compression_alg = 0;
418 byte *in_buf;
419 ulint ptype;
420 ulint header_len;
421
422 ut_ad(buf);
423 ut_ad(len);
424
425 ptype = mach_read_from_2(buf+FIL_PAGE_TYPE);
426
427 switch (ptype) {
428 case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
429 header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE
430 + FIL_PAGE_COMPRESSION_METHOD_SIZE;
431 break;
432 case FIL_PAGE_PAGE_COMPRESSED:
433 header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
434 break;
435 default:
436 /* The page is not in our format. */
437 return;
438 }
439
440 // If no buffer was given, we need to allocate temporal buffer
441 if (page_buf == NULL) {
442 in_buf = static_cast<byte *>(ut_malloc_nokey(srv_page_size));
443 memset(in_buf, 0, srv_page_size);
444 } else {
445 in_buf = page_buf;
446 }
447
448 /* Before actual decompress, make sure that page type is correct */
449
450 if (mach_read_from_4(buf+FIL_PAGE_SPACE_OR_CHKSUM)
451 != BUF_NO_CHECKSUM_MAGIC
452 || (ptype != FIL_PAGE_PAGE_COMPRESSED
453 && ptype != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)) {
454 ib::error() << "Corruption: We try to uncompress corrupted "
455 "page CRC "
456 << mach_read_from_4(buf+FIL_PAGE_SPACE_OR_CHKSUM)
457 << " type " << ptype << " len " << len << ".";
458
459 if (return_error) {
460 goto error_return;
461 }
462 ut_error;
463 }
464
465 /* Get compression algorithm */
466 if (ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
467 compression_alg = static_cast<ib_uint64_t>(mach_read_from_2(buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE));
468 } else {
469 compression_alg = mach_read_from_8(buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
470 }
471
472 /* Get the actual size of compressed page */
473 actual_size = mach_read_from_2(buf+FIL_PAGE_DATA);
474 /* Check if payload size is corrupted */
475 if (actual_size == 0 || actual_size > srv_page_size) {
476 ib::error() << "Corruption: We try to uncompress corrupted page"
477 << " actual size: " << actual_size
478 << " compression method: "
479 << fil_get_compression_alg_name(compression_alg)
480 << ".";
481 if (return_error) {
482 goto error_return;
483 }
484 ut_error;
485 }
486
487 /* Store actual payload size of the compressed data. This pointer
488 points to buffer pool. */
489 if (write_size) {
490 *write_size = actual_size;
491 }
492
493 DBUG_LOG("compress", "Preparing for decompress for len "
494 << actual_size << ".");
495
496 switch(compression_alg) {
497 case PAGE_ZLIB_ALGORITHM:
498 err= uncompress(in_buf, &len, buf+header_len, (unsigned long)actual_size);
499
500 /* If uncompress fails it means that page is corrupted */
501 if (err != Z_OK) {
502 goto err_exit;
503 if (return_error) {
504 goto error_return;
505 }
506 }
507 break;
508
509#ifdef HAVE_LZ4
510 case PAGE_LZ4_ALGORITHM:
511 err = LZ4_decompress_fast((const char *)buf+header_len, (char *)in_buf, len);
512
513 if (err != (int)actual_size) {
514 goto err_exit;
515 if (return_error) {
516 goto error_return;
517 }
518 }
519 break;
520#endif /* HAVE_LZ4 */
521#ifdef HAVE_LZO
522 case PAGE_LZO_ALGORITHM: {
523 ulint olen = 0;
524 lzo_uint olen_lzo = olen;
525 err = lzo1x_decompress((const unsigned char *)buf+header_len,
526 actual_size,(unsigned char *)in_buf, &olen_lzo, NULL);
527
528 olen = olen_lzo;
529
530 if (err != LZO_E_OK || (olen == 0 || olen > srv_page_size)) {
531 len = olen;
532 goto err_exit;
533 if (return_error) {
534 goto error_return;
535 }
536 }
537 break;
538 }
539#endif /* HAVE_LZO */
540#ifdef HAVE_LZMA
541 case PAGE_LZMA_ALGORITHM: {
542
543 lzma_ret ret;
544 size_t src_pos = 0;
545 size_t dst_pos = 0;
546 uint64_t memlimit = UINT64_MAX;
547
548 ret = lzma_stream_buffer_decode(
549 &memlimit,
550 0,
551 NULL,
552 buf+header_len,
553 &src_pos,
554 actual_size,
555 in_buf,
556 &dst_pos,
557 len);
558
559
560 if (ret != LZMA_OK || (dst_pos == 0 || dst_pos > srv_page_size)) {
561 len = dst_pos;
562 goto err_exit;
563 if (return_error) {
564 goto error_return;
565 }
566 }
567
568 break;
569 }
570#endif /* HAVE_LZMA */
571#ifdef HAVE_BZIP2
572 case PAGE_BZIP2_ALGORITHM: {
573 unsigned int dst_pos = srv_page_size;
574
575 err = BZ2_bzBuffToBuffDecompress(
576 (char *)in_buf,
577 &dst_pos,
578 (char *)(buf+header_len),
579 actual_size,
580 1,
581 0);
582
583 if (err != BZ_OK || (dst_pos == 0 || dst_pos > srv_page_size)) {
584 len = dst_pos;
585 goto err_exit;
586 if (return_error) {
587 goto error_return;
588 }
589 }
590 break;
591 }
592#endif /* HAVE_BZIP2 */
593#ifdef HAVE_SNAPPY
594 case PAGE_SNAPPY_ALGORITHM:
595 {
596 snappy_status cstatus;
597 ulint olen = srv_page_size;
598
599 cstatus = snappy_uncompress(
600 (const char *)(buf+header_len),
601 (size_t)actual_size,
602 (char *)in_buf,
603 (size_t*)&olen);
604
605 if (cstatus != SNAPPY_OK || (olen == 0 || olen > srv_page_size)) {
606 err = (int)cstatus;
607 len = olen;
608 goto err_exit;
609 if (return_error) {
610 goto error_return;
611 }
612 }
613
614 break;
615 }
616#endif /* HAVE_SNAPPY */
617 default:
618 goto err_exit;
619 if (return_error) {
620 goto error_return;
621 }
622 break;
623 }
624
625 srv_stats.pages_page_decompressed.inc();
626
627 /* Copy the uncompressed page to the buffer pool, not
628 really any other options. */
629 memcpy(buf, in_buf, len);
630
631error_return:
632 if (page_buf != in_buf) {
633 ut_free(in_buf);
634 }
635
636 return;
637
638err_exit:
639 /* Note that as we have found the page is corrupted, so
640 all this could be incorrect. */
641 ulint space_id = mach_read_from_4(buf+FIL_PAGE_SPACE_ID);
642 fil_space_t* space = fil_space_acquire_for_io(space_id);
643
644 ib::error() << "Corruption: Page is marked as compressed"
645 << " space: " << space_id << " name: "
646 << (space ? space->name : "NULL")
647 << " but uncompress failed with error: " << err
648 << " size: " << actual_size
649 << " len: " << len
650 << " compression method: "
651 << fil_get_compression_alg_name(compression_alg) << ".";
652
653 buf_page_print(buf, univ_page_size);
654 space->release_for_io();
655 ut_ad(0);
656}
657