1/*****************************************************************************
2Copyright (C) 2013, 2015, Google Inc. All Rights Reserved.
3Copyright (c) 2014, 2018, MariaDB Corporation. All Rights Reserved.
4
5This program is free software; you can redistribute it and/or modify it under
6the terms of the GNU General Public License as published by the Free Software
7Foundation; version 2 of the License.
8
9This program is distributed in the hope that it will be useful, but WITHOUT
10ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12
13You should have received a copy of the GNU General Public License along with
14this program; if not, write to the Free Software Foundation, Inc.,
1551 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16
17*****************************************************************************/
18/**************************************************//**
19@file fil0crypt.cc
InnoDB file space encryption/decryption

Created by Jonas Oreland, Google
Modified by Jan Lindström, jan.lindstrom@mariadb.com
24*******************************************************/
25
26#include "fil0fil.h"
27#include "mtr0types.h"
28#include "mach0data.h"
29#include "page0size.h"
30#include "page0zip.h"
31#ifndef UNIV_INNOCHECKSUM
32#include "fil0crypt.h"
33#include "srv0srv.h"
34#include "srv0start.h"
35#include "log0recv.h"
36#include "mtr0mtr.h"
37#include "mtr0log.h"
38#include "ut0ut.h"
39#include "btr0scrub.h"
40#include "fsp0fsp.h"
41#include "fil0pagecompress.h"
42#include "ha_prototypes.h" // IB_LOG_
43#include <my_crypt.h>
44
45/** Mutex for keys */
46static ib_mutex_t fil_crypt_key_mutex;
47
48static bool fil_crypt_threads_inited = false;
49
50/** Is encryption enabled/disabled */
51UNIV_INTERN ulong srv_encrypt_tables = 0;
52
53/** No of key rotation threads requested */
54UNIV_INTERN uint srv_n_fil_crypt_threads = 0;
55
56/** No of key rotation threads started */
57UNIV_INTERN uint srv_n_fil_crypt_threads_started = 0;
58
59/** At this age or older a space/page will be rotated */
60UNIV_INTERN uint srv_fil_crypt_rotate_key_age;
61
62/** Event to signal FROM the key rotation threads. */
63static os_event_t fil_crypt_event;
64
65/** Event to signal TO the key rotation threads. */
66UNIV_INTERN os_event_t fil_crypt_threads_event;
67
/** Event used by key rotation threads to sleep when throttling. */
69static os_event_t fil_crypt_throttle_sleep_event;
70
71/** Mutex for key rotation threads. */
72UNIV_INTERN ib_mutex_t fil_crypt_threads_mutex;
73
/** Variable ensuring only 1 thread at a time does the initial conversion */
75static bool fil_crypt_start_converting = false;
76
77/** Variables for throttling */
78UNIV_INTERN uint srv_n_fil_crypt_iops = 100; // 10ms per iop
79static uint srv_alloc_time = 3; // allocate iops for 3s at a time
80static uint n_fil_crypt_iops_allocated = 0;
81
82/** Variables for scrubbing */
83extern uint srv_background_scrub_data_interval;
84extern uint srv_background_scrub_data_check_interval;
85
86#define DEBUG_KEYROTATION_THROTTLING 0
87
88/** Statistics variables */
89static fil_crypt_stat_t crypt_stat;
90static ib_mutex_t crypt_stat_mutex;
91
/** Is background scrubbing enabled, defined in btr0scrub.cc */
93extern my_bool srv_background_scrub_data_uncompressed;
94extern my_bool srv_background_scrub_data_compressed;
95
96/***********************************************************************
97Check if a key needs rotation given a key_state
98@param[in] crypt_data Encryption information
99@param[in] key_version Current key version
100@param[in] latest_key_version Latest key version
101@param[in] rotate_key_age when to rotate
102@return true if key needs rotation, false if not */
103static bool
104fil_crypt_needs_rotation(
105 const fil_space_crypt_t* crypt_data,
106 uint key_version,
107 uint latest_key_version,
108 uint rotate_key_age)
109 MY_ATTRIBUTE((warn_unused_result));
110
111/*********************************************************************
112Init space crypt */
113UNIV_INTERN
114void
115fil_space_crypt_init()
116{
117 mutex_create(LATCH_ID_FIL_CRYPT_MUTEX, &fil_crypt_key_mutex);
118
119 fil_crypt_throttle_sleep_event = os_event_create(0);
120
121 mutex_create(LATCH_ID_FIL_CRYPT_STAT_MUTEX, &crypt_stat_mutex);
122 memset(&crypt_stat, 0, sizeof(crypt_stat));
123}
124
125/*********************************************************************
126Cleanup space crypt */
127UNIV_INTERN
128void
129fil_space_crypt_cleanup()
130{
131 os_event_destroy(fil_crypt_throttle_sleep_event);
132 mutex_free(&fil_crypt_key_mutex);
133 mutex_free(&crypt_stat_mutex);
134}
135
136/**
137Get latest key version from encryption plugin.
138@return key version or ENCRYPTION_KEY_VERSION_INVALID */
139uint
140fil_space_crypt_t::key_get_latest_version(void)
141{
142 uint key_version = key_found;
143
144 if (is_key_found()) {
145 key_version = encryption_key_get_latest_version(key_id);
146 srv_stats.n_key_requests.inc();
147 key_found = key_version;
148 }
149
150 return key_version;
151}
152
153/******************************************************************
Get the latest key version, waking the key rotation threads if needed
155@param[in,out] crypt_data Crypt data */
156static inline
157uint
158fil_crypt_get_latest_key_version(
159 fil_space_crypt_t* crypt_data)
160{
161 ut_ad(crypt_data != NULL);
162
163 uint key_version = crypt_data->key_get_latest_version();
164
165 if (crypt_data->is_key_found()) {
166
167 if (fil_crypt_needs_rotation(
168 crypt_data,
169 crypt_data->min_key_version,
170 key_version,
171 srv_fil_crypt_rotate_key_age)) {
			/* The event below has been observed to be a NULL
			pointer at startup when a new database is created
			and a checkpoint is written. Only seen when
			debugging. */
175 if (fil_crypt_threads_inited) {
176 os_event_set(fil_crypt_threads_event);
177 }
178 }
179 }
180
181 return key_version;
182}
183
184/******************************************************************
185Mutex helper for crypt_data->scheme */
186void
187crypt_data_scheme_locker(
188/*=====================*/
189 st_encryption_scheme* scheme,
190 int exit)
191{
192 fil_space_crypt_t* crypt_data =
193 static_cast<fil_space_crypt_t*>(scheme);
194
195 if (exit) {
196 mutex_exit(&crypt_data->mutex);
197 } else {
198 mutex_enter(&crypt_data->mutex);
199 }
200}
201
202/******************************************************************
203Create a fil_space_crypt_t object
@param[in]	type		CRYPT_SCHEME_UNENCRYPTED or
205 CRYPT_SCHEME_1
206@param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
207 FIL_ENCRYPTION_ON or
208 FIL_ENCRYPTION_OFF
209@param[in] min_key_version key_version or 0
210@param[in] key_id Used key id
211@return crypt object */
212static
213fil_space_crypt_t*
214fil_space_create_crypt_data(
215 uint type,
216 fil_encryption_t encrypt_mode,
217 uint min_key_version,
218 uint key_id)
219{
220 fil_space_crypt_t* crypt_data = NULL;
221 if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
222 crypt_data = new(buf)
223 fil_space_crypt_t(
224 type,
225 min_key_version,
226 key_id,
227 encrypt_mode);
228 }
229
230 return crypt_data;
231}
232
233/******************************************************************
234Create a fil_space_crypt_t object
235@param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
236 FIL_ENCRYPTION_ON or
237 FIL_ENCRYPTION_OFF
238
239@param[in] key_id Encryption key id
240@return crypt object */
241UNIV_INTERN
242fil_space_crypt_t*
243fil_space_create_crypt_data(
244 fil_encryption_t encrypt_mode,
245 uint key_id)
246{
247 return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id));
248}
249
250/******************************************************************
251Merge fil_space_crypt_t object
@param[in,out]	dst		Destination crypt data
253@param[in] src Source crypt data */
254UNIV_INTERN
255void
256fil_space_merge_crypt_data(
257 fil_space_crypt_t* dst,
258 const fil_space_crypt_t* src)
259{
260 mutex_enter(&dst->mutex);
261
262 /* validate that they are mergeable */
263 ut_a(src->type == CRYPT_SCHEME_UNENCRYPTED ||
264 src->type == CRYPT_SCHEME_1);
265
266 ut_a(dst->type == CRYPT_SCHEME_UNENCRYPTED ||
267 dst->type == CRYPT_SCHEME_1);
268
269 dst->encryption = src->encryption;
270 dst->type = src->type;
271 dst->min_key_version = src->min_key_version;
272 dst->keyserver_requests += src->keyserver_requests;
273
274 mutex_exit(&dst->mutex);
275}
276
277/** Initialize encryption parameters from a tablespace header page.
278@param[in] page_size page size of the tablespace
279@param[in] page first page of the tablespace
280@return crypt data from page 0
281@retval NULL if not present or not valid */
282UNIV_INTERN
283fil_space_crypt_t*
284fil_space_read_crypt_data(const page_size_t& page_size, const byte* page)
285{
286 const ulint offset = FSP_HEADER_OFFSET
287 + fsp_header_get_encryption_offset(page_size);
288
289 if (memcmp(page + offset, CRYPT_MAGIC, MAGIC_SZ) != 0) {
290 /* Crypt data is not stored. */
291 return NULL;
292 }
293
294 uint8_t type = mach_read_from_1(page + offset + MAGIC_SZ + 0);
295 uint8_t iv_length = mach_read_from_1(page + offset + MAGIC_SZ + 1);
296 fil_space_crypt_t* crypt_data;
297
298 if (!(type == CRYPT_SCHEME_UNENCRYPTED ||
299 type == CRYPT_SCHEME_1)
300 || iv_length != sizeof crypt_data->iv) {
301 ib::error() << "Found non sensible crypt scheme: "
302 << type << "," << iv_length << " for space: "
303 << page_get_space_id(page) << " offset: "
304 << offset << " bytes: ["
305 << page[offset + 2 + MAGIC_SZ]
306 << page[offset + 3 + MAGIC_SZ]
307 << page[offset + 4 + MAGIC_SZ]
308 << page[offset + 5 + MAGIC_SZ]
309 << "].";
310 return NULL;
311 }
312
313 uint min_key_version = mach_read_from_4
314 (page + offset + MAGIC_SZ + 2 + iv_length);
315
316 uint key_id = mach_read_from_4
317 (page + offset + MAGIC_SZ + 2 + iv_length + 4);
318
319 fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(
320 page + offset + MAGIC_SZ + 2 + iv_length + 8);
321
322 crypt_data = fil_space_create_crypt_data(encryption, key_id);
	/* We need to overwrite these, because the function above
	initializes the members to default values. */
325 crypt_data->type = type;
326 crypt_data->min_key_version = min_key_version;
327 crypt_data->page0_offset = offset;
328 memcpy(crypt_data->iv, page + offset + MAGIC_SZ + 2, iv_length);
329
330 return crypt_data;
331}
332
333/******************************************************************
334Free a crypt data object
335@param[in,out] crypt_data crypt data to be freed */
336UNIV_INTERN
337void
338fil_space_destroy_crypt_data(
339 fil_space_crypt_t **crypt_data)
340{
341 if (crypt_data != NULL && (*crypt_data) != NULL) {
342 fil_space_crypt_t* c;
343 if (UNIV_LIKELY(fil_crypt_threads_inited)) {
344 mutex_enter(&fil_crypt_threads_mutex);
345 c = *crypt_data;
346 *crypt_data = NULL;
347 mutex_exit(&fil_crypt_threads_mutex);
348 } else {
349 ut_ad(srv_read_only_mode || !srv_was_started);
350 c = *crypt_data;
351 *crypt_data = NULL;
352 }
353 if (c) {
354 c->~fil_space_crypt_t();
355 ut_free(c);
356 }
357 }
358}
359
360/******************************************************************
361Write crypt data to a page (0)
362@param[in] space tablespace
@param[in,out]	page		first page of the tablespace
364@param[in,out] mtr mini-transaction */
365UNIV_INTERN
366void
367fil_space_crypt_t::write_page0(
368 const fil_space_t* space,
369 byte* page,
370 mtr_t* mtr)
371{
372 ut_ad(this == space->crypt_data);
373 const uint len = sizeof(iv);
374 const ulint offset = FSP_HEADER_OFFSET
375 + fsp_header_get_encryption_offset(page_size_t(space->flags));
376 page0_offset = offset;
377
378 /*
379 redo log this as bytewise updates to page 0
380 followed by an MLOG_FILE_WRITE_CRYPT_DATA
	(which will update fil_space_t during recovery)
382 */
383 mlog_write_string(page + offset, CRYPT_MAGIC, MAGIC_SZ, mtr);
384 mlog_write_ulint(page + offset + MAGIC_SZ + 0, type, MLOG_1BYTE, mtr);
385 mlog_write_ulint(page + offset + MAGIC_SZ + 1, len, MLOG_1BYTE, mtr);
386 mlog_write_string(page + offset + MAGIC_SZ + 2, iv, len,
387 mtr);
388 mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len, min_key_version,
389 MLOG_4BYTES, mtr);
390 mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len + 4, key_id,
391 MLOG_4BYTES, mtr);
392 mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len + 8, encryption,
393 MLOG_1BYTE, mtr);
394
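	/* The following MLOG_FILE_WRITE_CRYPT_DATA record consists of up
	to 11 bytes of initial log record header, a 17-byte payload
	(4-byte space id, 2-byte offset, 1-byte type, 1-byte iv length,
	4-byte min_key_version, 4-byte key_id, 1-byte encryption mode)
	and finally the iv itself. */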
395 byte* log_ptr = mlog_open(mtr, 11 + 17 + len);
396
397 if (log_ptr != NULL) {
398 log_ptr = mlog_write_initial_log_record_fast(
399 page,
400 MLOG_FILE_WRITE_CRYPT_DATA,
401 log_ptr, mtr);
402 mach_write_to_4(log_ptr, space->id);
403 log_ptr += 4;
404 mach_write_to_2(log_ptr, offset);
405 log_ptr += 2;
406 mach_write_to_1(log_ptr, type);
407 log_ptr += 1;
408 mach_write_to_1(log_ptr, len);
409 log_ptr += 1;
410 mach_write_to_4(log_ptr, min_key_version);
411 log_ptr += 4;
412 mach_write_to_4(log_ptr, key_id);
413 log_ptr += 4;
414 mach_write_to_1(log_ptr, encryption);
415 log_ptr += 1;
416 mlog_close(mtr, log_ptr);
417
418 mlog_catenate_string(mtr, iv, len);
419 }
420}
421
422/******************************************************************
423Set crypt data for a tablespace
424@param[in,out] space Tablespace
425@param[in,out] crypt_data Crypt data to be set
426@return crypt_data in tablespace */
427static
428fil_space_crypt_t*
429fil_space_set_crypt_data(
430 fil_space_t* space,
431 fil_space_crypt_t* crypt_data)
432{
433 fil_space_crypt_t* free_crypt_data = NULL;
434 fil_space_crypt_t* ret_crypt_data = NULL;
435
436 /* Provided space is protected using fil_space_acquire()
437 from concurrent operations. */
438 if (space->crypt_data != NULL) {
439 /* There is already crypt data present,
440 merge new crypt_data */
441 fil_space_merge_crypt_data(space->crypt_data,
442 crypt_data);
443 ret_crypt_data = space->crypt_data;
444 free_crypt_data = crypt_data;
445 } else {
446 space->crypt_data = crypt_data;
447 ret_crypt_data = space->crypt_data;
448 }
449
450 if (free_crypt_data != NULL) {
451 /* there was already crypt data present and the new crypt
452 * data provided as argument to this function has been merged
453 * into that => free new crypt data
454 */
455 fil_space_destroy_crypt_data(&free_crypt_data);
456 }
457
458 return ret_crypt_data;
459}
460
461/******************************************************************
462Parse a MLOG_FILE_WRITE_CRYPT_DATA log entry
463@param[in] ptr Log entry start
464@param[in] end_ptr Log entry end
@param[out]	err		DB_SUCCESS or DB_DECRYPTION_FAILED
466@return position on log buffer */
467UNIV_INTERN
468byte*
469fil_parse_write_crypt_data(
470 byte* ptr,
471 const byte* end_ptr,
472 dberr_t* err)
473{
474 /* check that redo log entry is complete */
475 uint entry_size =
476 4 + // size of space_id
477 2 + // size of offset
478 1 + // size of type
479 1 + // size of iv-len
480 4 + // size of min_key_version
481 4 + // size of key_id
482 1; // fil_encryption_t
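	/* These sizes must match the payload that
	fil_space_crypt_t::write_page0() appends to its
	MLOG_FILE_WRITE_CRYPT_DATA record. */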
483
484 *err = DB_SUCCESS;
485
486 if (ptr + entry_size > end_ptr) {
487 return NULL;
488 }
489
490 ulint space_id = mach_read_from_4(ptr);
491 ptr += 4;
492 uint offset = mach_read_from_2(ptr);
493 ptr += 2;
494 uint type = mach_read_from_1(ptr);
495 ptr += 1;
496 uint len = mach_read_from_1(ptr);
497 ptr += 1;
498
499 ut_a(type == CRYPT_SCHEME_UNENCRYPTED ||
500 type == CRYPT_SCHEME_1); // only supported
501
502 ut_a(len == CRYPT_SCHEME_1_IV_LEN); // only supported
503 uint min_key_version = mach_read_from_4(ptr);
504 ptr += 4;
505
506 uint key_id = mach_read_from_4(ptr);
507 ptr += 4;
508
509 fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(ptr);
510 ptr +=1;
511
512 if (ptr + len > end_ptr) {
513 return NULL;
514 }
515
516 fil_space_crypt_t* crypt_data = fil_space_create_crypt_data(encryption, key_id);
	/* Need to overwrite these, because the function above
	initializes the fields. */
518 crypt_data->page0_offset = offset;
519 crypt_data->min_key_version = min_key_version;
520 crypt_data->encryption = encryption;
521 memcpy(crypt_data->iv, ptr, len);
522 ptr += len;
523
524 /* update fil_space memory cache with crypt_data */
525 if (fil_space_t* space = fil_space_acquire_silent(space_id)) {
526 crypt_data = fil_space_set_crypt_data(space, crypt_data);
527 space->release();
		/* Check whether the key in use was found in the encryption plugin */
529 if (crypt_data->should_encrypt()
530 && !crypt_data->is_key_found()) {
531 *err = DB_DECRYPTION_FAILED;
532 }
533 } else {
534 fil_space_destroy_crypt_data(&crypt_data);
535 }
536
537 return ptr;
538}
539
540/** Encrypt a buffer.
541@param[in,out] crypt_data Crypt data
542@param[in] space space_id
543@param[in] offset Page offset
544@param[in] lsn Log sequence number
545@param[in] src_frame Page to encrypt
546@param[in] page_size Page size
547@param[in,out] dst_frame Output buffer
548@return encrypted buffer or NULL */
549UNIV_INTERN
550byte*
551fil_encrypt_buf(
552 fil_space_crypt_t* crypt_data,
553 ulint space,
554 ulint offset,
555 lsn_t lsn,
556 const byte* src_frame,
557 const page_size_t& page_size,
558 byte* dst_frame)
559{
560 uint size = uint(page_size.physical());
561 uint key_version = fil_crypt_get_latest_key_version(crypt_data);
562
563 ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
564
565 ulint orig_page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
566 ibool page_compressed = (orig_page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
567 uint header_len = FIL_PAGE_DATA;
568
569 if (page_compressed) {
570 header_len += (FIL_PAGE_COMPRESSED_SIZE + FIL_PAGE_COMPRESSION_METHOD_SIZE);
571 }
572
573 /* FIL page header is not encrypted */
574 memcpy(dst_frame, src_frame, header_len);
575
576 /* Store key version */
577 mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, key_version);
578
579 /* Calculate the start offset in a page */
580 uint unencrypted_bytes = header_len + FIL_PAGE_DATA_END;
581 uint srclen = size - unencrypted_bytes;
582 const byte* src = src_frame + header_len;
583 byte* dst = dst_frame + header_len;
584 uint32 dstlen = 0;
585
586 if (page_compressed) {
587 srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
588 }
589
590 int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
591 crypt_data, key_version,
592 (uint32)space, (uint32)offset, lsn);
593 ut_a(rc == MY_AES_OK);
594 ut_a(dstlen == srclen);
595
	/* For page-compressed tables we do not copy the FIL trailer,
	because the whole page is not written to disk. For such tables
	only the FIL header plus the compressed (and now encrypted)
	payload, aligned to a sector boundary, is written. */
600 if (!page_compressed) {
601 /* FIL page trailer is also not encrypted */
602 memcpy(dst_frame + page_size.physical() - FIL_PAGE_DATA_END,
603 src_frame + page_size.physical() - FIL_PAGE_DATA_END,
604 FIL_PAGE_DATA_END);
605 } else {
606 /* Clean up rest of buffer */
607 memset(dst_frame+header_len+srclen, 0,
608 page_size.physical() - (header_len + srclen));
609 }
610
611 /* handle post encryption checksum */
612 ib_uint32_t checksum = 0;
613
614 checksum = fil_crypt_calculate_checksum(page_size, dst_frame);
615
616 // store the post-encryption checksum after the key-version
617 mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4, checksum);
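	/* The regular checksum in the FIL header was computed over the
	unencrypted contents and cannot be verified without decrypting,
	so a separate checksum over the encrypted page is stored here
	and verified by fil_space_verify_crypt_checksum(). */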
618
619 ut_ad(fil_space_verify_crypt_checksum(dst_frame, page_size,
620 space, offset));
621
622 srv_stats.pages_encrypted.inc();
623
624 return dst_frame;
625}
626
627/******************************************************************
628Encrypt a page
629
630@param[in] space Tablespace
631@param[in] offset Page offset
632@param[in] lsn Log sequence number
633@param[in] src_frame Page to encrypt
634@param[in,out] dst_frame Output buffer
635@return encrypted buffer or NULL */
636UNIV_INTERN
637byte*
638fil_space_encrypt(
639 const fil_space_t* space,
640 ulint offset,
641 lsn_t lsn,
642 byte* src_frame,
643 byte* dst_frame)
644{
645 switch (mach_read_from_2(src_frame+FIL_PAGE_TYPE)) {
646 case FIL_PAGE_TYPE_FSP_HDR:
647 case FIL_PAGE_TYPE_XDES:
648 case FIL_PAGE_RTREE:
649 /* File space header, extent descriptor or spatial index
650 are not encrypted. */
651 return src_frame;
652 }
653
654 if (!space->crypt_data || !space->crypt_data->is_encrypted()) {
655 return (src_frame);
656 }
657
658 fil_space_crypt_t* crypt_data = space->crypt_data;
659 const page_size_t page_size(space->flags);
660 ut_ad(space->pending_io());
661 byte* tmp = fil_encrypt_buf(crypt_data, space->id, offset, lsn,
662 src_frame, page_size, dst_frame);
663
664#ifdef UNIV_DEBUG
665 if (tmp) {
666 /* Verify that encrypted buffer is not corrupted */
667 byte* tmp_mem = (byte *)malloc(srv_page_size);
668 dberr_t err = DB_SUCCESS;
669 byte* src = src_frame;
670 bool page_compressed_encrypted = (mach_read_from_2(tmp+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
671 byte* comp_mem = NULL;
672 byte* uncomp_mem = NULL;
673
674 if (page_compressed_encrypted) {
675 comp_mem = (byte *)malloc(srv_page_size);
676 uncomp_mem = (byte *)malloc(srv_page_size);
677 memcpy(comp_mem, src_frame, srv_page_size);
678 fil_decompress_page(uncomp_mem, comp_mem,
679 srv_page_size, NULL);
680 src = uncomp_mem;
681 }
682
683 bool corrupted1 = buf_page_is_corrupted(true, src, page_size, space);
684 bool ok = fil_space_decrypt(crypt_data, tmp_mem, page_size, tmp, &err);
685
686 /* Need to decompress the page if it was also compressed */
687 if (page_compressed_encrypted) {
688 memcpy(comp_mem, tmp_mem, srv_page_size);
689 fil_decompress_page(tmp_mem, comp_mem,
690 srv_page_size, NULL);
691 }
692
693 bool corrupted = buf_page_is_corrupted(true, tmp_mem, page_size, space);
694 memcpy(tmp_mem+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, src+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 8);
695 bool different = memcmp(src, tmp_mem, page_size.physical());
696
697 if (!ok || corrupted || corrupted1 || err != DB_SUCCESS || different) {
698 fprintf(stderr, "ok %d corrupted %d corrupted1 %d err %d different %d\n",
699 ok , corrupted, corrupted1, err, different);
700 fprintf(stderr, "src_frame\n");
701 buf_page_print(src_frame, page_size);
702 fprintf(stderr, "encrypted_frame\n");
703 buf_page_print(tmp, page_size);
704 fprintf(stderr, "decrypted_frame\n");
705 buf_page_print(tmp_mem, page_size);
706 ut_ad(0);
707 }
708
709 free(tmp_mem);
710
711 if (comp_mem) {
712 free(comp_mem);
713 }
714
715 if (uncomp_mem) {
716 free(uncomp_mem);
717 }
718 }
719#endif /* UNIV_DEBUG */
720
721 return tmp;
722}
723
724/** Decrypt a page.
725@param[in] crypt_data crypt_data
726@param[in] tmp_frame Temporary buffer
727@param[in] page_size Page size
728@param[in,out] src_frame Page to decrypt
729@param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED
730@return true if page decrypted, false if not.*/
731UNIV_INTERN
732bool
733fil_space_decrypt(
734 fil_space_crypt_t* crypt_data,
735 byte* tmp_frame,
736 const page_size_t& page_size,
737 byte* src_frame,
738 dberr_t* err)
739{
740 ulint page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
741 uint key_version = mach_read_from_4(src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
742 bool page_compressed = (page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
743 uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
744 uint space = mach_read_from_4(src_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
745 ib_uint64_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
746
747 *err = DB_SUCCESS;
748
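	/* A key version of ENCRYPTION_KEY_NOT_ENCRYPTED in the page
	header means the page is not encrypted; nothing to decrypt. */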
749 if (key_version == ENCRYPTION_KEY_NOT_ENCRYPTED) {
750 return false;
751 }
752
753 ut_a(crypt_data != NULL && crypt_data->is_encrypted());
754
	/* Calculate the length of the page header that is stored unencrypted */
756 uint header_len = FIL_PAGE_DATA;
757
758 if (page_compressed) {
759 header_len += (FIL_PAGE_COMPRESSED_SIZE + FIL_PAGE_COMPRESSION_METHOD_SIZE);
760 }
761
762 /* Copy FIL page header, it is not encrypted */
763 memcpy(tmp_frame, src_frame, header_len);
764
765 /* Calculate the offset where decryption starts */
766 const byte* src = src_frame + header_len;
767 byte* dst = tmp_frame + header_len;
768 uint32 dstlen = 0;
769 uint srclen = uint(page_size.physical())
770 - header_len - FIL_PAGE_DATA_END;
771
772 if (page_compressed) {
773 srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
774 }
775
776 int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
777 crypt_data, key_version,
778 space, offset, lsn);
779
780 if (! ((rc == MY_AES_OK) && ((ulint) dstlen == srclen))) {
781
782 if (rc == -1) {
783 *err = DB_DECRYPTION_FAILED;
784 return false;
785 }
786
		ib::fatal() << "Unable to decrypt data-block "
			<< " src: " << src << " srclen: "
			<< srclen << " buf: " << dst << " buflen: "
			<< dstlen << " return-code: " << rc
			<< " Can't continue!";
792 }
793
	/* For page-compressed tables we do not copy the FIL trailer,
	because the whole page is not written to disk. For such tables
	only the FIL header plus the compressed (and now encrypted)
	payload, aligned to a sector boundary, is written. */
798 if (!page_compressed) {
799 /* Copy FIL trailer */
800 memcpy(tmp_frame + page_size.physical() - FIL_PAGE_DATA_END,
801 src_frame + page_size.physical() - FIL_PAGE_DATA_END,
802 FIL_PAGE_DATA_END);
803 }
804
805 srv_stats.pages_decrypted.inc();
806
807 return true; /* page was decrypted */
808}
809
810/**
811Decrypt a page.
812@param[in] space Tablespace
813@param[in] tmp_frame Temporary buffer used for decrypting
814@param[in,out] src_frame Page to decrypt
815@param[out] decrypted true if page was decrypted
816@return decrypted page, or original not encrypted page if decryption is
817not needed.*/
818UNIV_INTERN
819byte*
820fil_space_decrypt(
821 const fil_space_t* space,
822 byte* tmp_frame,
823 byte* src_frame,
824 bool* decrypted)
825{
826 dberr_t err = DB_SUCCESS;
827 byte* res = NULL;
828 const page_size_t page_size(space->flags);
829 *decrypted = false;
830
831 ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
832 ut_ad(space->pending_io());
833
834 bool encrypted = fil_space_decrypt(space->crypt_data, tmp_frame,
835 page_size, src_frame, &err);
836
837 if (err == DB_SUCCESS) {
838 if (encrypted) {
839 *decrypted = true;
840 /* Copy the decrypted page back to page buffer, not
841 really any other options. */
842 memcpy(src_frame, tmp_frame, page_size.physical());
843 }
844
845 res = src_frame;
846 }
847
848 return res;
849}
850
851/******************************************************************
852Calculate post encryption checksum
853@param[in] page_size page size
854@param[in] dst_frame Block where checksum is calculated
@return page checksum */
857UNIV_INTERN
858uint32_t
859fil_crypt_calculate_checksum(
860 const page_size_t& page_size,
861 const byte* dst_frame)
862{
863 /* For encrypted tables we use only crc32 and strict_crc32 */
864 return page_size.is_compressed()
865 ? page_zip_calc_checksum(dst_frame, page_size.physical(),
866 SRV_CHECKSUM_ALGORITHM_CRC32)
867 : buf_calc_page_crc32(dst_frame);
868}
869
870/***********************************************************************/
871
872/** A copy of global key state */
873struct key_state_t {
874 key_state_t() : key_id(0), key_version(0),
875 rotate_key_age(srv_fil_crypt_rotate_key_age) {}
876 bool operator==(const key_state_t& other) const {
877 return key_version == other.key_version &&
878 rotate_key_age == other.rotate_key_age;
879 }
880 uint key_id;
881 uint key_version;
882 uint rotate_key_age;
883};
884
885/***********************************************************************
886Copy global key state
887@param[in,out] new_state key state
888@param[in] crypt_data crypt data */
889static void
890fil_crypt_get_key_state(
891 key_state_t* new_state,
892 fil_space_crypt_t* crypt_data)
893{
894 if (srv_encrypt_tables) {
895 new_state->key_version = crypt_data->key_get_latest_version();
896 new_state->rotate_key_age = srv_fil_crypt_rotate_key_age;
897
898 ut_a(new_state->key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
899 } else {
900 new_state->key_version = 0;
901 new_state->rotate_key_age = 0;
902 }
903}
904
905/***********************************************************************
906Check if a key needs rotation given a key_state
907@param[in] crypt_data Encryption information
908@param[in] key_version Current key version
909@param[in] latest_key_version Latest key version
910@param[in] rotate_key_age when to rotate
911@return true if key needs rotation, false if not */
912static bool
913fil_crypt_needs_rotation(
914 const fil_space_crypt_t* crypt_data,
915 uint key_version,
916 uint latest_key_version,
917 uint rotate_key_age)
918{
919 if (key_version == ENCRYPTION_KEY_VERSION_INVALID) {
920 return false;
921 }
922
923 if (key_version == 0 && latest_key_version != 0) {
924 /* this is rotation unencrypted => encrypted
925 * ignore rotate_key_age */
926 return true;
927 }
928
929 if (latest_key_version == 0 && key_version != 0) {
930 if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT) {
931 /* this is rotation encrypted => unencrypted */
932 return true;
933 }
934 return false;
935 }
936
937 if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT
938 && crypt_data->type == CRYPT_SCHEME_1
939 && srv_encrypt_tables == 0 ) {
940 /* This is rotation encrypted => unencrypted */
941 return true;
942 }
943
944 /* this is rotation encrypted => encrypted,
945 * only reencrypt if key is sufficiently old */
946 if (key_version + rotate_key_age < latest_key_version) {
947 return true;
948 }
949
950 return false;
951}
952
/** Read page 0 and the crypt data possibly stored there.
954@param[in,out] space Tablespace */
955static inline
956void
957fil_crypt_read_crypt_data(fil_space_t* space)
958{
959 if (space->crypt_data || space->size
960 || !fil_space_get_size(space->id)) {
961 /* The encryption metadata has already been read, or
962 the tablespace is not encrypted and the file has been
963 opened already, or the file cannot be accessed,
964 likely due to a concurrent TRUNCATE or
965 RENAME or DROP (possibly as part of ALTER TABLE).
		FIXME: The file can become inaccessible any time
967 after this check! We should really remove this
968 function and instead make crypt_data an integral
969 part of fil_space_t. */
970 return;
971 }
972
973 const page_size_t page_size(space->flags);
974 mtr_t mtr;
975 mtr.start();
976 if (buf_block_t* block = buf_page_get(page_id_t(space->id, 0),
977 page_size, RW_S_LATCH, &mtr)) {
978 mutex_enter(&fil_system.mutex);
979 if (!space->crypt_data) {
980 space->crypt_data = fil_space_read_crypt_data(
981 page_size, block->frame);
982 }
983 mutex_exit(&fil_system.mutex);
984 }
985 mtr.commit();
986}
987
988/***********************************************************************
989Start encrypting a space
990@param[in,out] space Tablespace
991@return true if a recheck is needed */
992static
993bool
994fil_crypt_start_encrypting_space(
995 fil_space_t* space)
996{
997 bool recheck = false;
998
999 mutex_enter(&fil_crypt_threads_mutex);
1000
1001 fil_space_crypt_t *crypt_data = space->crypt_data;
1002
1003 /* If space is not encrypted and encryption is not enabled, then
1004 do not continue encrypting the space. */
1005 if (!crypt_data && !srv_encrypt_tables) {
1006 mutex_exit(&fil_crypt_threads_mutex);
1007 return false;
1008 }
1009
1010 if (crypt_data != NULL || fil_crypt_start_converting) {
1011 /* someone beat us to it */
1012 if (fil_crypt_start_converting) {
1013 recheck = true;
1014 }
1015
1016 mutex_exit(&fil_crypt_threads_mutex);
1017 return recheck;
1018 }
1019
1020 /* NOTE: we need to write and flush page 0 before publishing
	* the crypt data. This is so that after restart there is no
1022 * risk of finding encrypted pages without having
1023 * crypt data in page 0 */
1024
1025 /* 1 - create crypt data */
1026 crypt_data = fil_space_create_crypt_data(FIL_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY);
1027
1028 if (crypt_data == NULL) {
1029 mutex_exit(&fil_crypt_threads_mutex);
1030 return false;
1031 }
1032
1033 crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
1034 crypt_data->min_key_version = 0; // all pages are unencrypted
1035 crypt_data->rotate_state.start_time = time(0);
1036 crypt_data->rotate_state.starting = true;
1037 crypt_data->rotate_state.active_threads = 1;
1038
1039 mutex_enter(&crypt_data->mutex);
1040 crypt_data = fil_space_set_crypt_data(space, crypt_data);
1041 mutex_exit(&crypt_data->mutex);
1042
1043 fil_crypt_start_converting = true;
1044 mutex_exit(&fil_crypt_threads_mutex);
1045
1046 do
1047 {
1048 mtr_t mtr;
1049 mtr.start();
1050 mtr.set_named_space(space);
1051
1052 /* 2 - get page 0 */
1053 dberr_t err = DB_SUCCESS;
1054 buf_block_t* block = buf_page_get_gen(
1055 page_id_t(space->id, 0), page_size_t(space->flags),
1056 RW_X_LATCH, NULL, BUF_GET,
1057 __FILE__, __LINE__,
1058 &mtr, &err);
1059
1060
1061 /* 3 - write crypt data to page 0 */
1062 byte* frame = buf_block_get_frame(block);
1063 crypt_data->type = CRYPT_SCHEME_1;
1064 crypt_data->write_page0(space, frame, &mtr);
1065
1066 mtr.commit();
1067
1068 /* record lsn of update */
1069 lsn_t end_lsn = mtr.commit_lsn();
1070
1071 /* 4 - sync tablespace before publishing crypt data */
1072
1073 bool success = false;
1074 ulint sum_pages = 0;
1075
1076 do {
1077 ulint n_pages = 0;
1078 success = buf_flush_lists(ULINT_MAX, end_lsn, &n_pages);
1079 buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
1080 sum_pages += n_pages;
1081 } while (!success);
1082
1083 /* 5 - publish crypt data */
1084 mutex_enter(&fil_crypt_threads_mutex);
1085 mutex_enter(&crypt_data->mutex);
1086 crypt_data->type = CRYPT_SCHEME_1;
1087 ut_a(crypt_data->rotate_state.active_threads == 1);
1088 crypt_data->rotate_state.active_threads = 0;
1089 crypt_data->rotate_state.starting = false;
1090
1091 fil_crypt_start_converting = false;
1092 mutex_exit(&crypt_data->mutex);
1093 mutex_exit(&fil_crypt_threads_mutex);
1094
1095 return recheck;
1096 } while (0);
1097
1098 mutex_enter(&crypt_data->mutex);
1099 ut_a(crypt_data->rotate_state.active_threads == 1);
1100 crypt_data->rotate_state.active_threads = 0;
1101 mutex_exit(&crypt_data->mutex);
1102
1103 mutex_enter(&fil_crypt_threads_mutex);
1104 fil_crypt_start_converting = false;
1105 mutex_exit(&fil_crypt_threads_mutex);
1106
1107 return recheck;
1108}
1109
1110/** State of a rotation thread */
1111struct rotate_thread_t {
1112 explicit rotate_thread_t(uint no) {
1113 memset(this, 0, sizeof(* this));
1114 thread_no = no;
1115 first = true;
1116 estimated_max_iops = 20;
1117 }
1118
1119 uint thread_no;
1120 bool first; /*!< is position before first space */
1121 fil_space_t* space; /*!< current space or NULL */
1122 ulint offset; /*!< current offset */
1123 ulint batch; /*!< #pages to rotate */
1124 uint min_key_version_found;/*!< min key version found but not rotated */
1125 lsn_t end_lsn; /*!< max lsn when rotating this space */
1126
1127 uint estimated_max_iops; /*!< estimation of max iops */
1128 uint allocated_iops; /*!< allocated iops */
1129 ulint cnt_waited; /*!< #times waited during this slot */
1130 uintmax_t sum_waited_us; /*!< wait time during this slot */
1131
1132 fil_crypt_stat_t crypt_stat; // statistics
1133
1134 btr_scrub_t scrub_data; /* thread local data used by btr_scrub-functions
1135 * when iterating pages of tablespace */
1136
1137 /** @return whether this thread should terminate */
1138 bool should_shutdown() const {
1139 switch (srv_shutdown_state) {
1140 case SRV_SHUTDOWN_NONE:
1141 return thread_no >= srv_n_fil_crypt_threads;
1142 case SRV_SHUTDOWN_EXIT_THREADS:
1143 /* srv_init_abort() must have been invoked */
1144 case SRV_SHUTDOWN_CLEANUP:
1145 return true;
1146 case SRV_SHUTDOWN_FLUSH_PHASE:
1147 case SRV_SHUTDOWN_LAST_PHASE:
1148 break;
1149 }
1150 ut_ad(0);
1151 return true;
1152 }
1153};
1154
1155/***********************************************************************
1156Check if space needs rotation given a key_state
1157@param[in,out] state Key rotation state
1158@param[in,out] key_state Key state
1159@param[in,out] recheck needs recheck ?
1160@return true if space needs key rotation */
1161static
1162bool
1163fil_crypt_space_needs_rotation(
1164 rotate_thread_t* state,
1165 key_state_t* key_state,
1166 bool* recheck)
1167{
1168 fil_space_t* space = state->space;
1169
1170 /* Make sure that tablespace is normal tablespace */
1171 if (space->purpose != FIL_TYPE_TABLESPACE) {
1172 return false;
1173 }
1174
1175 ut_ad(space->referenced());
1176
1177 fil_space_crypt_t *crypt_data = space->crypt_data;
1178
1179 if (crypt_data == NULL) {
1180 /**
1181 * space has no crypt data
1182 * start encrypting it...
1183 */
1184 *recheck = fil_crypt_start_encrypting_space(space);
1185 crypt_data = space->crypt_data;
1186
1187 if (crypt_data == NULL) {
1188 return false;
1189 }
1190
1191 crypt_data->key_get_latest_version();
1192 }
1193
	/* If the key_id in use is not found in the encryption plugin,
	we cannot continue rotating the tablespace */
1196 if (!crypt_data->is_key_found()) {
1197 return false;
1198 }
1199
1200 mutex_enter(&crypt_data->mutex);
1201
1202 do {
1203 /* prevent threads from starting to rotate space */
1204 if (crypt_data->rotate_state.starting) {
1205 /* recheck this space later */
1206 *recheck = true;
1207 break;
1208 }
1209
		/* do not start rotating a space that is being dropped or truncated */
1211 if (space->is_stopping()) {
1212 break;
1213 }
1214
1215 if (crypt_data->rotate_state.flushing) {
1216 break;
1217 }
1218
1219 /* No need to rotate space if encryption is disabled */
1220 if (crypt_data->not_encrypted()) {
1221 break;
1222 }
1223
1224 if (crypt_data->key_id != key_state->key_id) {
1225 key_state->key_id= crypt_data->key_id;
1226 fil_crypt_get_key_state(key_state, crypt_data);
1227 }
1228
1229 bool need_key_rotation = fil_crypt_needs_rotation(
1230 crypt_data,
1231 crypt_data->min_key_version,
1232 key_state->key_version,
1233 key_state->rotate_key_age);
1234
1235 crypt_data->rotate_state.scrubbing.is_active =
1236 btr_scrub_start_space(space->id, &state->scrub_data);
1237
1238 time_t diff = time(0) - crypt_data->rotate_state.scrubbing.
1239 last_scrub_completed;
1240
1241 bool need_scrubbing =
1242 (srv_background_scrub_data_uncompressed ||
1243 srv_background_scrub_data_compressed) &&
1244 crypt_data->rotate_state.scrubbing.is_active
1245 && diff >= 0
1246 && ulint(diff) >= srv_background_scrub_data_interval;
1247
1248 if (need_key_rotation == false && need_scrubbing == false) {
1249 break;
1250 }
1251
1252 mutex_exit(&crypt_data->mutex);
1253
1254 return true;
1255 } while (0);
1256
1257 mutex_exit(&crypt_data->mutex);
1258
1259
1260 return false;
1261}
1262
1263/***********************************************************************
1264Update global statistics with thread statistics
1265@param[in,out] state key rotation statistics */
1266static void
1267fil_crypt_update_total_stat(
1268 rotate_thread_t *state)
1269{
1270 mutex_enter(&crypt_stat_mutex);
1271 crypt_stat.pages_read_from_cache +=
1272 state->crypt_stat.pages_read_from_cache;
1273 crypt_stat.pages_read_from_disk +=
1274 state->crypt_stat.pages_read_from_disk;
1275 crypt_stat.pages_modified += state->crypt_stat.pages_modified;
1276 crypt_stat.pages_flushed += state->crypt_stat.pages_flushed;
	// remove old estimate
1278 crypt_stat.estimated_iops -= state->crypt_stat.estimated_iops;
1279 // add new estimate
1280 crypt_stat.estimated_iops += state->estimated_max_iops;
1281 mutex_exit(&crypt_stat_mutex);
1282
1283 // make new estimate "current" estimate
1284 memset(&state->crypt_stat, 0, sizeof(state->crypt_stat));
1285 // record our old (current) estimate
1286 state->crypt_stat.estimated_iops = state->estimated_max_iops;
1287}
1288
1289/***********************************************************************
1290Allocate iops to thread from global setting,
1291used before starting to rotate a space.
1292@param[in,out] state Rotation state
1293@return true if allocation succeeded, false if failed */
1294static
1295bool
1296fil_crypt_alloc_iops(
1297 rotate_thread_t *state)
1298{
1299 ut_ad(state->allocated_iops == 0);
1300
1301 /* We have not yet selected the space to rotate, thus
1302 state might not contain space and we can't check
1303 its status yet. */
1304
1305 uint max_iops = state->estimated_max_iops;
1306 mutex_enter(&fil_crypt_threads_mutex);
1307
1308 if (n_fil_crypt_iops_allocated >= srv_n_fil_crypt_iops) {
		/* this can happen when the user decreases srv_n_fil_crypt_iops */
1310 mutex_exit(&fil_crypt_threads_mutex);
1311 return false;
1312 }
1313
1314 uint alloc = srv_n_fil_crypt_iops - n_fil_crypt_iops_allocated;
1315
1316 if (alloc > max_iops) {
1317 alloc = max_iops;
1318 }
1319
1320 n_fil_crypt_iops_allocated += alloc;
1321 mutex_exit(&fil_crypt_threads_mutex);
1322
1323 state->allocated_iops = alloc;
1324
1325 return alloc > 0;
1326}
1327
1328/***********************************************************************
1329Reallocate iops to thread,
1330used when inside a space
1331@param[in,out] state Rotation state */
1332static
1333void
1334fil_crypt_realloc_iops(
1335 rotate_thread_t *state)
1336{
1337 ut_a(state->allocated_iops > 0);
1338
1339 if (10 * state->cnt_waited > state->batch) {
		/* if we waited for more than 10% of the pages, re-estimate max_iops */
1341 ulint avg_wait_time_us =
1342 ulint(state->sum_waited_us / state->cnt_waited);
1343
1344 if (avg_wait_time_us == 0) {
1345 avg_wait_time_us = 1; // prevent division by zero
1346 }
1347
1348 DBUG_PRINT("ib_crypt",
1349 ("thr_no: %u - update estimated_max_iops from %u to "
1350 ULINTPF ".",
1351 state->thread_no,
1352 state->estimated_max_iops,
1353 1000000 / avg_wait_time_us));
1354
1355 state->estimated_max_iops = uint(1000000 / avg_wait_time_us);
1356 state->cnt_waited = 0;
1357 state->sum_waited_us = 0;
1358 } else {
1359 DBUG_PRINT("ib_crypt",
1360 ("thr_no: %u only waited " ULINTPF
1361 "%% skip re-estimate.",
1362 state->thread_no,
1363 (100 * state->cnt_waited)
1364 / (state->batch ? state->batch : 1)));
1365 }
1366
1367 if (state->estimated_max_iops <= state->allocated_iops) {
1368 /* return extra iops */
1369 uint extra = state->allocated_iops - state->estimated_max_iops;
1370
1371 if (extra > 0) {
1372 mutex_enter(&fil_crypt_threads_mutex);
1373 if (n_fil_crypt_iops_allocated < extra) {
1374 /* unknown bug!
1375 * crash in debug
1376 * keep n_fil_crypt_iops_allocated unchanged
1377 * in release */
1378 ut_ad(0);
1379 extra = 0;
1380 }
1381 n_fil_crypt_iops_allocated -= extra;
1382 state->allocated_iops -= extra;
1383
1384 if (state->allocated_iops == 0) {
1385 /* no matter how slow io system seems to be
1386 * never decrease allocated_iops to 0... */
1387 state->allocated_iops ++;
1388 n_fil_crypt_iops_allocated ++;
1389 }
1390
1391 os_event_set(fil_crypt_threads_event);
1392 mutex_exit(&fil_crypt_threads_mutex);
1393 }
1394 } else {
1395 /* see if there are more to get */
1396 mutex_enter(&fil_crypt_threads_mutex);
1397 if (n_fil_crypt_iops_allocated < srv_n_fil_crypt_iops) {
1398 /* there are extra iops free */
1399 uint extra = srv_n_fil_crypt_iops -
1400 n_fil_crypt_iops_allocated;
1401 if (state->allocated_iops + extra >
1402 state->estimated_max_iops) {
1403 /* but don't alloc more than our max */
1404 extra = state->estimated_max_iops -
1405 state->allocated_iops;
1406 }
1407 n_fil_crypt_iops_allocated += extra;
1408 state->allocated_iops += extra;
1409
1410 DBUG_PRINT("ib_crypt",
1411 ("thr_no: %u increased iops from %u to %u.",
1412 state->thread_no,
1413 state->allocated_iops - extra,
1414 state->allocated_iops));
1415
1416 }
1417 mutex_exit(&fil_crypt_threads_mutex);
1418 }
1419
1420 fil_crypt_update_total_stat(state);
1421}
1422
1423/***********************************************************************
1424Return allocated iops to global
1425@param[in,out] state Rotation state */
1426static
1427void
1428fil_crypt_return_iops(
1429 rotate_thread_t *state)
1430{
1431 if (state->allocated_iops > 0) {
1432 uint iops = state->allocated_iops;
1433 mutex_enter(&fil_crypt_threads_mutex);
1434 if (n_fil_crypt_iops_allocated < iops) {
1435 /* unknown bug!
1436 * crash in debug
1437 * keep n_fil_crypt_iops_allocated unchanged
1438 * in release */
1439 ut_ad(0);
1440 iops = 0;
1441 }
1442
1443 n_fil_crypt_iops_allocated -= iops;
1444 state->allocated_iops = 0;
1445 os_event_set(fil_crypt_threads_event);
1446 mutex_exit(&fil_crypt_threads_mutex);
1447 }
1448
1449 fil_crypt_update_total_stat(state);
1450}
1451
1452/***********************************************************************
1453Search for a space needing rotation
1454@param[in,out] key_state Key state
1455@param[in,out] state Rotation state
1456@param[in,out] recheck recheck ? */
1457static
1458bool
1459fil_crypt_find_space_to_rotate(
1460 key_state_t* key_state,
1461 rotate_thread_t* state,
1462 bool* recheck)
1463{
1464 /* we need iops to start rotating */
1465 while (!state->should_shutdown() && !fil_crypt_alloc_iops(state)) {
1466 os_event_reset(fil_crypt_threads_event);
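		/* Sleep for up to 100 ms (the timeout is given in
		microseconds) before retrying the allocation. */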
1467 os_event_wait_time(fil_crypt_threads_event, 100000);
1468 }
1469
1470 if (state->should_shutdown()) {
1471 if (state->space) {
1472 state->space->release();
1473 state->space = NULL;
1474 }
1475 return false;
1476 }
1477
1478 if (state->first) {
1479 state->first = false;
1480 if (state->space) {
1481 state->space->release();
1482 }
1483 state->space = NULL;
1484 }
1485
1486 /* If key rotation is enabled (default) we iterate all tablespaces.
1487 If key rotation is not enabled we iterate only the tablespaces
1488 added to keyrotation list. */
1489 if (srv_fil_crypt_rotate_key_age) {
1490 state->space = fil_space_next(state->space);
1491 } else {
1492 state->space = fil_space_keyrotate_next(state->space);
1493 }
1494
1495 while (!state->should_shutdown() && state->space) {
1496 /* If there is no crypt data and we have not yet read
1497 page 0 for this tablespace, we need to read it before
1498 we can continue. */
1499 if (!state->space->crypt_data) {
1500 fil_crypt_read_crypt_data(state->space);
1501 }
1502
1503 if (fil_crypt_space_needs_rotation(state, key_state, recheck)) {
1504 ut_ad(key_state->key_id);
1505 /* init state->min_key_version_found before
1506 * starting on a space */
1507 state->min_key_version_found = key_state->key_version;
1508 return true;
1509 }
1510
1511 if (srv_fil_crypt_rotate_key_age) {
1512 state->space = fil_space_next(state->space);
1513 } else {
1514 state->space = fil_space_keyrotate_next(state->space);
1515 }
1516 }
1517
1518 /* if we didn't find any space return iops */
1519 fil_crypt_return_iops(state);
1520
1521 return false;
1522
1523}
1524
1525/***********************************************************************
1526Start rotating a space
1527@param[in] key_state Key state
1528@param[in,out] state Rotation state */
1529static
1530void
1531fil_crypt_start_rotate_space(
1532 const key_state_t* key_state,
1533 rotate_thread_t* state)
1534{
1535 fil_space_crypt_t *crypt_data = state->space->crypt_data;
1536
1537 ut_ad(crypt_data);
1538 mutex_enter(&crypt_data->mutex);
1539 ut_ad(key_state->key_id == crypt_data->key_id);
1540
1541 if (crypt_data->rotate_state.active_threads == 0) {
1542 /* only first thread needs to init */
1543 crypt_data->rotate_state.next_offset = 1; // skip page 0
1544 /* no need to rotate beyond current max
1545 * if space extends, it will be encrypted with newer version */
1546 /* FIXME: max_offset could be removed and instead
1547 space->size consulted.*/
1548 crypt_data->rotate_state.max_offset = state->space->size;
1549 crypt_data->rotate_state.end_lsn = 0;
1550 crypt_data->rotate_state.min_key_version_found =
1551 key_state->key_version;
1552
1553 crypt_data->rotate_state.start_time = time(0);
1554
1555 if (crypt_data->type == CRYPT_SCHEME_UNENCRYPTED &&
1556 crypt_data->is_encrypted() &&
1557 key_state->key_version != 0) {
1558 /* this is rotation unencrypted => encrypted */
1559 crypt_data->type = CRYPT_SCHEME_1;
1560 }
1561 }
1562
1563 /* count active threads in space */
1564 crypt_data->rotate_state.active_threads++;
1565
1566 /* Initialize thread local state */
1567 state->end_lsn = crypt_data->rotate_state.end_lsn;
1568 state->min_key_version_found =
1569 crypt_data->rotate_state.min_key_version_found;
1570
1571 mutex_exit(&crypt_data->mutex);
1572}
1573
1574/***********************************************************************
1575Search for batch of pages needing rotation
1576@param[in] key_state Key state
1577@param[in,out] state Rotation state
1578@return true if page needing key rotation found, false if not found */
1579static
1580bool
1581fil_crypt_find_page_to_rotate(
1582 const key_state_t* key_state,
1583 rotate_thread_t* state)
1584{
1585 ulint batch = srv_alloc_time * state->allocated_iops;
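	/* One allocated iop corresponds to roughly one page per second,
	so a batch covers about srv_alloc_time seconds of work at the
	currently allocated rate. */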
1586 fil_space_t* space = state->space;
1587
1588 ut_ad(!space || space->referenced());
1589
1590 /* If space is marked to be dropped stop rotation. */
1591 if (!space || space->is_stopping()) {
1592 return false;
1593 }
1594
1595 fil_space_crypt_t *crypt_data = space->crypt_data;
1596
1597 mutex_enter(&crypt_data->mutex);
1598 ut_ad(key_state->key_id == crypt_data->key_id);
1599
1600 bool found = crypt_data->rotate_state.max_offset >=
1601 crypt_data->rotate_state.next_offset;
1602
1603 if (found) {
1604 state->offset = crypt_data->rotate_state.next_offset;
1605 ulint remaining = crypt_data->rotate_state.max_offset -
1606 crypt_data->rotate_state.next_offset;
1607
1608 if (batch <= remaining) {
1609 state->batch = batch;
1610 } else {
1611 state->batch = remaining;
1612 }
1613 }
1614
1615 crypt_data->rotate_state.next_offset += batch;
1616 mutex_exit(&crypt_data->mutex);
1617 return found;
1618}
1619
1620#define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \
1621 fil_crypt_get_page_throttle_func(state, offset, mtr, \
1622 sleeptime_ms, __FILE__, __LINE__)
1623
1624/***********************************************************************
1625Get a page and compute sleep time
1626@param[in,out] state Rotation state
1627@param[in] offset Page offset
1628@param[in,out] mtr Minitransaction
1629@param[out] sleeptime_ms Sleep time
1630@param[in] file File where called
1631@param[in] line Line where called
1632@return page or NULL*/
1633static
1634buf_block_t*
1635fil_crypt_get_page_throttle_func(
1636 rotate_thread_t* state,
1637 ulint offset,
1638 mtr_t* mtr,
1639 ulint* sleeptime_ms,
1640 const char* file,
1641 unsigned line)
1642{
1643 fil_space_t* space = state->space;
1644 const page_size_t page_size = page_size_t(space->flags);
1645 const page_id_t page_id(space->id, offset);
1646 ut_ad(space->referenced());
1647
1648 /* Before reading from tablespace we need to make sure that
1649 the tablespace is not about to be dropped or truncated. */
1650 if (space->is_stopping()) {
1651 return NULL;
1652 }
1653
1654 dberr_t err = DB_SUCCESS;
1655 buf_block_t* block = buf_page_get_gen(page_id, page_size, RW_X_LATCH,
1656 NULL,
1657 BUF_PEEK_IF_IN_POOL, file, line,
1658 mtr, &err);
1659 if (block != NULL) {
1660 /* page was in buffer pool */
1661 state->crypt_stat.pages_read_from_cache++;
1662 return block;
1663 }
1664
1665 if (space->is_stopping()) {
1666 return NULL;
1667 }
1668
1669 state->crypt_stat.pages_read_from_disk++;
1670
1671 uintmax_t start = ut_time_us(NULL);
1672 block = buf_page_get_gen(page_id, page_size,
1673 RW_X_LATCH,
1674 NULL, BUF_GET_POSSIBLY_FREED,
1675 file, line, mtr, &err);
1676 uintmax_t end = ut_time_us(NULL);
1677
1678 if (end < start) {
1679 end = start; // safety...
1680 }
1681
1682 state->cnt_waited++;
1683 state->sum_waited_us += (end - start);
1684
1685 /* average page load */
1686 ulint add_sleeptime_ms = 0;
1687 ulint avg_wait_time_us =ulint(state->sum_waited_us / state->cnt_waited);
1688 ulint alloc_wait_us = 1000000 / state->allocated_iops;
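	/* alloc_wait_us is the time budget per page implied by the
	allocated iops; if the actual average wait was shorter, sleep
	for the unused part of the budget. */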
1689
1690 if (avg_wait_time_us < alloc_wait_us) {
		/* we are reading faster than our iops allocation allows */
1692 add_sleeptime_ms = (alloc_wait_us - avg_wait_time_us) / 1000;
1693 } else {
1694 /* if page load time is longer than we want, skip sleeping */
1695 }
1696
1697 *sleeptime_ms += add_sleeptime_ms;
1698
1699 return block;
1700}
1701
1702
1703/***********************************************************************
1704Get block and allocation status
1705
1706note: innodb locks fil_space_latch and then block when allocating page
1707but locks block and then fil_space_latch when freeing page.
1708
1709@param[in,out] state Rotation state
1710@param[in] offset Page offset
1711@param[in,out] mtr Minitransaction
1712@param[out] allocation_status Allocation status
1713@param[out] sleeptime_ms Sleep time
1714@return block or NULL
1715*/
1716static
1717buf_block_t*
1718btr_scrub_get_block_and_allocation_status(
1719 rotate_thread_t* state,
1720 ulint offset,
1721 mtr_t* mtr,
1722 btr_scrub_page_allocation_status_t *allocation_status,
1723 ulint* sleeptime_ms)
1724{
1725 mtr_t local_mtr;
1726 buf_block_t *block = NULL;
1727 fil_space_t* space = state->space;
1728
1729 ut_ad(space->referenced());
1730
1731 mtr_start(&local_mtr);
1732
1733 *allocation_status = fseg_page_is_free(space, (uint32_t)offset) ?
1734 BTR_SCRUB_PAGE_FREE :
1735 BTR_SCRUB_PAGE_ALLOCATED;
1736
1737 if (*allocation_status == BTR_SCRUB_PAGE_FREE) {
		/* this is the easy case: we lock fil_space_latch first
		and then the block */
1740 block = fil_crypt_get_page_throttle(state,
1741 offset, mtr,
1742 sleeptime_ms);
1743 mtr_commit(&local_mtr);
1744 } else {
1745 /* page is allocated according to xdes */
1746
1747 /* release fil_space_latch *before* fetching block */
1748 mtr_commit(&local_mtr);
1749
1750 /* NOTE: when we have locked dict_index_get_lock(),
1751 * it's safe to release fil_space_latch and then fetch block
1752 * as dict_index_get_lock() is needed to make tree modifications
		* such as freeing a page
1754 */
1755
1756 block = fil_crypt_get_page_throttle(state,
1757 offset, mtr,
1758 sleeptime_ms);
1759 }
1760
1761 return block;
1762}
1763
1764
1765/***********************************************************************
1766Rotate one page
1767@param[in,out] key_state Key state
1768@param[in,out] state Rotation state */
1769static
1770void
1771fil_crypt_rotate_page(
1772 const key_state_t* key_state,
1773 rotate_thread_t* state)
1774{
1775 fil_space_t*space = state->space;
1776 ulint space_id = space->id;
1777 ulint offset = state->offset;
1778 ulint sleeptime_ms = 0;
1779 fil_space_crypt_t *crypt_data = space->crypt_data;
1780
1781 ut_ad(space->referenced());
1782 ut_ad(offset > 0);
1783
1784 /* In fil_crypt_thread where key rotation is done we have
1785 acquired space and checked that this space is not yet
1786 marked to be dropped. Similarly, in fil_crypt_find_page_to_rotate().
	Check here also to give DROP TABLE or similar a chance. */
1788 if (space->is_stopping()) {
1789 return;
1790 }
1791
1792 if (space_id == TRX_SYS_SPACE && offset == TRX_SYS_PAGE_NO) {
		/* don't encrypt this page, as it contains the address
		of the doublewrite buffer */
1794 return;
1795 }
1796
1797 ut_d(const bool was_free = fseg_page_is_free(space, (uint32_t)offset));
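	/* Debug builds remember whether the page was free before the
	block is latched; the ut_ad(was_free) assertion below relies on
	this for pages that appear never to have been allocated. */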
1798
1799 mtr_t mtr;
1800 mtr.start();
1801 if (buf_block_t* block = fil_crypt_get_page_throttle(state,
1802 offset, &mtr,
1803 &sleeptime_ms)) {
1804 bool modified = false;
1805 int needs_scrubbing = BTR_SCRUB_SKIP_PAGE;
1806 lsn_t block_lsn = block->page.newest_modification;
1807 byte* frame = buf_block_get_frame(block);
1808 uint kv = mach_read_from_4(frame+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
1809
1810 if (space->is_stopping()) {
1811 /* The tablespace is closing (in DROP TABLE or
1812 TRUNCATE TABLE or similar): avoid further access */
1813 } else if (!*reinterpret_cast<uint32_t*>(FIL_PAGE_OFFSET
1814 + frame)) {
1815 /* It looks like this page was never
1816 allocated. Because key rotation is accessing
1817 pages in a pattern that is unlike the normal
1818 B-tree and undo log access pattern, we cannot
1819 invoke fseg_page_is_free() here, because that
1820 could result in a deadlock. If we invoked
1821 fseg_page_is_free() and released the
1822 tablespace latch before acquiring block->lock,
1823 then the fseg_page_is_free() information
1824 could be stale already. */
1825 ut_ad(was_free);
1826 ut_ad(kv == 0);
1827 ut_ad(page_get_space_id(frame) == 0);
1828 } else if (fil_crypt_needs_rotation(
1829 crypt_data,
1830 kv,
1831 key_state->key_version,
1832 key_state->rotate_key_age)) {
1833
1834 mtr.set_named_space(space);
1835 modified = true;
1836
1837 /* force rotation by dummy updating page */
1838 mlog_write_ulint(frame + FIL_PAGE_SPACE_ID,
1839 space_id, MLOG_4BYTES, &mtr);
1840
1841 /* statistics */
1842 state->crypt_stat.pages_modified++;
1843 } else {
1844 if (crypt_data->is_encrypted()) {
1845 if (kv < state->min_key_version_found) {
1846 state->min_key_version_found = kv;
1847 }
1848 }
1849
1850 needs_scrubbing = btr_page_needs_scrubbing(
1851 &state->scrub_data, block,
1852 BTR_SCRUB_PAGE_ALLOCATION_UNKNOWN);
1853 }
1854
1855 mtr.commit();
1856 lsn_t end_lsn = mtr.commit_lsn();
1857
1858 if (needs_scrubbing == BTR_SCRUB_PAGE) {
1859 mtr.start();
1860 /*
1861 * refetch page and allocation status
1862 */
1863 btr_scrub_page_allocation_status_t allocated;
1864
1865 block = btr_scrub_get_block_and_allocation_status(
1866 state, offset, &mtr,
1867 &allocated,
1868 &sleeptime_ms);
1869
1870 if (block) {
1871 mtr.set_named_space(space);
1872
1873 /* get required table/index and index-locks */
1874 needs_scrubbing = btr_scrub_recheck_page(
1875 &state->scrub_data, block, allocated, &mtr);
1876
1877 if (needs_scrubbing == BTR_SCRUB_PAGE) {
1878				/* we need to refetch it once more now that we
1879				* have the index locked */
1880 block = btr_scrub_get_block_and_allocation_status(
1881 state, offset, &mtr,
1882 &allocated,
1883 &sleeptime_ms);
1884
1885 needs_scrubbing = btr_scrub_page(&state->scrub_data,
1886 block, allocated,
1887 &mtr);
1888 }
1889
1890			/* NOTE: the mtr is committed inside btr_scrub_recheck_page()
1891			* and/or btr_scrub_page(). This is to make sure that
1892			* locks & pages are latched in the correct order;
1893			* in some circumstances the mtr is restarted
1894			* (mtr_commit() + mtr_start()).
1895			*/
1896 }
1897 }
1898
1899 if (needs_scrubbing != BTR_SCRUB_PAGE) {
1900			/* If the page did not need scrubbing, some cleanups may
1901			still be needed. Do those outside of any mtr to prevent deadlocks.
1902
1903			The information about which cleanups are needed is
1904			encoded inside needs_scrubbing, but is opaque to
1905			this function (except for the value BTR_SCRUB_PAGE). */
1906 btr_scrub_skip_page(&state->scrub_data, needs_scrubbing);
1907 }
1908
1909 if (needs_scrubbing == BTR_SCRUB_TURNED_OFF) {
1910 /* if we just detected that scrubbing was turned off
1911 * update global state to reflect this */
1912 ut_ad(crypt_data);
1913 mutex_enter(&crypt_data->mutex);
1914 crypt_data->rotate_state.scrubbing.is_active = false;
1915 mutex_exit(&crypt_data->mutex);
1916 }
1917
1918 if (modified) {
1919 /* if we modified page, we take lsn from mtr */
1920 ut_a(end_lsn > state->end_lsn);
1921 ut_a(end_lsn > block_lsn);
1922 state->end_lsn = end_lsn;
1923 } else {
1924 /* if we did not modify page, check for max lsn */
1925 if (block_lsn > state->end_lsn) {
1926 state->end_lsn = block_lsn;
1927 }
1928 }
1929 } else {
1930		/* If the block read failed, the mtr memo and log should be empty. */
1931 ut_ad(!mtr.has_modifications());
1932 ut_ad(!mtr.is_dirty());
1933 ut_ad(mtr.get_memo()->size() == 0);
1934 ut_ad(mtr.get_log()->size() == 0);
1935 mtr.commit();
1936 }
1937
1938 if (sleeptime_ms) {
1939 os_event_reset(fil_crypt_throttle_sleep_event);
1940 os_event_wait_time(fil_crypt_throttle_sleep_event,
1941 1000 * sleeptime_ms);
1942 }
1943}
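
/***********************************************************************
Illustrative only: the end-of-rotation LSN bookkeeping of
fil_crypt_rotate_page() reduced to plain arithmetic. If the page was
modified, the mtr commit LSN is taken; otherwise the running maximum is
advanced to the block LSN. This hypothetical helper is a sketch and is
not called by the server code.
@param[in]	modified	page was dummy-updated in the mtr
@param[in]	mtr_end_lsn	commit LSN of the mini-transaction
@param[in]	block_lsn	newest_modification of the block
@param[in]	current_end_lsn	state->end_lsn seen so far
@return new value for state->end_lsn */
static __attribute__((unused))
lsn_t
example_track_end_lsn(
	bool	modified,
	lsn_t	mtr_end_lsn,
	lsn_t	block_lsn,
	lsn_t	current_end_lsn)
{
	if (modified) {
		/* the dummy write guarantees a new, larger LSN */
		return(mtr_end_lsn);
	}

	return(block_lsn > current_end_lsn ? block_lsn : current_end_lsn);
}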
1944
1945/***********************************************************************
1946Rotate a batch of pages
1947@param[in,out] key_state Key state
1948@param[in,out] state Rotation state */
1949static
1950void
1951fil_crypt_rotate_pages(
1952 const key_state_t* key_state,
1953 rotate_thread_t* state)
1954{
1955 ulint space = state->space->id;
1956 ulint end = std::min(state->offset + state->batch,
1957 state->space->free_limit);
1958
1959 ut_ad(state->space->referenced());
1960
1961 for (; state->offset < end; state->offset++) {
1962
1963		/* we can't rotate pages in the dblwr buffer, as
1964		* reading them is not possible due to the many asserts
1965		* in the buffer pool.
1966		*
1967		* However, since these are only (short-lived) copies of
1968		* real pages, they will be updated anyway when the
1969		* real page is updated
1970		*/
1971 if (space == TRX_SYS_SPACE &&
1972 buf_dblwr_page_inside(state->offset)) {
1973 continue;
1974 }
1975
1976 /* If space is marked as stopping, stop rotating
1977 pages. */
1978 if (state->space->is_stopping()) {
1979 break;
1980 }
1981
1982 fil_crypt_rotate_page(key_state, state);
1983 }
1984}
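
/***********************************************************************
Illustrative only: the batch boundary computed at the top of
fil_crypt_rotate_pages() is simply the requested batch size clamped to
the tablespace free limit. This hypothetical helper keeps that rule as
plain arithmetic and is not called by the server code.
@param[in]	offset		first page of the batch
@param[in]	batch		requested batch size in pages
@param[in]	free_limit	space->free_limit
@return first page number beyond the batch */
static __attribute__((unused))
ulint
example_batch_end(
	ulint	offset,
	ulint	batch,
	ulint	free_limit)
{
	ulint	end = offset + batch;

	return(end < free_limit ? end : free_limit);
}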
1985
1986/***********************************************************************
1987Flush rotated pages and then update page 0
1988
1989@param[in,out] state rotation state */
1990static
1991void
1992fil_crypt_flush_space(
1993 rotate_thread_t* state)
1994{
1995 fil_space_t* space = state->space;
1996 fil_space_crypt_t *crypt_data = space->crypt_data;
1997
1998 ut_ad(space->referenced());
1999
2000 /* flush tablespace pages so that there are no pages left with old key */
2001 lsn_t end_lsn = crypt_data->rotate_state.end_lsn;
2002
2003 if (end_lsn > 0 && !space->is_stopping()) {
2004 bool success = false;
2005 ulint n_pages = 0;
2006 ulint sum_pages = 0;
2007 uintmax_t start = ut_time_us(NULL);
2008
2009 do {
2010 success = buf_flush_lists(ULINT_MAX, end_lsn, &n_pages);
2011 buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
2012 sum_pages += n_pages;
2013 } while (!success && !space->is_stopping());
2014
2015 uintmax_t end = ut_time_us(NULL);
2016
2017 if (sum_pages && end > start) {
2018 state->cnt_waited += sum_pages;
2019 state->sum_waited_us += (end - start);
2020
2021 /* statistics */
2022 state->crypt_stat.pages_flushed += sum_pages;
2023 }
2024 }
2025
2026 if (crypt_data->min_key_version == 0) {
2027 crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
2028 }
2029
2030 if (space->is_stopping()) {
2031 return;
2032 }
2033
2034 /* update page 0 */
2035 mtr_t mtr;
2036 mtr.start();
2037
2038 dberr_t err;
2039
2040 if (buf_block_t* block = buf_page_get_gen(
2041 page_id_t(space->id, 0), page_size_t(space->flags),
2042 RW_X_LATCH, NULL, BUF_GET,
2043 __FILE__, __LINE__, &mtr, &err)) {
2044 mtr.set_named_space(space);
2045 crypt_data->write_page0(space, block->frame, &mtr);
2046 }
2047
2048 mtr.commit();
2049}
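
/***********************************************************************
Illustrative only: the statistics update after the flush loop in
fil_crypt_flush_space() accumulates the flushed page count and the
elapsed wait time, so that an average cost per page
(sum_waited_us / cnt_waited) can be derived later. This hypothetical
helper is a sketch and is not called by the server code.
@param[in]	sum_pages	pages flushed by the loop
@param[in]	start_us	start time in microseconds
@param[in]	end_us		end time in microseconds
@param[in,out]	cnt_waited	accumulated page count
@param[in,out]	sum_waited_us	accumulated wait time */
static __attribute__((unused))
void
example_account_flush_wait(
	ulint		sum_pages,
	uintmax_t	start_us,
	uintmax_t	end_us,
	ulint*		cnt_waited,
	uintmax_t*	sum_waited_us)
{
	if (sum_pages && end_us > start_us) {
		*cnt_waited += sum_pages;
		*sum_waited_us += end_us - start_us;
	}
}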
2050
2051/***********************************************************************
2052Complete rotating a space
2053@param[in,out] state Rotation state */
2054static void fil_crypt_complete_rotate_space(rotate_thread_t* state)
2055{
2056 fil_space_crypt_t *crypt_data = state->space->crypt_data;
2057
2058 ut_ad(crypt_data);
2059 ut_ad(state->space->referenced());
2060
2061 /* Space might already be dropped */
2062 if (!state->space->is_stopping()) {
2063 mutex_enter(&crypt_data->mutex);
2064
2065 /**
2066 * Update crypt data state with state from thread
2067 */
2068 if (state->min_key_version_found <
2069 crypt_data->rotate_state.min_key_version_found) {
2070 crypt_data->rotate_state.min_key_version_found =
2071 state->min_key_version_found;
2072 }
2073
2074 if (state->end_lsn > crypt_data->rotate_state.end_lsn) {
2075 crypt_data->rotate_state.end_lsn = state->end_lsn;
2076 }
2077
2078 ut_a(crypt_data->rotate_state.active_threads > 0);
2079 crypt_data->rotate_state.active_threads--;
2080 bool last = crypt_data->rotate_state.active_threads == 0;
2081
2082		/**
2083		* check if the space is fully done; when threads shut down,
2084		* it could be that we "complete" iterating before we have
2085		* scanned the full space.
2086		*/
2087 bool done = crypt_data->rotate_state.next_offset >=
2088 crypt_data->rotate_state.max_offset;
2089
2090 /**
2091 * we should flush space if we're last thread AND
2092 * the iteration is done
2093 */
2094 bool should_flush = last && done;
2095
2096 if (should_flush) {
2097 /* we're the last active thread */
2098 crypt_data->rotate_state.flushing = true;
2099 crypt_data->min_key_version =
2100 crypt_data->rotate_state.min_key_version_found;
2101 }
2102
2103 /* inform scrubbing */
2104 crypt_data->rotate_state.scrubbing.is_active = false;
2105 mutex_exit(&crypt_data->mutex);
2106
2107		/* all threads must call btr_scrub_complete_space() without the mutex held */
2108 if (state->scrub_data.scrubbing) {
2109 btr_scrub_complete_space(&state->scrub_data);
2110 if (should_flush) {
2111 /* only last thread updates last_scrub_completed */
2112 ut_ad(crypt_data);
2113 mutex_enter(&crypt_data->mutex);
2114 crypt_data->rotate_state.scrubbing.
2115 last_scrub_completed = time(0);
2116 mutex_exit(&crypt_data->mutex);
2117 }
2118 }
2119
2120 if (should_flush) {
2121 fil_crypt_flush_space(state);
2122
2123 mutex_enter(&crypt_data->mutex);
2124 crypt_data->rotate_state.flushing = false;
2125 mutex_exit(&crypt_data->mutex);
2126 }
2127 } else {
2128 mutex_enter(&crypt_data->mutex);
2129 ut_a(crypt_data->rotate_state.active_threads > 0);
2130 crypt_data->rotate_state.active_threads--;
2131 mutex_exit(&crypt_data->mutex);
2132 }
2133}
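
/***********************************************************************
Illustrative only: the merge step in fil_crypt_complete_rotate_space()
can be summarized as: the smallest key version seen by any thread and
the largest end LSN win, and page 0 is flushed only by the last active
thread once the scan has reached max_offset. This hypothetical helper
is a sketch and is not called by the server code.
@param[in]	thread_min_key_version	smallest key version this thread saw
@param[in]	thread_end_lsn		largest LSN this thread produced
@param[in,out]	space_min_key_version	per-space minimum so far
@param[in,out]	space_end_lsn		per-space maximum so far
@param[in]	active_threads		threads still attached to the space
@param[in]	next_offset		next page to rotate
@param[in]	max_offset		end of the scan
@return whether this thread should flush and write page 0 */
static __attribute__((unused))
bool
example_merge_rotation_result(
	uint	thread_min_key_version,
	lsn_t	thread_end_lsn,
	uint*	space_min_key_version,
	lsn_t*	space_end_lsn,
	uint	active_threads,
	ulint	next_offset,
	ulint	max_offset)
{
	if (thread_min_key_version < *space_min_key_version) {
		*space_min_key_version = thread_min_key_version;
	}

	if (thread_end_lsn > *space_end_lsn) {
		*space_end_lsn = thread_end_lsn;
	}

	/* flush only when the last thread detaches and the scan is done */
	return(active_threads == 0 && next_offset >= max_offset);
}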
2134
2135/*********************************************************************//**
2136A thread which monitors global key state and rotates tablespaces accordingly
2137@return a dummy parameter */
2138extern "C" UNIV_INTERN
2139os_thread_ret_t
2140DECLARE_THREAD(fil_crypt_thread)(
2141/*=============================*/
2142 void* arg __attribute__((unused))) /*!< in: a dummy parameter required
2143 * by os_thread_create */
2144{
2145 UT_NOT_USED(arg);
2146
2147 mutex_enter(&fil_crypt_threads_mutex);
2148 uint thread_no = srv_n_fil_crypt_threads_started;
2149 srv_n_fil_crypt_threads_started++;
2150 os_event_set(fil_crypt_event); /* signal that we started */
2151 mutex_exit(&fil_crypt_threads_mutex);
2152
2153 /* state of this thread */
2154 rotate_thread_t thr(thread_no);
2155
2156 /* if we find a space that is starting, skip over it and recheck it later */
2157 bool recheck = false;
2158
2159 while (!thr.should_shutdown()) {
2160
2161 key_state_t new_state;
2162
2163 time_t wait_start = time(0);
2164
2165 while (!thr.should_shutdown()) {
2166
2167			/* wait for key state changes,
2168			* i.e. either a new key version or a
2169			* new rotate_key_age */
2170 os_event_reset(fil_crypt_threads_event);
2171
2172 if (os_event_wait_time(fil_crypt_threads_event, 1000000) == 0) {
2173 break;
2174 }
2175
2176 if (recheck) {
2177				/* check recheck here, after the sleep, so
2178				* that we don't busy-loop while one thread is starting
2179				* a space */
2180 break;
2181 }
2182
2183 time_t waited = time(0) - wait_start;
2184
2185			/* Break if we have waited the background scrub
2186			check interval and background scrubbing is enabled */
2187 if (waited >= 0
2188 && ulint(waited) >= srv_background_scrub_data_check_interval
2189 && (srv_background_scrub_data_uncompressed
2190 || srv_background_scrub_data_compressed)) {
2191 break;
2192 }
2193 }
2194
2195 recheck = false;
2196 thr.first = true; // restart from first tablespace
2197
2198 /* iterate all spaces searching for those needing rotation */
2199 while (!thr.should_shutdown() &&
2200 fil_crypt_find_space_to_rotate(&new_state, &thr, &recheck)) {
2201
2202 /* we found a space to rotate */
2203 fil_crypt_start_rotate_space(&new_state, &thr);
2204
2205			/* iterate all pages (cooperatively with other threads) */
2206 while (!thr.should_shutdown() &&
2207 fil_crypt_find_page_to_rotate(&new_state, &thr)) {
2208
2209 if (!thr.space->is_stopping()) {
2210 /* rotate a (set) of pages */
2211 fil_crypt_rotate_pages(&new_state, &thr);
2212 }
2213
2214 /* If space is marked as stopping, release
2215 space and stop rotation. */
2216 if (thr.space->is_stopping()) {
2217 fil_crypt_complete_rotate_space(&thr);
2218 thr.space->release();
2219 thr.space = NULL;
2220 break;
2221 }
2222
2223 /* realloc iops */
2224 fil_crypt_realloc_iops(&thr);
2225 }
2226
2227 /* complete rotation */
2228 if (thr.space) {
2229 fil_crypt_complete_rotate_space(&thr);
2230 }
2231
2232 /* force key state refresh */
2233 new_state.key_id = 0;
2234
2235 /* return iops */
2236 fil_crypt_return_iops(&thr);
2237 }
2238 }
2239
2240 /* return iops if shutting down */
2241 fil_crypt_return_iops(&thr);
2242
2243 /* release current space if shutting down */
2244 if (thr.space) {
2245 thr.space->release();
2246 thr.space = NULL;
2247 }
2248
2249 mutex_enter(&fil_crypt_threads_mutex);
2250 srv_n_fil_crypt_threads_started--;
2251 os_event_set(fil_crypt_event); /* signal that we stopped */
2252 mutex_exit(&fil_crypt_threads_mutex);
2253
2254	/* We count the number of threads in os_thread_exit(). A created
2255	thread should always use that to exit instead of simply returning. */
2256
2257 os_thread_exit();
2258
2259 OS_THREAD_DUMMY_RETURN;
2260}
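
/***********************************************************************
Illustrative only: the inner wait loop of fil_crypt_thread() stops
sleeping when the threads event is signalled, when a recheck is
pending, or when the background scrub check interval has elapsed while
scrubbing is enabled. This hypothetical helper is a sketch of that
condition and is not called by the server code.
@param[in]	event_signalled		os_event_wait_time() returned 0
@param[in]	recheck			a starting space was skipped earlier
@param[in]	waited			seconds spent waiting so far
@param[in]	scrub_interval		scrub check interval in seconds
@param[in]	scrubbing_enabled	any background scrub variable enabled
@return whether to leave the wait loop and rescan for work */
static __attribute__((unused))
bool
example_should_stop_waiting(
	bool	event_signalled,
	bool	recheck,
	time_t	waited,
	uint	scrub_interval,
	bool	scrubbing_enabled)
{
	if (event_signalled || recheck) {
		return(true);
	}

	return(waited >= 0
	       && ulint(waited) >= scrub_interval
	       && scrubbing_enabled);
}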
2261
2262/*********************************************************************
2263Adjust thread count for key rotation
2264@param[in]	new_cnt		Number of threads to be used */
2265UNIV_INTERN
2266void
2267fil_crypt_set_thread_cnt(
2268 const uint new_cnt)
2269{
2270 if (!fil_crypt_threads_inited) {
2271 fil_crypt_threads_init();
2272 }
2273
2274 mutex_enter(&fil_crypt_threads_mutex);
2275
2276 if (new_cnt > srv_n_fil_crypt_threads) {
2277 uint add = new_cnt - srv_n_fil_crypt_threads;
2278 srv_n_fil_crypt_threads = new_cnt;
2279 for (uint i = 0; i < add; i++) {
2280 os_thread_id_t rotation_thread_id;
2281 os_thread_create(fil_crypt_thread, NULL, &rotation_thread_id);
2282 ib::info() << "Creating #"
2283 << i+1 << " encryption thread id "
2284 << os_thread_pf(rotation_thread_id)
2285 << " total threads " << new_cnt << ".";
2286 }
2287 } else if (new_cnt < srv_n_fil_crypt_threads) {
2288 srv_n_fil_crypt_threads = new_cnt;
2289 os_event_set(fil_crypt_threads_event);
2290 }
2291
2292 mutex_exit(&fil_crypt_threads_mutex);
2293
2294 while(srv_n_fil_crypt_threads_started != srv_n_fil_crypt_threads) {
2295 os_event_reset(fil_crypt_event);
2296 os_event_wait_time(fil_crypt_event, 100000);
2297 }
2298
2299 /* Send a message to encryption threads that there could be
2300 something to do. */
2301 if (srv_n_fil_crypt_threads) {
2302 os_event_set(fil_crypt_threads_event);
2303 }
2304}
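
/***********************************************************************
Illustrative only: the adjustment in fil_crypt_set_thread_cnt() follows
a simple rule: growing the thread count spawns the difference, while
shrinking it only lowers the target and wakes the existing threads so
the surplus ones notice the lower target and exit on their own. This
hypothetical helper abstracts the thread creation and signalling into
callbacks and is not called by the server code.
@param[in,out]	target		requested thread count
@param[in]	new_cnt		new requested count
@param[in]	spawn_one	start one rotation thread
@param[in]	wake_all	signal all rotation threads */
static __attribute__((unused))
void
example_adjust_thread_cnt(
	uint*	target,
	uint	new_cnt,
	void	(*spawn_one)(),
	void	(*wake_all)())
{
	if (new_cnt > *target) {
		uint	add = new_cnt - *target;
		*target = new_cnt;
		while (add--) {
			spawn_one();
		}
	} else if (new_cnt < *target) {
		*target = new_cnt;
		/* surplus threads see the lower target and exit */
		wake_all();
	}
}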
2305
2306/*********************************************************************
2307Adjust max key age
2308@param[in] val New max key age */
2309UNIV_INTERN
2310void
2311fil_crypt_set_rotate_key_age(
2312 uint val)
2313{
2314 srv_fil_crypt_rotate_key_age = val;
2315 os_event_set(fil_crypt_threads_event);
2316}
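
/***********************************************************************
Illustrative only: a simplified view of how the rotate-key-age
threshold typically enters the rotation decision: a page is considered
stale once its key version lags the latest version by more than the
configured age. The authoritative check is fil_crypt_needs_rotation()
earlier in this file; this hypothetical helper is a sketch and is not
called by the server code.
@param[in]	page_key_version	key version stored on the page
@param[in]	latest_key_version	latest version for the key
@param[in]	rotate_key_age		srv_fil_crypt_rotate_key_age
@return whether the page's key version is considered stale */
static __attribute__((unused))
bool
example_key_version_is_stale(
	uint	page_key_version,
	uint	latest_key_version,
	uint	rotate_key_age)
{
	/* in this sketch, age 0 disables age-based rotation */
	return(rotate_key_age != 0
	       && page_key_version + rotate_key_age < latest_key_version);
}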
2317
2318/*********************************************************************
2319Adjust rotation iops
2320@param[in]	val		New max rotation iops */
2321UNIV_INTERN
2322void
2323fil_crypt_set_rotation_iops(
2324 uint val)
2325{
2326 srv_n_fil_crypt_iops = val;
2327 os_event_set(fil_crypt_threads_event);
2328}
2329
2330/*********************************************************************
2331Adjust encrypt tables
2332@param[in] val New setting for innodb-encrypt-tables */
2333UNIV_INTERN
2334void
2335fil_crypt_set_encrypt_tables(
2336 uint val)
2337{
2338 srv_encrypt_tables = val;
2339 os_event_set(fil_crypt_threads_event);
2340}
2341
2342/*********************************************************************
2343Init threads for key rotation */
2344UNIV_INTERN
2345void
2346fil_crypt_threads_init()
2347{
2348 if (!fil_crypt_threads_inited) {
2349 fil_crypt_event = os_event_create(0);
2350 fil_crypt_threads_event = os_event_create(0);
2351 mutex_create(LATCH_ID_FIL_CRYPT_THREADS_MUTEX,
2352 &fil_crypt_threads_mutex);
2353
2354 uint cnt = srv_n_fil_crypt_threads;
2355 srv_n_fil_crypt_threads = 0;
2356 fil_crypt_threads_inited = true;
2357 fil_crypt_set_thread_cnt(cnt);
2358 }
2359}
2360
2361/*********************************************************************
2362Clean up key rotation threads resources */
2363UNIV_INTERN
2364void
2365fil_crypt_threads_cleanup()
2366{
2367 if (!fil_crypt_threads_inited) {
2368 return;
2369 }
2370 ut_a(!srv_n_fil_crypt_threads_started);
2371 os_event_destroy(fil_crypt_event);
2372 os_event_destroy(fil_crypt_threads_event);
2373 mutex_free(&fil_crypt_threads_mutex);
2374 fil_crypt_threads_inited = false;
2375}
2376
2377/*********************************************************************
2378Wait for crypt threads to stop accessing space
2379@param[in] space Tablespace */
2380UNIV_INTERN
2381void
2382fil_space_crypt_close_tablespace(
2383 const fil_space_t* space)
2384{
2385 fil_space_crypt_t* crypt_data = space->crypt_data;
2386
2387 if (!crypt_data) {
2388 return;
2389 }
2390
2391 mutex_enter(&fil_crypt_threads_mutex);
2392
2393 time_t start = time(0);
2394 time_t last = start;
2395
2396 mutex_enter(&crypt_data->mutex);
2397 mutex_exit(&fil_crypt_threads_mutex);
2398
2399 ulint cnt = crypt_data->rotate_state.active_threads;
2400 bool flushing = crypt_data->rotate_state.flushing;
2401
2402 while (cnt > 0 || flushing) {
2403 mutex_exit(&crypt_data->mutex);
2404 /* release dict mutex so that scrub threads can release their
2405 * table references */
2406 dict_mutex_exit_for_mysql();
2407
2408 /* wakeup throttle (all) sleepers */
2409 os_event_set(fil_crypt_throttle_sleep_event);
2410
2411 os_thread_sleep(20000);
2412 dict_mutex_enter_for_mysql();
2413 mutex_enter(&crypt_data->mutex);
2414 cnt = crypt_data->rotate_state.active_threads;
2415 flushing = crypt_data->rotate_state.flushing;
2416
2417 time_t now = time(0);
2418
2419 if (now >= last + 30) {
2420 ib::warn() << "Waited "
2421 << now - start
2422 << " seconds to drop space: "
2423 << space->name << " ("
2424 << space->id << ") active threads "
2425				<< cnt << " flushing="
2426 << flushing << ".";
2427 last = now;
2428 }
2429 }
2430
2431 mutex_exit(&crypt_data->mutex);
2432}
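
/***********************************************************************
Illustrative only: the wait loop in fil_space_crypt_close_tablespace()
reports progress at most every 30 seconds. This hypothetical helper is
a sketch of that rate-limited warning decision and is not called by the
server code.
@param[in]	now	current time
@param[in]	start	time the wait started
@param[in,out]	last	time of the last warning
@param[out]	waited	seconds waited so far, for the message
@return whether a warning should be printed now */
static __attribute__((unused))
bool
example_should_warn_slow_close(
	time_t	now,
	time_t	start,
	time_t*	last,
	time_t*	waited)
{
	if (now >= *last + 30) {
		*waited = now - start;
		*last = now;
		return(true);
	}

	return(false);
}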
2433
2434/*********************************************************************
2435Get crypt status for a space (used by information_schema)
2436@param[in] space Tablespace
2437@param[out] status Crypt status */
2438UNIV_INTERN
2439void
2440fil_space_crypt_get_status(
2441 const fil_space_t* space,
2442 struct fil_space_crypt_status_t* status)
2443{
2444 memset(status, 0, sizeof(*status));
2445
2446 ut_ad(space->referenced());
2447
2448 /* If there is no crypt data and we have not yet read
2449 page 0 for this tablespace, we need to read it before
2450 we can continue. */
2451 if (!space->crypt_data) {
2452 fil_crypt_read_crypt_data(const_cast<fil_space_t*>(space));
2453 }
2454
2455 status->space = ULINT_UNDEFINED;
2456
2457 if (fil_space_crypt_t* crypt_data = space->crypt_data) {
2458 status->space = space->id;
2459 mutex_enter(&crypt_data->mutex);
2460 status->scheme = crypt_data->type;
2461 status->keyserver_requests = crypt_data->keyserver_requests;
2462 status->min_key_version = crypt_data->min_key_version;
2463 status->key_id = crypt_data->key_id;
2464
2465 if (crypt_data->rotate_state.active_threads > 0 ||
2466 crypt_data->rotate_state.flushing) {
2467 status->rotating = true;
2468 status->flushing =
2469 crypt_data->rotate_state.flushing;
2470 status->rotate_next_page_number =
2471 crypt_data->rotate_state.next_offset;
2472 status->rotate_max_page_number =
2473 crypt_data->rotate_state.max_offset;
2474 }
2475
2476 mutex_exit(&crypt_data->mutex);
2477
2478 if (srv_encrypt_tables || crypt_data->min_key_version) {
2479 status->current_key_version =
2480 fil_crypt_get_latest_key_version(crypt_data);
2481 }
2482 }
2483}
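
/***********************************************************************
Illustrative only: the "rotating" flag reported by
fil_space_crypt_get_status() is derived rather than stored: a space
counts as rotating while any rotation thread is attached to it or while
the final flush of page 0 is still in progress. This hypothetical
helper is a sketch and is not called by the server code.
@param[in]	active_threads	rotate_state.active_threads
@param[in]	flushing	rotate_state.flushing
@return whether the space should be reported as rotating */
static __attribute__((unused))
bool
example_status_is_rotating(
	uint	active_threads,
	bool	flushing)
{
	return(active_threads > 0 || flushing);
}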
2484
2485/*********************************************************************
2486Return crypt statistics
2487@param[out] stat Crypt statistics */
2488UNIV_INTERN
2489void
2490fil_crypt_total_stat(
2491 fil_crypt_stat_t *stat)
2492{
2493 mutex_enter(&crypt_stat_mutex);
2494 *stat = crypt_stat;
2495 mutex_exit(&crypt_stat_mutex);
2496}
2497
2498/*********************************************************************
2499Get scrub status for a space (used by information_schema)
2500
2501@param[in] space Tablespace
2502@param[out] status Scrub status */
2503UNIV_INTERN
2504void
2505fil_space_get_scrub_status(
2506 const fil_space_t* space,
2507 struct fil_space_scrub_status_t* status)
2508{
2509 memset(status, 0, sizeof(*status));
2510
2511 ut_ad(space->referenced());
2512 fil_space_crypt_t* crypt_data = space->crypt_data;
2513
2514 status->space = space->id;
2515
2516 if (crypt_data != NULL) {
2517 status->compressed = FSP_FLAGS_GET_ZIP_SSIZE(space->flags) > 0;
2518 mutex_enter(&crypt_data->mutex);
2519 status->last_scrub_completed =
2520 crypt_data->rotate_state.scrubbing.last_scrub_completed;
2521 if (crypt_data->rotate_state.active_threads > 0 &&
2522 crypt_data->rotate_state.scrubbing.is_active) {
2523 status->scrubbing = true;
2524 status->current_scrub_started =
2525 crypt_data->rotate_state.start_time;
2526 status->current_scrub_active_threads =
2527 crypt_data->rotate_state.active_threads;
2528 status->current_scrub_page_number =
2529 crypt_data->rotate_state.next_offset;
2530 status->current_scrub_max_page_number =
2531 crypt_data->rotate_state.max_offset;
2532 }
2533
2534 mutex_exit(&crypt_data->mutex);
2535 }
2536}
2537#endif /* UNIV_INNOCHECKSUM */
2538
2539/**
2540Verify that the post-encryption checksum matches the calculated checksum.
2541This function should be called only if the tablespace contains crypt_data
2542metadata (this is a strong indication that the tablespace is encrypted).
2543The function also verifies that the traditional checksum does not match the
2544calculated checksum, because if it does, the page could be valid unencrypted,
2545encrypted, or corrupted.
2546
2547@param[in,out] page page frame (checksum is temporarily modified)
2548@param[in] page_size page size
2549@param[in] space tablespace identifier
2550@param[in] offset page number
2551@return true if page is encrypted AND OK, false otherwise */
2552UNIV_INTERN
2553bool
2554fil_space_verify_crypt_checksum(
2555 byte* page,
2556 const page_size_t& page_size,
2557 ulint space,
2558 ulint offset)
2559{
2560	uint key_version = mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
2561
2562 /* If page is not encrypted, return false */
2563 if (key_version == 0) {
2564 return false;
2565 }
2566
2567 /* Read stored post encryption checksum. */
2568 uint32_t checksum = mach_read_from_4(
2569 page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
2570
2571 /* Declare empty pages non-corrupted */
2572 if (checksum == 0
2573 && *reinterpret_cast<const ib_uint64_t*>(page + FIL_PAGE_LSN) == 0
2574 && buf_page_is_zeroes(page, page_size)) {
2575 return(true);
2576 }
2577
2578 /* Compressed and encrypted pages do not have checksum. Assume not
2579 corrupted. Page verification happens after decompression in
2580 buf_page_io_complete() using buf_page_is_corrupted(). */
2581 if (mach_read_from_2(page+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
2582		return(true);
2583 }
2584
2585 uint32 cchecksum1, cchecksum2;
2586
2587 /* Calculate checksums */
2588 if (page_size.is_compressed()) {
2589 cchecksum1 = page_zip_calc_checksum(
2590 page, page_size.physical(),
2591 SRV_CHECKSUM_ALGORITHM_CRC32);
2592
2593 cchecksum2 = (cchecksum1 == checksum)
2594 ? 0
2595 : page_zip_calc_checksum(
2596 page, page_size.physical(),
2597 SRV_CHECKSUM_ALGORITHM_INNODB);
2598 } else {
2599 cchecksum1 = buf_calc_page_crc32(page);
2600 cchecksum2 = (cchecksum1 == checksum)
2601 ? 0
2602 : buf_calc_page_new_checksum(page);
2603 }
2604
2605	/* If the stored checksum matches one of the calculated checksums,
2606	the page is not corrupted. */
2607
2608 bool encrypted = (checksum == cchecksum1 || checksum == cchecksum2
2609 || checksum == BUF_NO_CHECKSUM_MAGIC);
2610
2611 /* MySQL 5.6 and MariaDB 10.0 and 10.1 will write an LSN to the
2612 first page of each system tablespace file at
2613 FIL_PAGE_FILE_FLUSH_LSN offset. On other pages and in other files,
2614 the field might have been uninitialized until MySQL 5.5. In MySQL 5.7
2615 (and MariaDB Server 10.2.2) WL#7990 stopped writing the field for other
2616 than page 0 of the system tablespace.
2617
2618 Starting from MariaDB 10.1 the field has been repurposed for
2619 encryption key_version.
2620
2621 Starting with MySQL 5.7 (and MariaDB Server 10.2), the
2622 field has been repurposed for SPATIAL INDEX pages for
2623 FIL_RTREE_SPLIT_SEQ_NUM.
2624
2625 Note that FIL_PAGE_FILE_FLUSH_LSN is not included in the InnoDB page
2626 checksum.
2627
2628 Thus, FIL_PAGE_FILE_FLUSH_LSN could contain any value. While the
2629 field would usually be 0 for pages that are not encrypted, we cannot
2630 assume that a nonzero value means that the page is encrypted.
2631 Therefore we must validate the page both as encrypted and unencrypted
2632 when FIL_PAGE_FILE_FLUSH_LSN does not contain 0.
2633 */
2634
2635 uint32_t checksum1 = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM);
2636 uint32_t checksum2;
2637
2638 bool valid;
2639
2640 if (page_size.is_compressed()) {
2641 valid = checksum1 == cchecksum1;
2642 checksum2 = checksum1;
2643 } else {
2644 checksum2 = mach_read_from_4(
2645 page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM);
2646 valid = buf_page_is_checksum_valid_crc32(
2647 page, checksum1, checksum2, false
2648 /* FIXME: also try the original crc32 that was
2649 buggy on big-endian architectures? */)
2650 || buf_page_is_checksum_valid_innodb(
2651 page, checksum1, checksum2);
2652 }
2653
2654 if (encrypted && valid) {
2655		/* The post-encryption checksum matched, but the traditional
2656		checksums also match: the page could still be encrypted, or
2657		unencrypted and valid, or corrupted. */
2658#ifdef UNIV_INNOCHECKSUM
2659 fprintf(log_file ? log_file : stderr,
2660 "Page " ULINTPF ":" ULINTPF " may be corrupted."
2661 " Post encryption checksum %u"
2662 " stored [%u:%u] key_version %u\n",
2663 space, offset, checksum, checksum1, checksum2,
2664 key_version);
2665#else /* UNIV_INNOCHECKSUM */
2666 ib::error()
2667 << " Page " << space << ":" << offset
2668 << " may be corrupted."
2669 " Post encryption checksum " << checksum
2670 << " stored [" << checksum1 << ":" << checksum2
2671 << "] key_version " << key_version;
2672#endif
2673 encrypted = false;
2674 }
2675
2676 return(encrypted);
2677}
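
/***********************************************************************
Illustrative only: the final decision of fil_space_verify_crypt_checksum()
can be summarized as: accept the page as "encrypted and OK" only when a
calculated post-encryption checksum matches the stored one AND the
traditional checksum fields do not also validate; when both validate,
the page is ambiguous and is reported, erring on the side of "not
verified as encrypted". This hypothetical helper is a sketch of that
rule and is not called by the server code.
@param[in]	post_encryption_checksum_matches	stored value matched
						cchecksum1 or cchecksum2
@param[in]	traditional_checksum_matches	FIL_PAGE_SPACE_OR_CHKSUM
						validates as unencrypted
@return whether to report the page as encrypted and OK */
static __attribute__((unused))
bool
example_accept_as_encrypted(
	bool	post_encryption_checksum_matches,
	bool	traditional_checksum_matches)
{
	if (post_encryption_checksum_matches
	    && traditional_checksum_matches) {
		/* could be valid unencrypted, encrypted, or corrupted */
		return(false);
	}

	return(post_encryption_checksum_matches);
}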
2678