1/*****************************************************************************
2
3Copyright (c) 2011, 2018, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2017, 2018, MariaDB Corporation.
5
6This program is free software; you can redistribute it and/or modify it under
7the terms of the GNU General Public License as published by the Free Software
8Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful, but WITHOUT
11ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License along with
15this program; if not, write to the Free Software Foundation, Inc.,
1651 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
17
18*****************************************************************************/
19
20/**************************************************//**
21@file row/row0log.cc
22Modification log for online index creation and online table rebuild
23
24Created 2011-05-26 Marko Makela
25*******************************************************/
26
27#include "row0log.h"
28#include "row0row.h"
29#include "row0ins.h"
30#include "row0upd.h"
31#include "row0merge.h"
32#include "row0ext.h"
33#include "log0crypt.h"
34#include "data0data.h"
35#include "que0que.h"
36#include "srv0mon.h"
37#include "handler0alter.h"
38#include "ut0new.h"
39#include "ut0stage.h"
40#include "trx0rec.h"
41
42#include <algorithm>
43#include <map>
44
/** Counter that row_log_table_close_func() increments by one on every
call */
ulint onlineddl_rowlog_rows;
/** Size of the table rebuild log relative to srv_online_max_size, in
hundredths of a percent (10000 means 100.00%); refreshed by
row_log_table_close_func() */
ulint onlineddl_rowlog_pct_used;
/** NOTE(review): presumably the overall progress of the online DDL
operation; its writers are not in the part of the file reviewed here */
ulint onlineddl_pct_progress;
48
/** Table row modification operations during online table rebuild.
Delete-marked records are not copied to the rebuilt table.
The operation code is stored as the first byte of each log record
written by row_log_table_delete() and row_log_table_low(). */
enum row_tab_op {
 /** Insert a record */
 ROW_T_INSERT = 0x41,
 /** Update a record in place */
 ROW_T_UPDATE,
 /** Delete (purge) a record */
 ROW_T_DELETE
};
59
/** Index record modification operations during online index creation.
The operation code is stored as the first byte of each log record
written by row_log_online_op(). */
enum row_op {
 /** Insert a record */
 ROW_OP_INSERT = 0x61,
 /** Delete a record */
 ROW_OP_DELETE
};
67
/** Size of the modification log entry header, in bytes: one byte for
the operation code and one byte for extra_size. Writers add one more
byte themselves when extra_size >= 0x80 needs a two-byte encoding. */
#define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/
70
/** Log block for modifications during online ALTER TABLE.
row_log_t holds one of these as the writer context (tail) and one as
the reader context (head). */
struct row_log_buf_t {
 byte* block; /*!< file block buffer; NULL until
 row_log_block_allocate() has succeeded */
 size_t size; /*!< length of block in bytes */
 ut_new_pfx_t block_pfx; /*!< opaque descriptor of "block". Set
 by ut_allocator::allocate_large() and fed to
 ut_allocator::deallocate_large(). */
 mrec_buf_t buf; /*!< buffer for accessing a record
 that spans two blocks */
 ulint blocks; /*!< current position in blocks */
 ulint bytes; /*!< current position within block */
 ulonglong total; /*!< logical position, in bytes from
 the start of the row_log_table log;
 0 for row_log_online_op() and
 row_log_apply(). */
};
87
/** Tracks BLOB allocation during online ALTER TABLE.
The object stores one log byte offset. While the BLOB is freed, the
offset holds the magic value BLOB_FREED; after blob_alloc() it holds
the log position at which the page was reused.
The members old_offset and free_offset exist in debug builds only and
are touched solely through ut_ad() and ut_d(). */
class row_log_table_blob_t {
public:
 /** Constructor (declaring a BLOB freed)
 @param offset_arg row_log_t::tail::total (debug builds only) */
#ifdef UNIV_DEBUG
 row_log_table_blob_t(ulonglong offset_arg) :
 old_offset (0), free_offset (offset_arg),
 offset (BLOB_FREED) {}
#else /* UNIV_DEBUG */
 row_log_table_blob_t() :
 offset (BLOB_FREED) {}
#endif /* UNIV_DEBUG */

 /** Declare a BLOB freed again.
 @param offset_arg row_log_t::tail::total (debug builds only) */
#ifdef UNIV_DEBUG
 void blob_free(ulonglong offset_arg)
#else /* UNIV_DEBUG */
 void blob_free()
#endif /* UNIV_DEBUG */
 {
 /* The BLOB must currently be allocated, at a log position
 that precedes the position where it is being freed. */
 ut_ad(offset < offset_arg);
 ut_ad(offset != BLOB_FREED);
 ut_d(old_offset = offset);
 ut_d(free_offset = offset_arg);
 offset = BLOB_FREED;
 }
 /** Declare a freed BLOB reused.
 @param offset_arg row_log_t::tail::total */
 void blob_alloc(ulonglong offset_arg) {
 /* The reuse must not precede the last blob_free(). */
 ut_ad(free_offset <= offset_arg);
 ut_d(old_offset = offset);
 offset = offset_arg;
 }
 /** Determine if a BLOB was freed at a given log position
 @param offset_arg row_log_t::head::total after the log record
 @return true if freed */
 bool is_freed(ulonglong offset_arg) const {
 /* This is supposed to be the offset at the end of the
 current log record. */
 ut_ad(offset_arg > 0);
 /* We should never get anywhere close to the magic value. */
 ut_ad(offset_arg < BLOB_FREED);
 /* While offset == BLOB_FREED this holds for every valid
 offset_arg; otherwise it holds for log positions before
 the one passed to blob_alloc(). */
 return(offset_arg < offset);
 }
private:
 /** Magic value for a freed BLOB */
 static const ulonglong BLOB_FREED = ~0ULL;
#ifdef UNIV_DEBUG
 /** Old offset, in case a page was freed, reused, freed, ... */
 ulonglong old_offset;
 /** Offset of last blob_free() */
 ulonglong free_offset;
#endif /* UNIV_DEBUG */
 /** Byte offset to the log file */
 ulonglong offset;
};
146
/** @brief Map of off-page column page numbers to log byte offsets.

If there is no mapping for a page number, it is safe to access.
If a page number maps to an entry in the freed state (the BLOB_FREED
marker of row_log_table_blob_t), it is an off-page column that has
been freed.
Otherwise the entry holds a byte offset into the index->online_log,
indicating that the page is safe to access when applying log records
starting from that offset. */
typedef std::map<
 ulint,
 row_log_table_blob_t,
 std::less<ulint>,
 ut_allocator<std::pair<const ulint, row_log_table_blob_t> > >
 page_no_map;
160
/** @brief Buffer for logging modifications during online index creation

All modifications to an index that is being created will be logged by
row_log_online_op() to this buffer.

All modifications to a table that is being rebuilt will be logged by
row_log_table_delete(), row_log_table_update(), row_log_table_insert()
to this buffer.

When head.blocks == tail.blocks, the reader will access tail.block
directly. When also head.bytes == tail.bytes, both counts will be
reset to 0 and the file will be truncated. */
struct row_log_t {
 pfs_os_file_t fd; /*!< file descriptor; OS_FILE_CLOSED until
 row_log_tmpfile() has created the file */
 ib_mutex_t mutex; /*!< mutex protecting error,
 max_trx and tail */
 page_no_map* blobs; /*!< map of page numbers of off-page columns
 that have been freed during table-rebuilding
 ALTER TABLE (row_log_table_*); protected by
 index->lock X-latch only */
 dict_table_t* table; /*!< table that is being rebuilt,
 or NULL when this is a secondary
 index that is being created online */
 bool same_pk;/*!< whether the definition of the PRIMARY KEY
 has remained the same */
 const dtuple_t* defaults;
 /*!< default values of added, changed columns,
 or NULL */
 const ulint* col_map;/*!< mapping of old column numbers to
 new ones, or NULL if !table */
 dberr_t error; /*!< error that occurred during online
 table rebuild */
 /** The transaction ID of the ALTER TABLE transaction. Any
 concurrent DML would necessarily be logged with a larger
 transaction ID, because ha_innobase::prepare_inplace_alter_table()
 acts as a barrier that ensures that any concurrent transaction
 that operates on the table would have been started after
 ha_innobase::prepare_inplace_alter_table() returns and before
 ha_innobase::commit_inplace_alter_table(commit=true) is invoked.

 Due to the nondeterministic nature of purge and due to the
 possibility of upgrading from an earlier version of MariaDB
 or MySQL, it is possible that row_log_table_low() would be
 fed a DB_TRX_ID that precedes min_trx. We must normalize
 such references to reset_trx_id[]. */
 trx_id_t min_trx;
 trx_id_t max_trx;/*!< biggest observed trx_id in
 row_log_online_op();
 protected by mutex and index->lock S-latch,
 or by index->lock X-latch only */
 row_log_buf_t tail; /*!< writer context;
 protected by mutex and index->lock S-latch,
 or by index->lock X-latch only */
 byte* crypt_tail; /*!< writer context;
 temporary buffer used in encryption,
 decryption or NULL*/
 row_log_buf_t head; /*!< reader context; protected by MDL only;
 modifiable by row_log_apply_ops() */
 byte* crypt_head; /*!< reader context;
 temporary buffer used in encryption,
 decryption or NULL */
 const char* path; /*!< where to create temporary file during
 log operation */
 /** the number of core fields in the clustered index of the
 source table; before row_log_table_apply() completes, the
 table could be emptied, so that table->is_instant() no longer holds,
 but all log records must be in the "instant" format. */
 unsigned n_core_fields;
 bool ignore; /*!< Whether the alter ignore is being used;
 if not, NULL values will not be converted to
 defaults */

 /** Determine whether the log should be in the 'instant ADD' format
 @param[in] index the clustered index of the source table
 @return whether to use the 'instant ADD COLUMN' format */
 bool is_instant(const dict_index_t* index) const
 {
 ut_ad(table);
 ut_ad(n_core_fields <= index->n_fields);
 /* The index has more fields than the core fields
 recorded for this log. */
 return n_core_fields != index->n_fields;
 }
};
243
244/** Create the file or online log if it does not exist.
245@param[in,out] log online rebuild log
246@return true if success, false if not */
247static MY_ATTRIBUTE((warn_unused_result))
248pfs_os_file_t
249row_log_tmpfile(
250 row_log_t* log)
251{
252 DBUG_ENTER("row_log_tmpfile");
253 if (log->fd == OS_FILE_CLOSED) {
254 log->fd = row_merge_file_create_low(log->path);
255 DBUG_EXECUTE_IF("row_log_tmpfile_fail",
256 if (log->fd != OS_FILE_CLOSED)
257 row_merge_file_destroy_low(log->fd);
258 log->fd = OS_FILE_CLOSED;);
259 if (log->fd != OS_FILE_CLOSED) {
260 MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_LOG_FILES);
261 }
262 }
263
264 DBUG_RETURN(log->fd);
265}
266
267/** Allocate the memory for the log buffer.
268@param[in,out] log_buf Buffer used for log operation
269@return TRUE if success, false if not */
270static MY_ATTRIBUTE((warn_unused_result))
271bool
272row_log_block_allocate(
273 row_log_buf_t& log_buf)
274{
275 DBUG_ENTER("row_log_block_allocate");
276 if (log_buf.block == NULL) {
277 DBUG_EXECUTE_IF(
278 "simulate_row_log_allocation_failure",
279 DBUG_RETURN(false);
280 );
281
282 log_buf.block = ut_allocator<byte>(mem_key_row_log_buf)
283 .allocate_large(srv_sort_buf_size, &log_buf.block_pfx);
284
285 if (log_buf.block == NULL) {
286 DBUG_RETURN(false);
287 }
288 log_buf.size = srv_sort_buf_size;
289 }
290 DBUG_RETURN(true);
291}
292
293/** Free the log buffer.
294@param[in,out] log_buf Buffer used for log operation */
295static
296void
297row_log_block_free(
298 row_log_buf_t& log_buf)
299{
300 DBUG_ENTER("row_log_block_free");
301 if (log_buf.block != NULL) {
302 ut_allocator<byte>(mem_key_row_log_buf).deallocate_large(
303 log_buf.block, &log_buf.block_pfx, log_buf.size);
304 log_buf.block = NULL;
305 }
306 DBUG_VOID_RETURN;
307}
308
/******************************************************//**
Logs an operation to a secondary index that is (or was) being created.

The log record consists of the operation code (ROW_OP_INSERT or
ROW_OP_DELETE), the transaction ID (for ROW_OP_INSERT only),
extra_size encoded in 1 or 2 bytes, and the index tuple in the
temporary file format.

Nothing is logged if the index is corrupted. If the log block cannot
be allocated or the temporary file cannot be created,
log->error is set to DB_OUT_OF_MEMORY. If the log would reach
srv_online_max_size, or encrypting or writing a block fails,
the index is flagged DICT_CORRUPT. */
void
row_log_online_op(
/*==============*/
 dict_index_t* index, /*!< in/out: index, S or X latched */
 const dtuple_t* tuple, /*!< in: index tuple */
 trx_id_t trx_id) /*!< in: transaction ID for insert,
 or 0 for delete */
{
 byte* b;
 ulint extra_size;
 ulint size;
 ulint mrec_size;
 ulint avail_size;
 row_log_t* log;

 ut_ad(dtuple_validate(tuple));
 ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index));
 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_S)
 || rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));

 if (index->is_corrupted()) {
 return;
 }

 ut_ad(dict_index_is_online_ddl(index));

 /* Compute the size of the record. This differs from
 row_merge_buf_encode(), because here we do not encode
 extra_size+1 (and reserve 0 as the end-of-chunk marker). */

 size = rec_get_converted_size_temp(
 index, tuple->fields, tuple->n_fields, &extra_size);
 ut_ad(size >= extra_size);
 ut_ad(size <= sizeof log->tail.buf);

 /* Header, an optional second byte of extra_size, the record,
 and the transaction ID of an insert. */
 mrec_size = ROW_LOG_HEADER_SIZE
 + (extra_size >= 0x80) + size
 + (trx_id ? DATA_TRX_ID_LEN : 0);

 log = index->online_log;
 mutex_enter(&log->mutex);

 if (trx_id > log->max_trx) {
 log->max_trx = trx_id;
 }

 if (!row_log_block_allocate(log->tail)) {
 log->error = DB_OUT_OF_MEMORY;
 goto err_exit;
 }

 UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);

 ut_ad(log->tail.bytes < srv_sort_buf_size);
 avail_size = srv_sort_buf_size - log->tail.bytes;

 /* A record that does not fit in the rest of the block is
 assembled in tail.buf and split across two blocks below. */
 if (mrec_size > avail_size) {
 b = log->tail.buf;
 } else {
 b = log->tail.block + log->tail.bytes;
 }

 if (trx_id != 0) {
 *b++ = ROW_OP_INSERT;
 trx_write_trx_id(b, trx_id);
 b += DATA_TRX_ID_LEN;
 } else {
 *b++ = ROW_OP_DELETE;
 }

 /* extra_size is stored in one byte if it is less than 0x80;
 otherwise in two bytes with the most significant bit set
 in the first one. */
 if (extra_size < 0x80) {
 *b++ = (byte) extra_size;
 } else {
 ut_ad(extra_size < 0x8000);
 *b++ = (byte) (0x80 | (extra_size >> 8));
 *b++ = (byte) extra_size;
 }

 rec_convert_dtuple_to_temp(
 b + extra_size, index, tuple->fields, tuple->n_fields);
 b += size;

 if (mrec_size >= avail_size) {
 /* The record fills up the current block. Write out
 the block and start a new one. */
 const os_offset_t byte_offset
 = (os_offset_t) log->tail.blocks
 * srv_sort_buf_size;
 IORequest request(IORequest::WRITE);
 byte* buf = log->tail.block;

 if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
 goto write_failed;
 }

 if (mrec_size == avail_size) {
 ut_ad(b == &buf[srv_sort_buf_size]);
 } else {
 /* Complete the block with the first part of
 the record that was assembled in tail.buf. */
 ut_ad(b == log->tail.buf + mrec_size);
 memcpy(buf + log->tail.bytes,
 log->tail.buf, avail_size);
 }

 UNIV_MEM_ASSERT_RW(buf, srv_sort_buf_size);

 if (row_log_tmpfile(log) == OS_FILE_CLOSED) {
 log->error = DB_OUT_OF_MEMORY;
 goto err_exit;
 }

 /* If encryption is enabled encrypt buffer before writing it
 to file system. */
 if (log_tmp_is_encrypted()) {
 if (!log_tmp_block_encrypt(
 buf, srv_sort_buf_size,
 log->crypt_tail, byte_offset,
 index->table->space->id)) {
 log->error = DB_DECRYPTION_FAILED;
 goto write_failed;
 }

 srv_stats.n_rowlog_blocks_encrypted.inc();
 buf = log->crypt_tail;
 }

 log->tail.blocks++;
 if (!os_file_write(
 request,
 "(modification log)",
 log->fd,
 buf, byte_offset, srv_sort_buf_size)) {
write_failed:
 /* We set the flag directly instead of invoking
 dict_set_corrupted_index_cache_only(index) here,
 because the index is not "public" yet. */
 index->type |= DICT_CORRUPT;
 }

 UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
 UNIV_MEM_INVALID(buf, srv_sort_buf_size);

 /* Start the next block with the rest of the record
 (nothing if the record ended exactly at the block end). */
 memcpy(log->tail.block, log->tail.buf + avail_size,
 mrec_size - avail_size);
 log->tail.bytes = mrec_size - avail_size;
 } else {
 log->tail.bytes += mrec_size;
 ut_ad(b == log->tail.block + log->tail.bytes);
 }

 UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
err_exit:
 mutex_exit(&log->mutex);
}
462
463/******************************************************//**
464Gets the error status of the online index rebuild log.
465@return DB_SUCCESS or error code */
466dberr_t
467row_log_table_get_error(
468/*====================*/
469 const dict_index_t* index) /*!< in: clustered index of a table
470 that is being rebuilt online */
471{
472 ut_ad(dict_index_is_clust(index));
473 ut_ad(dict_index_is_online_ddl(index));
474 return(index->online_log->error);
475}
476
477/******************************************************//**
478Starts logging an operation to a table that is being rebuilt.
479@return pointer to log, or NULL if no logging is necessary */
480static MY_ATTRIBUTE((nonnull, warn_unused_result))
481byte*
482row_log_table_open(
483/*===============*/
484 row_log_t* log, /*!< in/out: online rebuild log */
485 ulint size, /*!< in: size of log record */
486 ulint* avail) /*!< out: available size for log record */
487{
488 mutex_enter(&log->mutex);
489
490 UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
491
492 if (log->error != DB_SUCCESS) {
493err_exit:
494 mutex_exit(&log->mutex);
495 return(NULL);
496 }
497
498 if (!row_log_block_allocate(log->tail)) {
499 log->error = DB_OUT_OF_MEMORY;
500 goto err_exit;
501 }
502
503 ut_ad(log->tail.bytes < srv_sort_buf_size);
504 *avail = srv_sort_buf_size - log->tail.bytes;
505
506 if (size > *avail) {
507 /* Make sure log->tail.buf is large enough */
508 ut_ad(size <= sizeof log->tail.buf);
509 return(log->tail.buf);
510 } else {
511 return(log->tail.block + log->tail.bytes);
512 }
513}
514
/******************************************************//**
Stops logging an operation to a table that is being rebuilt.
The caller must hold log->mutex (acquired by row_log_table_open());
it is released here.

If the record fills up the current block, the block is written to
the temporary file and the remainder of the record starts the next
block. If the log would reach srv_online_max_size or the write fails,
log->error is set to DB_ONLINE_LOG_TOO_BIG. If the temporary file
cannot be created or the block cannot be encrypted, log->error is set
and neither the write position nor log->tail.total is advanced. */
static MY_ATTRIBUTE((nonnull))
void
row_log_table_close_func(
/*=====================*/
 dict_index_t* index, /*!< in/out: online rebuilt index */
#ifdef UNIV_DEBUG
 const byte* b, /*!< in: end of log record */
#endif /* UNIV_DEBUG */
 ulint size, /*!< in: size of log record */
 ulint avail) /*!< in: available size for log record */
{
 row_log_t* log = index->online_log;

 ut_ad(mutex_own(&log->mutex));

 if (size >= avail) {
 /* The record fills up the current block. Write out
 the block and start a new one. */
 const os_offset_t byte_offset
 = (os_offset_t) log->tail.blocks
 * srv_sort_buf_size;
 IORequest request(IORequest::WRITE);
 byte* buf = log->tail.block;

 if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
 goto write_failed;
 }

 if (size == avail) {
 ut_ad(b == &buf[srv_sort_buf_size]);
 } else {
 /* Complete the block with the first part of
 the record that was assembled in tail.buf. */
 ut_ad(b == log->tail.buf + size);
 memcpy(buf + log->tail.bytes, log->tail.buf, avail);
 }

 UNIV_MEM_ASSERT_RW(buf, srv_sort_buf_size);

 if (row_log_tmpfile(log) == OS_FILE_CLOSED) {
 log->error = DB_OUT_OF_MEMORY;
 goto err_exit;
 }

 /* If encryption is enabled encrypt buffer before writing it
 to file system. */
 if (log_tmp_is_encrypted()) {
 if (!log_tmp_block_encrypt(
 log->tail.block, srv_sort_buf_size,
 log->crypt_tail, byte_offset,
 index->table->space->id)) {
 log->error = DB_DECRYPTION_FAILED;
 goto err_exit;
 }

 srv_stats.n_rowlog_blocks_encrypted.inc();
 buf = log->crypt_tail;
 }

 log->tail.blocks++;
 if (!os_file_write(
 request,
 "(modification log)",
 log->fd,
 buf, byte_offset, srv_sort_buf_size)) {
write_failed:
 log->error = DB_ONLINE_LOG_TOO_BIG;
 }
 UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
 UNIV_MEM_INVALID(buf, srv_sort_buf_size);
 /* Start the next block with the rest of the record
 (nothing if the record ended exactly at the block end). */
 memcpy(log->tail.block, log->tail.buf + avail, size - avail);
 log->tail.bytes = size - avail;
 } else {
 log->tail.bytes += size;
 ut_ad(b == log->tail.block + log->tail.bytes);
 }

 log->tail.total += size;
 UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
err_exit:
 mutex_exit(&log->mutex);

 /* The statistics below are updated after releasing the mutex,
 also when the record could not be logged. */
 my_atomic_addlint(&onlineddl_rowlog_rows, 1);
 /* 10000 means 100.00%, 4525 means 45.25% */
 onlineddl_rowlog_pct_used = static_cast<ulint>((log->tail.total * 10000) / srv_online_max_size);
}
599
/** Stop logging an operation to a table that is being rebuilt.
Wrapper of row_log_table_close_func() that passes the end-of-record
pointer only in debug builds, where the function has that parameter.
@param index	online rebuilt index
@param b	end of log record (used in debug builds only)
@param size	size of log record
@param avail	available size for log record */
#ifdef UNIV_DEBUG
# define row_log_table_close(index, b, size, avail)	\
	row_log_table_close_func(index, b, size, avail)
#else /* UNIV_DEBUG */
/* The first parameter must be named like the token used in the
expansion; otherwise the macro would ignore its argument and pick up
whatever "index" happens to be in scope at the point of use. */
# define row_log_table_close(index, b, size, avail)	\
	row_log_table_close_func(index, size, avail)
#endif /* UNIV_DEBUG */
607
608/** Check whether a virtual column is indexed in the new table being
609created during alter table
610@param[in] index cluster index
611@param[in] v_no virtual column number
612@return true if it is indexed, else false */
613bool
614row_log_col_is_indexed(
615 const dict_index_t* index,
616 ulint v_no)
617{
618 return(dict_table_get_nth_v_col(
619 index->online_log->table, v_no)->m_col.ord_part);
620}
621
/******************************************************//**
Logs a delete operation to a table that is being rebuilt.
This will be merged in row_log_table_apply_delete().

The log record consists of ROW_T_DELETE, one byte of extra_size, and
the tuple (PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR) of the new table in the
temporary file format.

Nothing is logged if the index is not in the ONLINE_INDEX_CREATION
state, the index or table is corrupted, or an error has already been
recorded in the log. */
void
row_log_table_delete(
/*=================*/
 const rec_t* rec, /*!< in: clustered index leaf page record,
 page X-latched */
 dict_index_t* index, /*!< in/out: clustered index, S-latched
 or X-latched */
 const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
 const byte* sys) /*!< in: DB_TRX_ID,DB_ROLL_PTR that should
 be logged, or NULL to use those in rec */
{
 ulint old_pk_extra_size;
 ulint old_pk_size;
 ulint mrec_size;
 ulint avail_size;
 mem_heap_t* heap = NULL;
 const dtuple_t* old_pk;

 ut_ad(dict_index_is_clust(index));
 ut_ad(rec_offs_validate(rec, index, offsets));
 ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
 ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
 ut_ad(rw_lock_own_flagged(
 &index->lock,
 RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));

 if (index->online_status != ONLINE_INDEX_CREATION
 || (index->type & DICT_CORRUPT) || index->table->corrupted
 || index->online_log->error != DB_SUCCESS) {
 return;
 }

 dict_table_t* new_table = index->online_log->table;
 dict_index_t* new_index = dict_table_get_first_index(new_table);

 ut_ad(dict_index_is_clust(new_index));
 ut_ad(!dict_index_is_online_ddl(new_index));
 ut_ad(index->online_log->min_trx);

 /* Create the tuple PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in new_table. */
 if (index->online_log->same_pk) {
 dtuple_t* tuple;
 ut_ad(new_index->n_uniq == index->n_uniq);

 /* The PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR are in the first
 fields of the record. */
 heap = mem_heap_create(
 DATA_TRX_ID_LEN
 + DTUPLE_EST_ALLOC(unsigned(new_index->n_uniq) + 2));
 old_pk = tuple = dtuple_create(
 heap, unsigned(new_index->n_uniq) + 2);
 dict_index_copy_types(tuple, new_index, tuple->n_fields);
 dtuple_set_n_fields_cmp(tuple, new_index->n_uniq);

 /* Point the tuple fields to the data in rec. */
 for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
 ulint len;
 const void* field = rec_get_nth_field(
 rec, offsets, i, &len);
 dfield_t* dfield = dtuple_get_nth_field(
 tuple, i);
 ut_ad(len != UNIV_SQL_NULL);
 ut_ad(!rec_offs_nth_extern(offsets, i));
 dfield_set_data(dfield, field, len);
 }

 dfield_t* db_trx_id = dtuple_get_nth_field(
 tuple, new_index->n_uniq);

 /* Use the caller-supplied system fields if there are any,
 and do not log a DB_TRX_ID that precedes min_trx. */
 const bool replace_sys_fields
 = sys
 || trx_read_trx_id(static_cast<byte*>(db_trx_id->data))
 < index->online_log->min_trx;

 if (replace_sys_fields) {
 if (!sys || trx_read_trx_id(sys)
 < index->online_log->min_trx) {
 sys = reset_trx_id;
 }

 dfield_set_data(db_trx_id, sys, DATA_TRX_ID_LEN);
 dfield_set_data(db_trx_id + 1, sys + DATA_TRX_ID_LEN,
 DATA_ROLL_PTR_LEN);
 }

 ut_d(trx_id_check(db_trx_id->data,
 index->online_log->min_trx));
 } else {
 /* The PRIMARY KEY has changed. Translate the tuple. */
 old_pk = row_log_table_get_pk(
 rec, index, offsets, NULL, &heap);

 if (!old_pk) {
 ut_ad(index->online_log->error != DB_SUCCESS);
 /* Free the heap if row_log_table_get_pk()
 created one. */
 if (heap) {
 goto func_exit;
 }
 return;
 }
 }

 ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
 old_pk, old_pk->n_fields - 2)->len);
 ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
 old_pk, old_pk->n_fields - 1)->len);
 old_pk_size = rec_get_converted_size_temp(
 new_index, old_pk->fields, old_pk->n_fields,
 &old_pk_extra_size);
 ut_ad(old_pk_extra_size < 0x100);

 /* 2 = 1 (extra_size) + at least 1 byte payload */
 mrec_size = 2 + old_pk_size;

 if (byte* b = row_log_table_open(index->online_log,
 mrec_size, &avail_size)) {
 *b++ = ROW_T_DELETE;
 *b++ = static_cast<byte>(old_pk_extra_size);

 rec_convert_dtuple_to_temp(
 b + old_pk_extra_size, new_index,
 old_pk->fields, old_pk->n_fields);

 b += old_pk_size;

 row_log_table_close(index, b, mrec_size, avail_size);
 }

func_exit:
 mem_heap_free(heap);
}
754
755/******************************************************//**
756Logs an insert or update to a table that is being rebuilt. */
757static
758void
759row_log_table_low_redundant(
760/*========================*/
761 const rec_t* rec, /*!< in: clustered index leaf
762 page record in ROW_FORMAT=REDUNDANT,
763 page X-latched */
764 dict_index_t* index, /*!< in/out: clustered index, S-latched
765 or X-latched */
766 bool insert, /*!< in: true if insert,
767 false if update */
768 const dtuple_t* old_pk, /*!< in: old PRIMARY KEY value
769 (if !insert and a PRIMARY KEY
770 is being created) */
771 const dict_index_t* new_index)
772 /*!< in: clustered index of the
773 new table, not latched */
774{
775 ulint old_pk_size;
776 ulint old_pk_extra_size;
777 ulint size;
778 ulint extra_size;
779 ulint mrec_size;
780 ulint avail_size;
781 mem_heap_t* heap = NULL;
782 dtuple_t* tuple;
783 const ulint n_fields = rec_get_n_fields_old(rec);
784
785 ut_ad(!page_is_comp(page_align(rec)));
786 ut_ad(index->n_fields >= n_fields);
787 ut_ad(index->n_fields == n_fields || index->is_instant());
788 ut_ad(dict_tf2_is_valid(index->table->flags, index->table->flags2));
789 ut_ad(!dict_table_is_comp(index->table)); /* redundant row format */
790 ut_ad(dict_index_is_clust(new_index));
791
792 heap = mem_heap_create(DTUPLE_EST_ALLOC(n_fields));
793 tuple = dtuple_create(heap, n_fields);
794 dict_index_copy_types(tuple, index, n_fields);
795
796 dtuple_set_n_fields_cmp(tuple, dict_index_get_n_unique(index));
797
798 if (rec_get_1byte_offs_flag(rec)) {
799 for (ulint i = 0; i < n_fields; i++) {
800 dfield_t* dfield;
801 ulint len;
802 const void* field;
803
804 dfield = dtuple_get_nth_field(tuple, i);
805 field = rec_get_nth_field_old(rec, i, &len);
806
807 dfield_set_data(dfield, field, len);
808 }
809 } else {
810 for (ulint i = 0; i < n_fields; i++) {
811 dfield_t* dfield;
812 ulint len;
813 const void* field;
814
815 dfield = dtuple_get_nth_field(tuple, i);
816 field = rec_get_nth_field_old(rec, i, &len);
817
818 dfield_set_data(dfield, field, len);
819
820 if (rec_2_is_field_extern(rec, i)) {
821 dfield_set_ext(dfield);
822 }
823 }
824 }
825
826 dfield_t* db_trx_id = dtuple_get_nth_field(tuple, index->n_uniq);
827 ut_ad(dfield_get_len(db_trx_id) == DATA_TRX_ID_LEN);
828 ut_ad(dfield_get_len(db_trx_id + 1) == DATA_ROLL_PTR_LEN);
829
830 if (trx_read_trx_id(static_cast<const byte*>
831 (dfield_get_data(db_trx_id)))
832 < index->online_log->min_trx) {
833 dfield_set_data(db_trx_id, reset_trx_id, DATA_TRX_ID_LEN);
834 dfield_set_data(db_trx_id + 1, reset_trx_id + DATA_TRX_ID_LEN,
835 DATA_ROLL_PTR_LEN);
836 }
837
838 const bool is_instant = index->online_log->is_instant(index);
839 rec_comp_status_t status = is_instant
840 ? REC_STATUS_COLUMNS_ADDED : REC_STATUS_ORDINARY;
841
842 size = rec_get_converted_size_temp(
843 index, tuple->fields, tuple->n_fields, &extra_size, status);
844 if (is_instant) {
845 size++;
846 extra_size++;
847 }
848
849 mrec_size = ROW_LOG_HEADER_SIZE + size + (extra_size >= 0x80);
850
851 if (insert || index->online_log->same_pk) {
852 ut_ad(!old_pk);
853 old_pk_extra_size = old_pk_size = 0;
854 } else {
855 ut_ad(old_pk);
856 ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
857 ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
858 old_pk, old_pk->n_fields - 2)->len);
859 ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
860 old_pk, old_pk->n_fields - 1)->len);
861
862 old_pk_size = rec_get_converted_size_temp(
863 new_index, old_pk->fields, old_pk->n_fields,
864 &old_pk_extra_size);
865 ut_ad(old_pk_extra_size < 0x100);
866 mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
867 }
868
869 if (byte* b = row_log_table_open(index->online_log,
870 mrec_size, &avail_size)) {
871 if (insert) {
872 *b++ = ROW_T_INSERT;
873 } else {
874 *b++ = ROW_T_UPDATE;
875
876 if (old_pk_size) {
877 *b++ = static_cast<byte>(old_pk_extra_size);
878
879 rec_convert_dtuple_to_temp(
880 b + old_pk_extra_size, new_index,
881 old_pk->fields, old_pk->n_fields);
882 b += old_pk_size;
883 }
884 }
885
886 if (extra_size < 0x80) {
887 *b++ = static_cast<byte>(extra_size);
888 } else {
889 ut_ad(extra_size < 0x8000);
890 *b++ = static_cast<byte>(0x80 | (extra_size >> 8));
891 *b++ = static_cast<byte>(extra_size);
892 }
893
894 if (status == REC_STATUS_COLUMNS_ADDED) {
895 ut_ad(is_instant);
896 if (n_fields <= index->online_log->n_core_fields) {
897 status = REC_STATUS_ORDINARY;
898 }
899 *b = status;
900 }
901
902 rec_convert_dtuple_to_temp(
903 b + extra_size, index, tuple->fields, tuple->n_fields,
904 status);
905 b += size;
906
907 row_log_table_close(index, b, mrec_size, avail_size);
908 }
909
910 mem_heap_free(heap);
911}
912
/******************************************************//**
Logs an insert or update to a table that is being rebuilt.

Records in ROW_FORMAT=REDUNDANT are passed to
row_log_table_low_redundant(). For other records, the log record
consists of the operation code, the old PRIMARY KEY preceded by one
byte of its extra_size (only for an update when the PRIMARY KEY
definition changes), extra_size encoded in 1 or 2 bytes, the record
status byte (only if the log is in the 'instant ADD COLUMN' format),
the record header without its last REC_N_NEW_EXTRA_BYTES bytes, and
the record data.

Nothing is logged if the index is not in the ONLINE_INDEX_CREATION
state, the index or table is corrupted, or an error has already been
recorded in the log. */
static
void
row_log_table_low(
/*==============*/
 const rec_t* rec, /*!< in: clustered index leaf page record,
 page X-latched */
 dict_index_t* index, /*!< in/out: clustered index, S-latched
 or X-latched */
 const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
 bool insert, /*!< in: true if insert, false if update */
 const dtuple_t* old_pk) /*!< in: old PRIMARY KEY value (if !insert
 and a PRIMARY KEY is being created) */
{
 ulint old_pk_size;
 ulint old_pk_extra_size;
 ulint extra_size;
 ulint mrec_size;
 ulint avail_size;
 const dict_index_t* new_index;

 new_index = dict_table_get_first_index(index->online_log->table);

 ut_ad(dict_index_is_clust(index));
 ut_ad(dict_index_is_clust(new_index));
 ut_ad(!dict_index_is_online_ddl(new_index));
 ut_ad(rec_offs_validate(rec, index, offsets));
 ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
 ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
 ut_ad(rw_lock_own_flagged(
 &index->lock,
 RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
#ifdef UNIV_DEBUG
 switch (fil_page_get_type(page_align(rec))) {
 case FIL_PAGE_INDEX:
 break;
 case FIL_PAGE_TYPE_INSTANT:
 ut_ad(index->is_instant());
 ut_ad(page_is_root(page_align(rec)));
 break;
 default:
 ut_ad(!"wrong page type");
 }
#endif /* UNIV_DEBUG */
 ut_ad(!rec_is_default_row(rec, index));
 ut_ad(page_rec_is_leaf(rec));
 ut_ad(!page_is_comp(page_align(rec)) == !rec_offs_comp(offsets));
 /* old_pk=row_log_table_get_pk() [not needed in INSERT] is a prefix
 of the clustered index record (PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR),
 with no information on virtual columns */
 ut_ad(!old_pk || !insert);
 ut_ad(!old_pk || old_pk->n_v_fields == 0);

 if (index->online_status != ONLINE_INDEX_CREATION
 || (index->type & DICT_CORRUPT) || index->table->corrupted
 || index->online_log->error != DB_SUCCESS) {
 return;
 }

 if (!rec_offs_comp(offsets)) {
 /* ROW_FORMAT=REDUNDANT records are converted
 before logging. */
 row_log_table_low_redundant(
 rec, index, insert, old_pk, new_index);
 return;
 }

 ut_ad(page_is_comp(page_align(rec)));
 ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY
 || rec_get_status(rec) == REC_STATUS_COLUMNS_ADDED);

 /* The last REC_N_NEW_EXTRA_BYTES bytes of the record header
 are not logged. */
 const ulint omit_size = REC_N_NEW_EXTRA_BYTES;

 const ulint rec_extra_size = rec_offs_extra_size(offsets) - omit_size;
 const bool is_instant = index->online_log->is_instant(index);
 /* In the 'instant ADD COLUMN' format, one more byte is logged
 for the record status. */
 extra_size = rec_extra_size + is_instant;

 mrec_size = ROW_LOG_HEADER_SIZE
 + (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size
 + is_instant;

 if (insert || index->online_log->same_pk) {
 ut_ad(!old_pk);
 old_pk_extra_size = old_pk_size = 0;
 } else {
 ut_ad(old_pk);
 ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
 ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
 old_pk, old_pk->n_fields - 2)->len);
 ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
 old_pk, old_pk->n_fields - 1)->len);

 old_pk_size = rec_get_converted_size_temp(
 new_index, old_pk->fields, old_pk->n_fields,
 &old_pk_extra_size);
 ut_ad(old_pk_extra_size < 0x100);
 mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
 }

 if (byte* b = row_log_table_open(index->online_log,
 mrec_size, &avail_size)) {
 if (insert) {
 *b++ = ROW_T_INSERT;
 } else {
 *b++ = ROW_T_UPDATE;

 if (old_pk_size) {
 *b++ = static_cast<byte>(old_pk_extra_size);

 rec_convert_dtuple_to_temp(
 b + old_pk_extra_size, new_index,
 old_pk->fields, old_pk->n_fields);
 b += old_pk_size;
 }
 }

 /* extra_size is stored in one byte if it is less than
 0x80; otherwise in two bytes with the most significant
 bit set in the first one. */
 if (extra_size < 0x80) {
 *b++ = static_cast<byte>(extra_size);
 } else {
 ut_ad(extra_size < 0x8000);
 *b++ = static_cast<byte>(0x80 | (extra_size >> 8));
 *b++ = static_cast<byte>(extra_size);
 }

 if (is_instant) {
 *b++ = rec_get_status(rec);
 } else {
 ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
 }

 /* Copy the logged part of the record header, which
 precedes rec in memory. */
 memcpy(b, rec - rec_extra_size - omit_size, rec_extra_size);
 b += rec_extra_size;
 ulint len;
 ulint trx_id_offs = rec_get_nth_field_offs(
 offsets, index->n_uniq, &len);
 ut_ad(len == DATA_TRX_ID_LEN);
 memcpy(b, rec, rec_offs_data_size(offsets));
 /* In the copy, replace a DB_TRX_ID that precedes min_trx
 (and the bytes that follow it) with reset_trx_id. */
 if (trx_read_trx_id(b + trx_id_offs)
 < index->online_log->min_trx) {
 memcpy(b + trx_id_offs,
 reset_trx_id, sizeof reset_trx_id);
 }
 b += rec_offs_data_size(offsets);

 row_log_table_close(index, b, mrec_size, avail_size);
 }
}
1059
1060/******************************************************//**
1061Logs an update to a table that is being rebuilt.
1062This will be merged in row_log_table_apply_update(). */
1063void
1064row_log_table_update(
1065/*=================*/
1066 const rec_t* rec, /*!< in: clustered index leaf page record,
1067 page X-latched */
1068 dict_index_t* index, /*!< in/out: clustered index, S-latched
1069 or X-latched */
1070 const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
1071 const dtuple_t* old_pk) /*!< in: row_log_table_get_pk()
1072 before the update */
1073{
1074 row_log_table_low(rec, index, offsets, false, old_pk);
1075}
1076
1077/** Gets the old table column of a PRIMARY KEY column.
1078@param table old table (before ALTER TABLE)
1079@param col_map mapping of old column numbers to new ones
1080@param col_no column position in the new table
1081@return old table column, or NULL if this is an added column */
1082static
1083const dict_col_t*
1084row_log_table_get_pk_old_col(
1085/*=========================*/
1086 const dict_table_t* table,
1087 const ulint* col_map,
1088 ulint col_no)
1089{
1090 for (ulint i = 0; i < table->n_cols; i++) {
1091 if (col_no == col_map[i]) {
1092 return(dict_table_get_nth_col(table, i));
1093 }
1094 }
1095
1096 return(NULL);
1097}
1098
1099/** Maps an old table column of a PRIMARY KEY column.
1100@param[in] ifield clustered index field in the new table (after
1101ALTER TABLE)
1102@param[in,out] dfield clustered index tuple field in the new table
1103@param[in,out] heap memory heap for allocating dfield contents
1104@param[in] rec clustered index leaf page record in the old
1105table
1106@param[in] offsets rec_get_offsets(rec)
1107@param[in] i rec field corresponding to col
1108@param[in] page_size page size of the old table
1109@param[in] max_len maximum length of dfield
1110@retval DB_INVALID_NULL if a NULL value is encountered
1111@retval DB_TOO_BIG_INDEX_COL if the maximum prefix length is exceeded */
1112static
1113dberr_t
1114row_log_table_get_pk_col(
1115 const dict_field_t* ifield,
1116 dfield_t* dfield,
1117 mem_heap_t* heap,
1118 const rec_t* rec,
1119 const ulint* offsets,
1120 ulint i,
1121 const page_size_t& page_size,
1122 ulint max_len,
1123 bool ignore,
1124 const dtuple_t* defaults)
1125{
1126 const byte* field;
1127 ulint len;
1128
1129 field = rec_get_nth_field(rec, offsets, i, &len);
1130
1131 if (len == UNIV_SQL_NULL) {
1132 if (!ignore || !defaults->fields[i].data) {
1133 return(DB_INVALID_NULL);
1134 }
1135
1136 field = static_cast<const byte*>(defaults->fields[i].data);
1137 len = defaults->fields[i].len;
1138 }
1139
1140 if (rec_offs_nth_extern(offsets, i)) {
1141 ulint field_len = ifield->prefix_len;
1142 byte* blob_field;
1143
1144 if (!field_len) {
1145 field_len = ifield->fixed_len;
1146 if (!field_len) {
1147 field_len = max_len + 1;
1148 }
1149 }
1150
1151 blob_field = static_cast<byte*>(
1152 mem_heap_alloc(heap, field_len));
1153
1154 len = btr_copy_externally_stored_field_prefix(
1155 blob_field, field_len, page_size, field, len);
1156 if (len >= max_len + 1) {
1157 return(DB_TOO_BIG_INDEX_COL);
1158 }
1159
1160 dfield_set_data(dfield, blob_field, len);
1161 } else {
1162 dfield_set_data(dfield, mem_heap_dup(heap, field, len), len);
1163 }
1164
1165 return(DB_SUCCESS);
1166}
1167
1168/******************************************************//**
1169Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR
1170of a table that is being rebuilt.
1171@return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table,
1172or NULL if the PRIMARY KEY definition does not change */
1173const dtuple_t*
1174row_log_table_get_pk(
1175/*=================*/
1176 const rec_t* rec, /*!< in: clustered index leaf page record,
1177 page X-latched */
1178 dict_index_t* index, /*!< in/out: clustered index, S-latched
1179 or X-latched */
1180 const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
1181 byte* sys, /*!< out: DB_TRX_ID,DB_ROLL_PTR for
1182 row_log_table_delete(), or NULL */
1183 mem_heap_t** heap) /*!< in/out: memory heap where allocated */
1184{
1185 dtuple_t* tuple = NULL;
1186 row_log_t* log = index->online_log;
1187
1188 ut_ad(dict_index_is_clust(index));
1189 ut_ad(dict_index_is_online_ddl(index));
1190 ut_ad(!offsets || rec_offs_validate(rec, index, offsets));
1191 ut_ad(rw_lock_own_flagged(
1192 &index->lock,
1193 RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
1194
1195 ut_ad(log);
1196 ut_ad(log->table);
1197 ut_ad(log->min_trx);
1198
1199 if (log->same_pk) {
1200 /* The PRIMARY KEY columns are unchanged. */
1201 if (sys) {
1202 /* Store the DB_TRX_ID,DB_ROLL_PTR. */
1203 ulint trx_id_offs = index->trx_id_offset;
1204
1205 if (!trx_id_offs) {
1206 ulint pos = dict_index_get_sys_col_pos(
1207 index, DATA_TRX_ID);
1208 ulint len;
1209 ut_ad(pos > 0);
1210
1211 if (!offsets) {
1212 offsets = rec_get_offsets(
1213 rec, index, NULL, true,
1214 pos + 1, heap);
1215 }
1216
1217 trx_id_offs = rec_get_nth_field_offs(
1218 offsets, pos, &len);
1219 ut_ad(len == DATA_TRX_ID_LEN);
1220 }
1221
1222 const byte* ptr = trx_read_trx_id(rec + trx_id_offs)
1223 < log->min_trx
1224 ? reset_trx_id
1225 : rec + trx_id_offs;
1226
1227 memcpy(sys, ptr, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1228 ut_d(trx_id_check(sys, log->min_trx));
1229 }
1230
1231 return(NULL);
1232 }
1233
1234 mutex_enter(&log->mutex);
1235
1236 /* log->error is protected by log->mutex. */
1237 if (log->error == DB_SUCCESS) {
1238 dict_table_t* new_table = log->table;
1239 dict_index_t* new_index
1240 = dict_table_get_first_index(new_table);
1241 const ulint new_n_uniq
1242 = dict_index_get_n_unique(new_index);
1243
1244 if (!*heap) {
1245 ulint size = 0;
1246
1247 if (!offsets) {
1248 size += (1 + REC_OFFS_HEADER_SIZE
1249 + unsigned(index->n_fields))
1250 * sizeof *offsets;
1251 }
1252
1253 for (ulint i = 0; i < new_n_uniq; i++) {
1254 size += dict_col_get_min_size(
1255 dict_index_get_nth_col(new_index, i));
1256 }
1257
1258 *heap = mem_heap_create(
1259 DTUPLE_EST_ALLOC(new_n_uniq + 2) + size);
1260 }
1261
1262 if (!offsets) {
1263 offsets = rec_get_offsets(rec, index, NULL, true,
1264 ULINT_UNDEFINED, heap);
1265 }
1266
1267 tuple = dtuple_create(*heap, new_n_uniq + 2);
1268 dict_index_copy_types(tuple, new_index, tuple->n_fields);
1269 dtuple_set_n_fields_cmp(tuple, new_n_uniq);
1270
1271 const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table);
1272
1273 const page_size_t& page_size
1274 = dict_table_page_size(index->table);
1275
1276 for (ulint new_i = 0; new_i < new_n_uniq; new_i++) {
1277 dict_field_t* ifield;
1278 dfield_t* dfield;
1279 ulint prtype;
1280 ulint mbminlen, mbmaxlen;
1281
1282 ifield = dict_index_get_nth_field(new_index, new_i);
1283 dfield = dtuple_get_nth_field(tuple, new_i);
1284
1285 const ulint col_no
1286 = dict_field_get_col(ifield)->ind;
1287
1288 if (const dict_col_t* col
1289 = row_log_table_get_pk_old_col(
1290 index->table, log->col_map, col_no)) {
1291 ulint i = dict_col_get_clust_pos(col, index);
1292
1293 if (i == ULINT_UNDEFINED) {
1294 ut_ad(0);
1295 log->error = DB_CORRUPTION;
1296 goto err_exit;
1297 }
1298
1299 log->error = row_log_table_get_pk_col(
1300 ifield, dfield, *heap,
1301 rec, offsets, i, page_size, max_len,
1302 log->ignore, log->defaults);
1303
1304 if (log->error != DB_SUCCESS) {
1305err_exit:
1306 tuple = NULL;
1307 goto func_exit;
1308 }
1309
1310 mbminlen = col->mbminlen;
1311 mbmaxlen = col->mbmaxlen;
1312 prtype = col->prtype;
1313 } else {
1314 /* No matching column was found in the old
1315 table, so this must be an added column.
1316 Copy the default value. */
1317 ut_ad(log->defaults);
1318
1319 dfield_copy(dfield, dtuple_get_nth_field(
1320 log->defaults, col_no));
1321 mbminlen = dfield->type.mbminlen;
1322 mbmaxlen = dfield->type.mbmaxlen;
1323 prtype = dfield->type.prtype;
1324 }
1325
1326 ut_ad(!dfield_is_ext(dfield));
1327 ut_ad(!dfield_is_null(dfield));
1328
1329 if (ifield->prefix_len) {
1330 ulint len = dtype_get_at_most_n_mbchars(
1331 prtype, mbminlen, mbmaxlen,
1332 ifield->prefix_len,
1333 dfield_get_len(dfield),
1334 static_cast<const char*>(
1335 dfield_get_data(dfield)));
1336
1337 ut_ad(len <= dfield_get_len(dfield));
1338 dfield_set_len(dfield, len);
1339 }
1340 }
1341
1342 const byte* trx_roll = rec
1343 + row_get_trx_id_offset(index, offsets);
1344
1345 /* Copy the fields, because the fields will be updated
1346 or the record may be moved somewhere else in the B-tree
1347 as part of the upcoming operation. */
1348 if (trx_read_trx_id(trx_roll) < log->min_trx) {
1349 trx_roll = reset_trx_id;
1350 if (sys) {
1351 memcpy(sys, trx_roll,
1352 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1353 }
1354 } else if (sys) {
1355 memcpy(sys, trx_roll,
1356 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1357 trx_roll = sys;
1358 } else {
1359 trx_roll = static_cast<const byte*>(
1360 mem_heap_dup(
1361 *heap, trx_roll,
1362 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN));
1363 }
1364
1365 ut_d(trx_id_check(trx_roll, log->min_trx));
1366
1367 dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq),
1368 trx_roll, DATA_TRX_ID_LEN);
1369 dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1),
1370 trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
1371 }
1372
1373func_exit:
1374 mutex_exit(&log->mutex);
1375 return(tuple);
1376}
1377
1378/******************************************************//**
1379Logs an insert to a table that is being rebuilt.
1380This will be merged in row_log_table_apply_insert(). */
1381void
1382row_log_table_insert(
1383/*=================*/
1384 const rec_t* rec, /*!< in: clustered index leaf page record,
1385 page X-latched */
1386 dict_index_t* index, /*!< in/out: clustered index, S-latched
1387 or X-latched */
1388 const ulint* offsets)/*!< in: rec_get_offsets(rec,index) */
1389{
1390 row_log_table_low(rec, index, offsets, true, NULL);
1391}
1392
1393/******************************************************//**
1394Notes that a BLOB is being freed during online ALTER TABLE. */
1395void
1396row_log_table_blob_free(
1397/*====================*/
1398 dict_index_t* index, /*!< in/out: clustered index, X-latched */
1399 ulint page_no)/*!< in: starting page number of the BLOB */
1400{
1401 ut_ad(dict_index_is_clust(index));
1402 ut_ad(dict_index_is_online_ddl(index));
1403 ut_ad(rw_lock_own_flagged(
1404 &index->lock,
1405 RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
1406 ut_ad(page_no != FIL_NULL);
1407
1408 if (index->online_log->error != DB_SUCCESS) {
1409 return;
1410 }
1411
1412 page_no_map* blobs = index->online_log->blobs;
1413
1414 if (blobs == NULL) {
1415 index->online_log->blobs = blobs = UT_NEW_NOKEY(page_no_map());
1416 }
1417
1418#ifdef UNIV_DEBUG
1419 const ulonglong log_pos = index->online_log->tail.total;
1420#else
1421# define log_pos /* empty */
1422#endif /* UNIV_DEBUG */
1423
1424 const page_no_map::value_type v(page_no,
1425 row_log_table_blob_t(log_pos));
1426
1427 std::pair<page_no_map::iterator,bool> p = blobs->insert(v);
1428
1429 if (!p.second) {
1430 /* Update the existing mapping. */
1431 ut_ad(p.first->first == page_no);
1432 p.first->second.blob_free(log_pos);
1433 }
1434#undef log_pos
1435}
1436
1437/******************************************************//**
1438Notes that a BLOB is being allocated during online ALTER TABLE. */
1439void
1440row_log_table_blob_alloc(
1441/*=====================*/
1442 dict_index_t* index, /*!< in/out: clustered index, X-latched */
1443 ulint page_no)/*!< in: starting page number of the BLOB */
1444{
1445 ut_ad(dict_index_is_clust(index));
1446 ut_ad(dict_index_is_online_ddl(index));
1447
1448 ut_ad(rw_lock_own_flagged(
1449 &index->lock,
1450 RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
1451
1452 ut_ad(page_no != FIL_NULL);
1453
1454 if (index->online_log->error != DB_SUCCESS) {
1455 return;
1456 }
1457
1458 /* Only track allocations if the same page has been freed
1459 earlier. Double allocation without a free is not allowed. */
1460 if (page_no_map* blobs = index->online_log->blobs) {
1461 page_no_map::iterator p = blobs->find(page_no);
1462
1463 if (p != blobs->end()) {
1464 ut_ad(p->first == page_no);
1465 p->second.blob_alloc(index->online_log->tail.total);
1466 }
1467 }
1468}
1469
1470/******************************************************//**
1471Converts a log record to a table row.
1472@return converted row, or NULL if the conversion fails */
1473static MY_ATTRIBUTE((nonnull, warn_unused_result))
1474const dtuple_t*
1475row_log_table_apply_convert_mrec(
1476/*=============================*/
1477 const mrec_t* mrec, /*!< in: merge record */
1478 dict_index_t* index, /*!< in: index of mrec */
1479 const ulint* offsets, /*!< in: offsets of mrec */
1480 const row_log_t* log, /*!< in: rebuild context */
1481 mem_heap_t* heap, /*!< in/out: memory heap */
1482 dberr_t* error) /*!< out: DB_SUCCESS or
1483 DB_MISSING_HISTORY or
1484 reason of failure */
1485{
1486 dtuple_t* row;
1487
1488 *error = DB_SUCCESS;
1489
1490 /* This is based on row_build(). */
1491 if (log->defaults) {
1492 row = dtuple_copy(log->defaults, heap);
1493 /* dict_table_copy_types() would set the fields to NULL */
1494 for (ulint i = 0; i < dict_table_get_n_cols(log->table); i++) {
1495 dict_col_copy_type(
1496 dict_table_get_nth_col(log->table, i),
1497 dfield_get_type(dtuple_get_nth_field(row, i)));
1498 }
1499 } else {
1500 row = dtuple_create(heap, dict_table_get_n_cols(log->table));
1501 dict_table_copy_types(row, log->table);
1502 }
1503
1504 for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
1505 const dict_field_t* ind_field
1506 = dict_index_get_nth_field(index, i);
1507
1508 if (ind_field->prefix_len) {
1509 /* Column prefixes can only occur in key
1510 fields, which cannot be stored externally. For
1511 a column prefix, there should also be the full
1512 field in the clustered index tuple. The row
1513 tuple comprises full fields, not prefixes. */
1514 ut_ad(!rec_offs_nth_extern(offsets, i));
1515 continue;
1516 }
1517
1518 const dict_col_t* col
1519 = dict_field_get_col(ind_field);
1520
1521 ulint col_no
1522 = log->col_map[dict_col_get_no(col)];
1523
1524 if (col_no == ULINT_UNDEFINED) {
1525 /* dropped column */
1526 continue;
1527 }
1528
1529 dfield_t* dfield
1530 = dtuple_get_nth_field(row, col_no);
1531
1532 ulint len;
1533 const byte* data;
1534
1535 if (rec_offs_nth_extern(offsets, i)) {
1536 ut_ad(rec_offs_any_extern(offsets));
1537 rw_lock_x_lock(dict_index_get_lock(index));
1538
1539 if (const page_no_map* blobs = log->blobs) {
1540 data = rec_get_nth_field(
1541 mrec, offsets, i, &len);
1542 ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
1543
1544 ulint page_no = mach_read_from_4(
1545 data + len - (BTR_EXTERN_FIELD_REF_SIZE
1546 - BTR_EXTERN_PAGE_NO));
1547 page_no_map::const_iterator p = blobs->find(
1548 page_no);
1549 if (p != blobs->end()
1550 && p->second.is_freed(log->head.total)) {
1551 /* This BLOB has been freed.
1552 We must not access the row. */
1553 *error = DB_MISSING_HISTORY;
1554 dfield_set_data(dfield, data, len);
1555 dfield_set_ext(dfield);
1556 goto blob_done;
1557 }
1558 }
1559
1560 data = btr_rec_copy_externally_stored_field(
1561 mrec, offsets,
1562 dict_table_page_size(index->table),
1563 i, &len, heap);
1564 ut_a(data);
1565 dfield_set_data(dfield, data, len);
1566blob_done:
1567 rw_lock_x_unlock(dict_index_get_lock(index));
1568 } else {
1569 data = rec_get_nth_field(mrec, offsets, i, &len);
1570 if (len == UNIV_SQL_DEFAULT) {
1571 data = index->instant_field_value(i, &len);
1572 }
1573 dfield_set_data(dfield, data, len);
1574 }
1575
1576 if (len != UNIV_SQL_NULL && col->mtype == DATA_MYSQL
1577 && col->len != len && !dict_table_is_comp(log->table)) {
1578
1579 ut_ad(col->len >= len);
1580 if (dict_table_is_comp(index->table)) {
1581 byte* buf = (byte*) mem_heap_alloc(heap,
1582 col->len);
1583 memcpy(buf, dfield->data, len);
1584 memset(buf + len, 0x20, col->len - len);
1585
1586 dfield_set_data(dfield, buf, col->len);
1587 } else {
1588 /* field length mismatch should not happen
1589 when rebuilding the redundant row format
1590 table. */
1591 ut_ad(0);
1592 *error = DB_CORRUPTION;
1593 return(NULL);
1594 }
1595 }
1596
1597 /* See if any columns were changed to NULL or NOT NULL. */
1598 const dict_col_t* new_col
1599 = dict_table_get_nth_col(log->table, col_no);
1600 ut_ad(new_col->mtype == col->mtype);
1601
1602 /* Assert that prtype matches except for nullability. */
1603 ut_ad(!((new_col->prtype ^ col->prtype) & ~DATA_NOT_NULL));
1604 ut_ad(!((new_col->prtype ^ dfield_get_type(dfield)->prtype)
1605 & ~DATA_NOT_NULL));
1606
1607 if (new_col->prtype == col->prtype) {
1608 continue;
1609 }
1610
1611 if ((new_col->prtype & DATA_NOT_NULL)
1612 && dfield_is_null(dfield)) {
1613
1614 const dfield_t& default_field
1615 = log->defaults->fields[col_no];
1616
1617 if (!log->ignore || !default_field.data) {
1618 /* We got a NULL value for a NOT NULL column. */
1619 *error = DB_INVALID_NULL;
1620 return NULL;
1621 }
1622
1623 *dfield = default_field;
1624 }
1625
1626 /* Adjust the DATA_NOT_NULL flag in the parsed row. */
1627 dfield_get_type(dfield)->prtype = new_col->prtype;
1628
1629 ut_ad(dict_col_type_assert_equal(new_col,
1630 dfield_get_type(dfield)));
1631 }
1632
1633 return(row);
1634}
1635
1636/******************************************************//**
1637Replays an insert operation on a table that was rebuilt.
1638@return DB_SUCCESS or error code */
1639static MY_ATTRIBUTE((nonnull, warn_unused_result))
1640dberr_t
1641row_log_table_apply_insert_low(
1642/*===========================*/
1643 que_thr_t* thr, /*!< in: query graph */
1644 const dtuple_t* row, /*!< in: table row
1645 in the old table definition */
1646 mem_heap_t* offsets_heap, /*!< in/out: memory heap
1647 that can be emptied */
1648 mem_heap_t* heap, /*!< in/out: memory heap */
1649 row_merge_dup_t* dup) /*!< in/out: for reporting
1650 duplicate key errors */
1651{
1652 dberr_t error;
1653 dtuple_t* entry;
1654 const row_log_t*log = dup->index->online_log;
1655 dict_index_t* index = dict_table_get_first_index(log->table);
1656 ulint n_index = 0;
1657
1658 ut_ad(dtuple_validate(row));
1659
1660 DBUG_LOG("ib_alter_table",
1661 "insert table " << index->table->id << " (index "
1662 << index->id << "): " << rec_printer(row).str());
1663
1664 static const ulint flags
1665 = (BTR_CREATE_FLAG
1666 | BTR_NO_LOCKING_FLAG
1667 | BTR_NO_UNDO_LOG_FLAG
1668 | BTR_KEEP_SYS_FLAG);
1669
1670 entry = row_build_index_entry(row, NULL, index, heap);
1671
1672 error = row_ins_clust_index_entry_low(
1673 flags, BTR_MODIFY_TREE, index, index->n_uniq,
1674 entry, 0, thr, false);
1675
1676 switch (error) {
1677 case DB_SUCCESS:
1678 break;
1679 case DB_SUCCESS_LOCKED_REC:
1680 /* The row had already been copied to the table. */
1681 return(DB_SUCCESS);
1682 default:
1683 return(error);
1684 }
1685
1686 ut_ad(dict_index_is_clust(index));
1687
1688 for (n_index += index->type != DICT_CLUSTERED;
1689 (index = dict_table_get_next_index(index)); n_index++) {
1690 if (index->type & DICT_FTS) {
1691 continue;
1692 }
1693
1694 entry = row_build_index_entry(row, NULL, index, heap);
1695 error = row_ins_sec_index_entry_low(
1696 flags, BTR_MODIFY_TREE,
1697 index, offsets_heap, heap, entry,
1698 thr_get_trx(thr)->id, thr, false);
1699
1700 if (error != DB_SUCCESS) {
1701 if (error == DB_DUPLICATE_KEY) {
1702 thr_get_trx(thr)->error_key_num = n_index;
1703 }
1704 break;
1705 }
1706 }
1707
1708 return(error);
1709}
1710
1711/******************************************************//**
1712Replays an insert operation on a table that was rebuilt.
1713@return DB_SUCCESS or error code */
1714static MY_ATTRIBUTE((nonnull, warn_unused_result))
1715dberr_t
1716row_log_table_apply_insert(
1717/*=======================*/
1718 que_thr_t* thr, /*!< in: query graph */
1719 const mrec_t* mrec, /*!< in: record to insert */
1720 const ulint* offsets, /*!< in: offsets of mrec */
1721 mem_heap_t* offsets_heap, /*!< in/out: memory heap
1722 that can be emptied */
1723 mem_heap_t* heap, /*!< in/out: memory heap */
1724 row_merge_dup_t* dup) /*!< in/out: for reporting
1725 duplicate key errors */
1726{
1727 const row_log_t*log = dup->index->online_log;
1728 dberr_t error;
1729 const dtuple_t* row = row_log_table_apply_convert_mrec(
1730 mrec, dup->index, offsets, log, heap, &error);
1731
1732 switch (error) {
1733 case DB_MISSING_HISTORY:
1734 ut_ad(log->blobs);
1735 /* Because some BLOBs are missing, we know that the
1736 transaction was rolled back later (a rollback of
1737 an insert can free BLOBs).
1738 We can simply skip the insert: the subsequent
1739 ROW_T_DELETE will be ignored, or a ROW_T_UPDATE will
1740 be interpreted as ROW_T_INSERT. */
1741 return(DB_SUCCESS);
1742 case DB_SUCCESS:
1743 ut_ad(row != NULL);
1744 break;
1745 default:
1746 ut_ad(0);
1747 case DB_INVALID_NULL:
1748 ut_ad(row == NULL);
1749 return(error);
1750 }
1751
1752 error = row_log_table_apply_insert_low(
1753 thr, row, offsets_heap, heap, dup);
1754 if (error != DB_SUCCESS) {
1755 /* Report the erroneous row using the new
1756 version of the table. */
1757 innobase_row_to_mysql(dup->table, log->table, row);
1758 }
1759 return(error);
1760}
1761
1762/******************************************************//**
1763Deletes a record from a table that is being rebuilt.
1764@return DB_SUCCESS or error code */
1765static MY_ATTRIBUTE((nonnull, warn_unused_result))
1766dberr_t
1767row_log_table_apply_delete_low(
1768/*===========================*/
1769 btr_pcur_t* pcur, /*!< in/out: B-tree cursor,
1770 will be trashed */
1771 const ulint* offsets, /*!< in: offsets on pcur */
1772 mem_heap_t* heap, /*!< in/out: memory heap */
1773 mtr_t* mtr) /*!< in/out: mini-transaction,
1774 will be committed */
1775{
1776 dberr_t error;
1777 row_ext_t* ext;
1778 dtuple_t* row;
1779 dict_index_t* index = btr_pcur_get_btr_cur(pcur)->index;
1780
1781 ut_ad(dict_index_is_clust(index));
1782
1783 DBUG_LOG("ib_alter_table",
1784 "delete table " << index->table->id << " (index "
1785 << index->id << "): "
1786 << rec_printer(btr_pcur_get_rec(pcur), offsets).str());
1787
1788 if (dict_table_get_next_index(index)) {
1789 /* Build a row template for purging secondary index entries. */
1790 row = row_build(
1791 ROW_COPY_DATA, index, btr_pcur_get_rec(pcur),
1792 offsets, NULL, NULL, NULL, &ext, heap);
1793 } else {
1794 row = NULL;
1795 }
1796
1797 btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur),
1798 BTR_CREATE_FLAG, false, mtr);
1799 mtr_commit(mtr);
1800
1801 if (error != DB_SUCCESS) {
1802 return(error);
1803 }
1804
1805 while ((index = dict_table_get_next_index(index)) != NULL) {
1806 if (index->type & DICT_FTS) {
1807 continue;
1808 }
1809
1810 const dtuple_t* entry = row_build_index_entry(
1811 row, ext, index, heap);
1812 mtr->start();
1813 index->set_modified(*mtr);
1814 btr_pcur_open(index, entry, PAGE_CUR_LE,
1815 BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
1816 pcur, mtr);
1817#ifdef UNIV_DEBUG
1818 switch (btr_pcur_get_btr_cur(pcur)->flag) {
1819 case BTR_CUR_DELETE_REF:
1820 case BTR_CUR_DEL_MARK_IBUF:
1821 case BTR_CUR_DELETE_IBUF:
1822 case BTR_CUR_INSERT_TO_IBUF:
1823 /* We did not request buffering. */
1824 break;
1825 case BTR_CUR_HASH:
1826 case BTR_CUR_HASH_FAIL:
1827 case BTR_CUR_BINARY:
1828 goto flag_ok;
1829 }
1830 ut_ad(0);
1831flag_ok:
1832#endif /* UNIV_DEBUG */
1833
1834 if (page_rec_is_infimum(btr_pcur_get_rec(pcur))
1835 || btr_pcur_get_low_match(pcur) < index->n_uniq) {
1836 /* All secondary index entries should be
1837 found, because new_table is being modified by
1838 this thread only, and all indexes should be
1839 updated in sync. */
1840 mtr->commit();
1841 return(DB_INDEX_CORRUPT);
1842 }
1843
1844 btr_cur_pessimistic_delete(&error, FALSE,
1845 btr_pcur_get_btr_cur(pcur),
1846 BTR_CREATE_FLAG, false, mtr);
1847 mtr->commit();
1848 }
1849
1850 return(error);
1851}
1852
1853/******************************************************//**
1854Replays a delete operation on a table that was rebuilt.
1855@return DB_SUCCESS or error code */
1856static MY_ATTRIBUTE((nonnull, warn_unused_result))
1857dberr_t
1858row_log_table_apply_delete(
1859/*=======================*/
1860 ulint trx_id_col, /*!< in: position of
1861 DB_TRX_ID in the new
1862 clustered index */
1863 const mrec_t* mrec, /*!< in: merge record */
1864 const ulint* moffsets, /*!< in: offsets of mrec */
1865 mem_heap_t* offsets_heap, /*!< in/out: memory heap
1866 that can be emptied */
1867 mem_heap_t* heap, /*!< in/out: memory heap */
1868 const row_log_t* log) /*!< in: online log */
1869{
1870 dict_table_t* new_table = log->table;
1871 dict_index_t* index = dict_table_get_first_index(new_table);
1872 dtuple_t* old_pk;
1873 mtr_t mtr;
1874 btr_pcur_t pcur;
1875 ulint* offsets;
1876
1877 ut_ad(rec_offs_n_fields(moffsets)
1878 == dict_index_get_n_unique(index) + 2);
1879 ut_ad(!rec_offs_any_extern(moffsets));
1880
1881 /* Convert the row to a search tuple. */
1882 old_pk = dtuple_create(heap, index->n_uniq);
1883 dict_index_copy_types(old_pk, index, index->n_uniq);
1884
1885 for (ulint i = 0; i < index->n_uniq; i++) {
1886 ulint len;
1887 const void* field;
1888 field = rec_get_nth_field(mrec, moffsets, i, &len);
1889 ut_ad(len != UNIV_SQL_NULL);
1890 dfield_set_data(dtuple_get_nth_field(old_pk, i),
1891 field, len);
1892 }
1893
1894 mtr_start(&mtr);
1895 index->set_modified(mtr);
1896 btr_pcur_open(index, old_pk, PAGE_CUR_LE,
1897 BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
1898 &pcur, &mtr);
1899#ifdef UNIV_DEBUG
1900 switch (btr_pcur_get_btr_cur(&pcur)->flag) {
1901 case BTR_CUR_DELETE_REF:
1902 case BTR_CUR_DEL_MARK_IBUF:
1903 case BTR_CUR_DELETE_IBUF:
1904 case BTR_CUR_INSERT_TO_IBUF:
1905 /* We did not request buffering. */
1906 break;
1907 case BTR_CUR_HASH:
1908 case BTR_CUR_HASH_FAIL:
1909 case BTR_CUR_BINARY:
1910 goto flag_ok;
1911 }
1912 ut_ad(0);
1913flag_ok:
1914#endif /* UNIV_DEBUG */
1915
1916 if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
1917 || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
1918all_done:
1919 mtr_commit(&mtr);
1920 /* The record was not found. All done. */
1921 /* This should only happen when an earlier
1922 ROW_T_INSERT was skipped or
1923 ROW_T_UPDATE was interpreted as ROW_T_DELETE
1924 due to BLOBs having been freed by rollback. */
1925 return(DB_SUCCESS);
1926 }
1927
1928 offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, NULL, true,
1929 ULINT_UNDEFINED, &offsets_heap);
1930#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
1931 ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets));
1932#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
1933
1934 /* Only remove the record if DB_TRX_ID,DB_ROLL_PTR match. */
1935
1936 {
1937 ulint len;
1938 const byte* mrec_trx_id
1939 = rec_get_nth_field(mrec, moffsets, trx_id_col, &len);
1940 ut_ad(len == DATA_TRX_ID_LEN);
1941 const byte* rec_trx_id
1942 = rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
1943 trx_id_col, &len);
1944 ut_ad(len == DATA_TRX_ID_LEN);
1945 ut_d(trx_id_check(rec_trx_id, log->min_trx));
1946 ut_d(trx_id_check(mrec_trx_id, log->min_trx));
1947
1948 ut_ad(rec_get_nth_field(mrec, moffsets, trx_id_col + 1, &len)
1949 == mrec_trx_id + DATA_TRX_ID_LEN);
1950 ut_ad(len == DATA_ROLL_PTR_LEN);
1951 ut_ad(rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
1952 trx_id_col + 1, &len)
1953 == rec_trx_id + DATA_TRX_ID_LEN);
1954 ut_ad(len == DATA_ROLL_PTR_LEN);
1955
1956 if (memcmp(mrec_trx_id, rec_trx_id,
1957 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
1958 /* The ROW_T_DELETE was logged for a different
1959 PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR.
1960 This is possible if a ROW_T_INSERT was skipped
1961 or a ROW_T_UPDATE was interpreted as ROW_T_DELETE
1962 because some BLOBs were missing due to
1963 (1) rolling back the initial insert, or
1964 (2) purging the BLOB for a later ROW_T_DELETE
1965 (3) purging 'old values' for a later ROW_T_UPDATE
1966 or ROW_T_DELETE. */
1967 ut_ad(!log->same_pk);
1968 goto all_done;
1969 }
1970 }
1971
1972 return row_log_table_apply_delete_low(&pcur, offsets, heap, &mtr);
1973}
1974
1975/******************************************************//**
1976Replays an update operation on a table that was rebuilt.
1977@return DB_SUCCESS or error code */
1978static MY_ATTRIBUTE((nonnull, warn_unused_result))
1979dberr_t
1980row_log_table_apply_update(
1981/*=======================*/
1982 que_thr_t* thr, /*!< in: query graph */
1983 ulint new_trx_id_col, /*!< in: position of
1984 DB_TRX_ID in the new
1985 clustered index */
1986 const mrec_t* mrec, /*!< in: new value */
1987 const ulint* offsets, /*!< in: offsets of mrec */
1988 mem_heap_t* offsets_heap, /*!< in/out: memory heap
1989 that can be emptied */
1990 mem_heap_t* heap, /*!< in/out: memory heap */
1991 row_merge_dup_t* dup, /*!< in/out: for reporting
1992 duplicate key errors */
1993 const dtuple_t* old_pk) /*!< in: PRIMARY KEY and
1994 DB_TRX_ID,DB_ROLL_PTR
1995 of the old value,
1996 or PRIMARY KEY if same_pk */
1997{
1998 const row_log_t*log = dup->index->online_log;
1999 const dtuple_t* row;
2000 dict_index_t* index = dict_table_get_first_index(log->table);
2001 mtr_t mtr;
2002 btr_pcur_t pcur;
2003 dberr_t error;
2004 ulint n_index = 0;
2005
2006 ut_ad(dtuple_get_n_fields_cmp(old_pk)
2007 == dict_index_get_n_unique(index));
2008 ut_ad(dtuple_get_n_fields(old_pk)
2009 == dict_index_get_n_unique(index)
2010 + (log->same_pk ? 0 : 2));
2011
2012 row = row_log_table_apply_convert_mrec(
2013 mrec, dup->index, offsets, log, heap, &error);
2014
2015 switch (error) {
2016 case DB_MISSING_HISTORY:
2017 /* The record contained BLOBs that are now missing. */
2018 ut_ad(log->blobs);
2019 /* Whether or not we are updating the PRIMARY KEY, we
2020 know that there should be a subsequent
2021 ROW_T_DELETE for rolling back a preceding ROW_T_INSERT,
2022 overriding this ROW_T_UPDATE record. (*1)
2023
2024 This allows us to interpret this ROW_T_UPDATE
2025 as ROW_T_DELETE.
2026
2027 When applying the subsequent ROW_T_DELETE, no matching
2028 record will be found. */
2029 /* fall through */
2030 case DB_SUCCESS:
2031 ut_ad(row != NULL);
2032 break;
2033 default:
2034 ut_ad(0);
2035 case DB_INVALID_NULL:
2036 ut_ad(row == NULL);
2037 return(error);
2038 }
2039
2040 mtr_start(&mtr);
2041 index->set_modified(mtr);
2042 btr_pcur_open(index, old_pk, PAGE_CUR_LE,
2043 BTR_MODIFY_TREE, &pcur, &mtr);
2044#ifdef UNIV_DEBUG
2045 switch (btr_pcur_get_btr_cur(&pcur)->flag) {
2046 case BTR_CUR_DELETE_REF:
2047 case BTR_CUR_DEL_MARK_IBUF:
2048 case BTR_CUR_DELETE_IBUF:
2049 case BTR_CUR_INSERT_TO_IBUF:
2050 ut_ad(0);/* We did not request buffering. */
2051 case BTR_CUR_HASH:
2052 case BTR_CUR_HASH_FAIL:
2053 case BTR_CUR_BINARY:
2054 break;
2055 }
2056#endif /* UNIV_DEBUG */
2057
2058 if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
2059 || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
2060 /* The record was not found. This should only happen
2061 when an earlier ROW_T_INSERT or ROW_T_UPDATE was
2062 diverted because BLOBs were freed when the insert was
2063 later rolled back. */
2064
2065 ut_ad(log->blobs);
2066
2067 if (error == DB_SUCCESS) {
2068 /* An earlier ROW_T_INSERT could have been
2069 skipped because of a missing BLOB, like this:
2070
2071 BEGIN;
2072 INSERT INTO t SET blob_col='blob value';
2073 UPDATE t SET blob_col='';
2074 ROLLBACK;
2075
2076 This would generate the following records:
2077 ROW_T_INSERT (referring to 'blob value')
2078 ROW_T_UPDATE
2079 ROW_T_UPDATE (referring to 'blob value')
2080 ROW_T_DELETE
2081 [ROLLBACK removes the 'blob value']
2082
2083 The ROW_T_INSERT would have been skipped
2084 because of a missing BLOB. Now we are
2085 executing the first ROW_T_UPDATE.
2086 The second ROW_T_UPDATE (for the ROLLBACK)
2087 would be interpreted as ROW_T_DELETE, because
2088 the BLOB would be missing.
2089
2090 We could probably assume that the transaction
2091 has been rolled back and simply skip the
2092 'insert' part of this ROW_T_UPDATE record.
2093 However, there might be some complex scenario
2094 that could interfere with such a shortcut.
2095 So, we will insert the row (and risk
2096 introducing a bogus duplicate key error
2097 for the ALTER TABLE), and a subsequent
2098 ROW_T_UPDATE or ROW_T_DELETE will delete it. */
2099 mtr_commit(&mtr);
2100 error = row_log_table_apply_insert_low(
2101 thr, row, offsets_heap, heap, dup);
2102 } else {
2103 /* Some BLOBs are missing, so we are interpreting
2104 this ROW_T_UPDATE as ROW_T_DELETE (see *1).
2105 Because the record was not found, we do nothing. */
2106 ut_ad(error == DB_MISSING_HISTORY);
2107 error = DB_SUCCESS;
2108func_exit:
2109 mtr_commit(&mtr);
2110 }
2111func_exit_committed:
2112 ut_ad(mtr.has_committed());
2113
2114 if (error != DB_SUCCESS) {
2115 /* Report the erroneous row using the new
2116 version of the table. */
2117 innobase_row_to_mysql(dup->table, log->table, row);
2118 }
2119
2120 return(error);
2121 }
2122
2123 /* Prepare to update (or delete) the record. */
2124 ulint* cur_offsets = rec_get_offsets(
2125 btr_pcur_get_rec(&pcur), index, NULL, true,
2126 ULINT_UNDEFINED, &offsets_heap);
2127
2128 if (!log->same_pk) {
2129 /* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what
2130 was buffered. */
2131 ulint len;
2132 const byte* rec_trx_id
2133 = rec_get_nth_field(btr_pcur_get_rec(&pcur),
2134 cur_offsets, index->n_uniq, &len);
2135 const dfield_t* old_pk_trx_id
2136 = dtuple_get_nth_field(old_pk, index->n_uniq);
2137 ut_ad(len == DATA_TRX_ID_LEN);
2138 ut_d(trx_id_check(rec_trx_id, log->min_trx));
2139 ut_ad(old_pk_trx_id->len == DATA_TRX_ID_LEN);
2140 ut_ad(old_pk_trx_id[1].len == DATA_ROLL_PTR_LEN);
2141 ut_ad(DATA_TRX_ID_LEN
2142 + static_cast<const char*>(old_pk_trx_id->data)
2143 == old_pk_trx_id[1].data);
2144 ut_d(trx_id_check(old_pk_trx_id->data, log->min_trx));
2145
2146 if (memcmp(rec_trx_id, old_pk_trx_id->data,
2147 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
2148 /* The ROW_T_UPDATE was logged for a different
2149 DB_TRX_ID,DB_ROLL_PTR. This is possible if an
2150 earlier ROW_T_INSERT or ROW_T_UPDATE was diverted
2151 because some BLOBs were missing due to rolling
2152 back the initial insert or due to purging
2153 the old BLOB values of an update. */
2154 ut_ad(log->blobs);
2155 if (error != DB_SUCCESS) {
2156 ut_ad(error == DB_MISSING_HISTORY);
2157 /* Some BLOBs are missing, so we are
2158 interpreting this ROW_T_UPDATE as
2159 ROW_T_DELETE (see *1).
2160 Because this is a different row,
2161 we will do nothing. */
2162 error = DB_SUCCESS;
2163 } else {
2164 /* Because the user record is missing due to
2165 BLOBs that were missing when processing
2166 an earlier log record, we should
2167 interpret the ROW_T_UPDATE as ROW_T_INSERT.
2168 However, there is a different user record
2169 with the same PRIMARY KEY value already. */
2170 error = DB_DUPLICATE_KEY;
2171 }
2172
2173 goto func_exit;
2174 }
2175 }
2176
2177 if (error != DB_SUCCESS) {
2178 ut_ad(error == DB_MISSING_HISTORY);
2179 ut_ad(log->blobs);
2180 /* Some BLOBs are missing, so we are interpreting
2181 this ROW_T_UPDATE as ROW_T_DELETE (see *1). */
2182 error = row_log_table_apply_delete_low(
2183 &pcur, cur_offsets, heap, &mtr);
2184 goto func_exit_committed;
2185 }
2186
2187 dtuple_t* entry = row_build_index_entry_low(
2188 row, NULL, index, heap, ROW_BUILD_NORMAL);
2189 upd_t* update = row_upd_build_difference_binary(
2190 index, entry, btr_pcur_get_rec(&pcur), cur_offsets,
2191 false, NULL, heap, dup->table);
2192
2193 if (!update->n_fields) {
2194 /* Nothing to do. */
2195 goto func_exit;
2196 }
2197
2198 const bool pk_updated
2199 = upd_get_nth_field(update, 0)->field_no < new_trx_id_col;
2200
2201 if (pk_updated || rec_offs_any_extern(cur_offsets)) {
2202 /* If the record contains any externally stored
2203 columns, perform the update by delete and insert,
2204 because we will not write any undo log that would
2205 allow purge to free any orphaned externally stored
2206 columns. */
2207
2208 if (pk_updated && log->same_pk) {
2209 /* The ROW_T_UPDATE log record should only be
2210 written when the PRIMARY KEY fields of the
2211 record did not change in the old table. We
2212 can only get a change of PRIMARY KEY columns
2213 in the rebuilt table if the PRIMARY KEY was
2214 redefined (!same_pk). */
2215 ut_ad(0);
2216 error = DB_CORRUPTION;
2217 goto func_exit;
2218 }
2219
2220 error = row_log_table_apply_delete_low(
2221 &pcur, cur_offsets, heap, &mtr);
2222 ut_ad(mtr.has_committed());
2223
2224 if (error == DB_SUCCESS) {
2225 error = row_log_table_apply_insert_low(
2226 thr, row, offsets_heap, heap, dup);
2227 }
2228
2229 goto func_exit_committed;
2230 }
2231
2232 dtuple_t* old_row;
2233 row_ext_t* old_ext;
2234
2235 if (dict_table_get_next_index(index)) {
2236 /* Construct the row corresponding to the old value of
2237 the record. */
2238 old_row = row_build(
2239 ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur),
2240 cur_offsets, NULL, NULL, NULL, &old_ext, heap);
2241 ut_ad(old_row);
2242
2243 DBUG_LOG("ib_alter_table",
2244 "update table " << index->table->id
2245 << " (index " << index->id
2246 << ": " << rec_printer(old_row).str()
2247 << " to " << rec_printer(row).str());
2248 } else {
2249 old_row = NULL;
2250 old_ext = NULL;
2251 }
2252
2253 big_rec_t* big_rec;
2254
2255 error = btr_cur_pessimistic_update(
2256 BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2257 | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG
2258 | BTR_KEEP_POS_FLAG,
2259 btr_pcur_get_btr_cur(&pcur),
2260 &cur_offsets, &offsets_heap, heap, &big_rec,
2261 update, 0, thr, 0, &mtr);
2262
2263 if (big_rec) {
2264 if (error == DB_SUCCESS) {
2265 error = btr_store_big_rec_extern_fields(
2266 &pcur, cur_offsets, big_rec, &mtr,
2267 BTR_STORE_UPDATE);
2268 }
2269
2270 dtuple_big_rec_free(big_rec);
2271 }
2272
2273 for (n_index += index->type != DICT_CLUSTERED;
2274 (index = dict_table_get_next_index(index)); n_index++) {
2275 if (index->type & DICT_FTS) {
2276 continue;
2277 }
2278
2279 if (error != DB_SUCCESS) {
2280 break;
2281 }
2282
2283 if (!row_upd_changes_ord_field_binary(
2284 index, update, thr, old_row, NULL)) {
2285 continue;
2286 }
2287
2288 if (dict_index_has_virtual(index)) {
2289 dtuple_copy_v_fields(old_row, old_pk);
2290 }
2291
2292 mtr_commit(&mtr);
2293
2294 entry = row_build_index_entry(old_row, old_ext, index, heap);
2295 if (!entry) {
2296 ut_ad(0);
2297 return(DB_CORRUPTION);
2298 }
2299
2300 mtr_start(&mtr);
2301 index->set_modified(mtr);
2302
2303 if (ROW_FOUND != row_search_index_entry(
2304 index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) {
2305 ut_ad(0);
2306 error = DB_CORRUPTION;
2307 break;
2308 }
2309
2310 btr_cur_pessimistic_delete(
2311 &error, FALSE, btr_pcur_get_btr_cur(&pcur),
2312 BTR_CREATE_FLAG, false, &mtr);
2313
2314 if (error != DB_SUCCESS) {
2315 break;
2316 }
2317
2318 mtr_commit(&mtr);
2319
2320 entry = row_build_index_entry(row, NULL, index, heap);
2321 error = row_ins_sec_index_entry_low(
2322 BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2323 | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG,
2324 BTR_MODIFY_TREE, index, offsets_heap, heap,
2325 entry, thr_get_trx(thr)->id, thr, false);
2326
2327 /* Report correct index name for duplicate key error. */
2328 if (error == DB_DUPLICATE_KEY) {
2329 thr_get_trx(thr)->error_key_num = n_index;
2330 }
2331
2332 mtr_start(&mtr);
2333 index->set_modified(mtr);
2334 }
2335
2336 goto func_exit;
2337}
2338
2339/******************************************************//**
2340Applies an operation to a table that was rebuilt.
2341@return NULL on failure (mrec corruption) or when out of data;
2342pointer to next record on success */
2343static MY_ATTRIBUTE((nonnull, warn_unused_result))
2344const mrec_t*
2345row_log_table_apply_op(
2346/*===================*/
2347 que_thr_t* thr, /*!< in: query graph */
2348 ulint new_trx_id_col, /*!< in: position of
2349 DB_TRX_ID in new index */
2350 row_merge_dup_t* dup, /*!< in/out: for reporting
2351 duplicate key errors */
2352 dberr_t* error, /*!< out: DB_SUCCESS
2353 or error code */
2354 mem_heap_t* offsets_heap, /*!< in/out: memory heap
2355 that can be emptied */
2356 mem_heap_t* heap, /*!< in/out: memory heap */
2357 const mrec_t* mrec, /*!< in: merge record */
2358 const mrec_t* mrec_end, /*!< in: end of buffer */
2359 ulint* offsets) /*!< in/out: work area
2360 for parsing mrec */
2361{
2362 row_log_t* log = dup->index->online_log;
2363 dict_index_t* new_index = dict_table_get_first_index(log->table);
2364 ulint extra_size;
2365 const mrec_t* next_mrec;
2366 dtuple_t* old_pk;
2367
2368 ut_ad(dict_index_is_clust(dup->index));
2369 ut_ad(dup->index->table != log->table);
2370 ut_ad(log->head.total <= log->tail.total);
2371
2372 *error = DB_SUCCESS;
2373
2374 /* 3 = 1 (op type) + 1 (extra_size) + at least 1 byte payload */
2375 if (mrec + 3 >= mrec_end) {
2376 return(NULL);
2377 }
2378
2379 const bool is_instant = log->is_instant(dup->index);
2380 const mrec_t* const mrec_start = mrec;
2381
2382 switch (*mrec++) {
2383 default:
2384 ut_ad(0);
2385 *error = DB_CORRUPTION;
2386 return(NULL);
2387 case ROW_T_INSERT:
2388 extra_size = *mrec++;
2389
2390 if (extra_size >= 0x80) {
2391 /* Read another byte of extra_size. */
2392
2393 extra_size = (extra_size & 0x7f) << 8;
2394 extra_size |= *mrec++;
2395 }
2396
2397 mrec += extra_size;
2398
2399 ut_ad(extra_size || !is_instant);
2400
2401 if (mrec > mrec_end) {
2402 return(NULL);
2403 }
2404
2405 rec_offs_set_n_fields(offsets, dup->index->n_fields);
2406 rec_init_offsets_temp(mrec, dup->index, offsets,
2407 log->n_core_fields,
2408 is_instant
2409 ? static_cast<rec_comp_status_t>(
2410 *(mrec - extra_size))
2411 : REC_STATUS_ORDINARY);
2412
2413 next_mrec = mrec + rec_offs_data_size(offsets);
2414
2415 if (next_mrec > mrec_end) {
2416 return(NULL);
2417 } else {
2418 log->head.total += ulint(next_mrec - mrec_start);
2419 *error = row_log_table_apply_insert(
2420 thr, mrec, offsets, offsets_heap,
2421 heap, dup);
2422 }
2423 break;
2424
2425 case ROW_T_DELETE:
2426 /* 1 (extra_size) + at least 1 (payload) */
2427 if (mrec + 2 >= mrec_end) {
2428 return(NULL);
2429 }
2430
2431 extra_size = *mrec++;
2432 ut_ad(mrec < mrec_end);
2433
2434 /* We assume extra_size < 0x100 for the PRIMARY KEY prefix.
2435 For fixed-length PRIMARY key columns, it is 0. */
2436 mrec += extra_size;
2437
2438 /* The ROW_T_DELETE record was converted by
2439 rec_convert_dtuple_to_temp() using new_index. */
2440 ut_ad(!new_index->is_instant());
2441 rec_offs_set_n_fields(offsets,
2442 unsigned(new_index->n_uniq) + 2);
2443 rec_init_offsets_temp(mrec, new_index, offsets);
2444 next_mrec = mrec + rec_offs_data_size(offsets);
2445 if (next_mrec > mrec_end) {
2446 return(NULL);
2447 }
2448
2449 log->head.total += ulint(next_mrec - mrec_start);
2450
2451 *error = row_log_table_apply_delete(
2452 new_trx_id_col,
2453 mrec, offsets, offsets_heap, heap, log);
2454 break;
2455
2456 case ROW_T_UPDATE:
2457 /* Logically, the log entry consists of the
2458 (PRIMARY KEY,DB_TRX_ID) of the old value (converted
2459 to the new primary key definition) followed by
2460 the new value in the old table definition. If the
2461 definition of the columns belonging to PRIMARY KEY
2462 is not changed, the log will only contain
2463 DB_TRX_ID,new_row. */
2464
2465 if (log->same_pk) {
2466 ut_ad(new_index->n_uniq == dup->index->n_uniq);
2467
2468 extra_size = *mrec++;
2469
2470 if (extra_size >= 0x80) {
2471 /* Read another byte of extra_size. */
2472
2473 extra_size = (extra_size & 0x7f) << 8;
2474 extra_size |= *mrec++;
2475 }
2476
2477 mrec += extra_size;
2478
2479 ut_ad(extra_size || !is_instant);
2480
2481 if (mrec > mrec_end) {
2482 return(NULL);
2483 }
2484
2485 rec_offs_set_n_fields(offsets, dup->index->n_fields);
2486 rec_init_offsets_temp(mrec, dup->index, offsets,
2487 log->n_core_fields,
2488 is_instant
2489 ? static_cast<rec_comp_status_t>(
2490 *(mrec - extra_size))
2491 : REC_STATUS_ORDINARY);
2492
2493 next_mrec = mrec + rec_offs_data_size(offsets);
2494
2495 if (next_mrec > mrec_end) {
2496 return(NULL);
2497 }
2498
2499 old_pk = dtuple_create(heap, new_index->n_uniq);
2500 dict_index_copy_types(
2501 old_pk, new_index, old_pk->n_fields);
2502
2503 /* Copy the PRIMARY KEY fields from mrec to old_pk. */
2504 for (ulint i = 0; i < new_index->n_uniq; i++) {
2505 const void* field;
2506 ulint len;
2507 dfield_t* dfield;
2508
2509 ut_ad(!rec_offs_nth_extern(offsets, i));
2510
2511 field = rec_get_nth_field(
2512 mrec, offsets, i, &len);
2513 ut_ad(len != UNIV_SQL_NULL);
2514
2515 dfield = dtuple_get_nth_field(old_pk, i);
2516 dfield_set_data(dfield, field, len);
2517 }
2518 } else {
2519 /* We assume extra_size < 0x100
2520 for the PRIMARY KEY prefix. */
2521 mrec += *mrec + 1;
2522
2523 if (mrec > mrec_end) {
2524 return(NULL);
2525 }
2526
2527 /* Get offsets for PRIMARY KEY,
2528 DB_TRX_ID, DB_ROLL_PTR. */
2529 /* The old_pk prefix was converted by
2530 rec_convert_dtuple_to_temp() using new_index. */
2531 ut_ad(!new_index->is_instant());
2532 rec_offs_set_n_fields(offsets,
2533 unsigned(new_index->n_uniq) + 2);
2534 rec_init_offsets_temp(mrec, new_index, offsets);
2535
2536 next_mrec = mrec + rec_offs_data_size(offsets);
2537 if (next_mrec + 2 > mrec_end) {
2538 return(NULL);
2539 }
2540
2541 /* Copy the PRIMARY KEY fields and
2542 DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */
2543 old_pk = dtuple_create(
2544 heap, unsigned(new_index->n_uniq) + 2);
2545 dict_index_copy_types(old_pk, new_index,
2546 old_pk->n_fields);
2547
2548 for (ulint i = 0;
2549 i < dict_index_get_n_unique(new_index) + 2;
2550 i++) {
2551 const void* field;
2552 ulint len;
2553 dfield_t* dfield;
2554
2555 ut_ad(!rec_offs_nth_extern(offsets, i));
2556
2557 field = rec_get_nth_field(
2558 mrec, offsets, i, &len);
2559 ut_ad(len != UNIV_SQL_NULL);
2560
2561 dfield = dtuple_get_nth_field(old_pk, i);
2562 dfield_set_data(dfield, field, len);
2563 }
2564
2565 mrec = next_mrec;
2566
2567 /* Fetch the new value of the row as it was
2568 in the old table definition. */
2569 extra_size = *mrec++;
2570
2571 if (extra_size >= 0x80) {
2572 /* Read another byte of extra_size. */
2573
2574 extra_size = (extra_size & 0x7f) << 8;
2575 extra_size |= *mrec++;
2576 }
2577
2578 mrec += extra_size;
2579
2580 ut_ad(extra_size || !is_instant);
2581
2582 if (mrec > mrec_end) {
2583 return(NULL);
2584 }
2585
2586 rec_offs_set_n_fields(offsets, dup->index->n_fields);
2587 rec_init_offsets_temp(mrec, dup->index, offsets,
2588 log->n_core_fields,
2589 is_instant
2590 ? static_cast<rec_comp_status_t>(
2591 *(mrec - extra_size))
2592 : REC_STATUS_ORDINARY);
2593
2594 next_mrec = mrec + rec_offs_data_size(offsets);
2595
2596 if (next_mrec > mrec_end) {
2597 return(NULL);
2598 }
2599 }
2600
2601 ut_ad(next_mrec <= mrec_end);
2602 log->head.total += ulint(next_mrec - mrec_start);
2603 dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq);
2604
2605 *error = row_log_table_apply_update(
2606 thr, new_trx_id_col,
2607 mrec, offsets, offsets_heap, heap, dup, old_pk);
2608 break;
2609 }
2610
2611 ut_ad(log->head.total <= log->tail.total);
2612 mem_heap_empty(offsets_heap);
2613 mem_heap_empty(heap);
2614 return(next_mrec);
2615}
2616
2617#ifdef HAVE_PSI_STAGE_INTERFACE
2618/** Estimate how much an ALTER TABLE progress should be incremented per
2619one block of log applied.
2620For the other phases of ALTER TABLE we increment the progress with 1 per
2621page processed.
2622@return amount of abstract units to add to work_completed when one block
2623of log is applied.
2624*/
2625inline
2626ulint
2627row_log_progress_inc_per_block()
2628{
2629 /* We must increment the progress once per page (as in
2630 univ_page_size, usually 16KiB). One block here is srv_sort_buf_size
2631 (usually 1MiB). */
2632 const ulint pages_per_block = std::max<ulint>(
2633 ulint(srv_sort_buf_size >> srv_page_size_shift), 1);
2634
2635 /* Multiply by an artificial factor of 6 to even the pace with
2636 the rest of the ALTER TABLE phases, they process page_size amount
2637 of data faster. */
2638 return(pages_per_block * 6);
2639}
2640
2641/** Estimate how much work is to be done by the log apply phase
2642of an ALTER TABLE for this index.
2643@param[in] index index whose log to assess
2644@return work to be done by log-apply in abstract units
2645*/
2646ulint
2647row_log_estimate_work(
2648 const dict_index_t* index)
2649{
2650 if (index == NULL || index->online_log == NULL) {
2651 return(0);
2652 }
2653
2654 const row_log_t* l = index->online_log;
2655 const ulint bytes_left =
2656 static_cast<ulint>(l->tail.total - l->head.total);
2657 const ulint blocks_left = bytes_left / srv_sort_buf_size;
2658
2659 return(blocks_left * row_log_progress_inc_per_block());
2660}
2661#else /* HAVE_PSI_STAGE_INTERFACE */
2662inline
2663ulint
2664row_log_progress_inc_per_block()
2665{
2666 return(0);
2667}
2668#endif /* HAVE_PSI_STAGE_INTERFACE */
2669
2670/** Applies operations to a table was rebuilt.
2671@param[in] thr query graph
2672@param[in,out] dup for reporting duplicate key errors
2673@param[in,out] stage performance schema accounting object, used by
2674ALTER TABLE. If not NULL, then stage->inc() will be called for each block
2675of log that is applied.
2676@return DB_SUCCESS, or error code on failure */
2677static MY_ATTRIBUTE((warn_unused_result))
2678dberr_t
2679row_log_table_apply_ops(
2680 que_thr_t* thr,
2681 row_merge_dup_t* dup,
2682 ut_stage_alter_t* stage)
2683{
2684 dberr_t error;
2685 const mrec_t* mrec = NULL;
2686 const mrec_t* next_mrec;
2687 const mrec_t* mrec_end = NULL; /* silence bogus warning */
2688 const mrec_t* next_mrec_end;
2689 mem_heap_t* heap;
2690 mem_heap_t* offsets_heap;
2691 ulint* offsets;
2692 bool has_index_lock;
2693 dict_index_t* index = const_cast<dict_index_t*>(
2694 dup->index);
2695 dict_table_t* new_table = index->online_log->table;
2696 dict_index_t* new_index = dict_table_get_first_index(
2697 new_table);
2698 const ulint i = 1 + REC_OFFS_HEADER_SIZE
2699 + ut_max(dict_index_get_n_fields(index),
2700 dict_index_get_n_unique(new_index) + 2);
2701 const ulint new_trx_id_col = dict_col_get_clust_pos(
2702 dict_table_get_sys_col(new_table, DATA_TRX_ID), new_index);
2703 trx_t* trx = thr_get_trx(thr);
2704
2705 ut_ad(dict_index_is_clust(index));
2706 ut_ad(dict_index_is_online_ddl(index));
2707 ut_ad(trx->mysql_thd);
2708 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
2709 ut_ad(!dict_index_is_online_ddl(new_index));
2710 ut_ad(dict_col_get_clust_pos(
2711 dict_table_get_sys_col(index->table, DATA_TRX_ID), index)
2712 != ULINT_UNDEFINED);
2713 ut_ad(new_trx_id_col > 0);
2714 ut_ad(new_trx_id_col != ULINT_UNDEFINED);
2715
2716 UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
2717
2718 offsets = static_cast<ulint*>(ut_malloc_nokey(i * sizeof *offsets));
2719 offsets[0] = i;
2720 offsets[1] = dict_index_get_n_fields(index);
2721
2722 heap = mem_heap_create(srv_page_size);
2723 offsets_heap = mem_heap_create(srv_page_size);
2724 has_index_lock = true;
2725
2726next_block:
2727 ut_ad(has_index_lock);
2728 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
2729 ut_ad(index->online_log->head.bytes == 0);
2730
2731 stage->inc(row_log_progress_inc_per_block());
2732
2733 if (trx_is_interrupted(trx)) {
2734 goto interrupted;
2735 }
2736
2737 if (index->is_corrupted()) {
2738 error = DB_INDEX_CORRUPT;
2739 goto func_exit;
2740 }
2741
2742 ut_ad(dict_index_is_online_ddl(index));
2743
2744 error = index->online_log->error;
2745
2746 if (error != DB_SUCCESS) {
2747 goto func_exit;
2748 }
2749
2750 if (UNIV_UNLIKELY(index->online_log->head.blocks
2751 > index->online_log->tail.blocks)) {
2752unexpected_eof:
2753 ib::error() << "Unexpected end of temporary file for table "
2754 << index->table->name;
2755corruption:
2756 error = DB_CORRUPTION;
2757 goto func_exit;
2758 }
2759
2760 if (index->online_log->head.blocks
2761 == index->online_log->tail.blocks) {
2762 if (index->online_log->head.blocks) {
2763#ifdef HAVE_FTRUNCATE
2764 /* Truncate the file in order to save space. */
2765 if (index->online_log->fd > 0
2766 && ftruncate(index->online_log->fd, 0) == -1) {
2767 ib::error()
2768 << "\'" << index->name + 1
2769 << "\' failed with error "
2770 << errno << ":" << strerror(errno);
2771
2772 goto corruption;
2773 }
2774#endif /* HAVE_FTRUNCATE */
2775 index->online_log->head.blocks
2776 = index->online_log->tail.blocks = 0;
2777 }
2778
2779 next_mrec = index->online_log->tail.block;
2780 next_mrec_end = next_mrec + index->online_log->tail.bytes;
2781
2782 if (next_mrec_end == next_mrec) {
2783 /* End of log reached. */
2784all_done:
2785 ut_ad(has_index_lock);
2786 ut_ad(index->online_log->head.blocks == 0);
2787 ut_ad(index->online_log->tail.blocks == 0);
2788 index->online_log->head.bytes = 0;
2789 index->online_log->tail.bytes = 0;
2790 error = DB_SUCCESS;
2791 goto func_exit;
2792 }
2793 } else {
2794 os_offset_t ofs;
2795
2796 ofs = (os_offset_t) index->online_log->head.blocks
2797 * srv_sort_buf_size;
2798
2799 ut_ad(has_index_lock);
2800 has_index_lock = false;
2801 rw_lock_x_unlock(dict_index_get_lock(index));
2802
2803 log_free_check();
2804
2805 ut_ad(dict_index_is_online_ddl(index));
2806
2807 if (!row_log_block_allocate(index->online_log->head)) {
2808 error = DB_OUT_OF_MEMORY;
2809 goto func_exit;
2810 }
2811
2812 IORequest request(IORequest::READ);
2813 byte* buf = index->online_log->head.block;
2814
2815 if (!os_file_read_no_error_handling(
2816 request, index->online_log->fd,
2817 buf, ofs, srv_sort_buf_size, 0)) {
2818 ib::error()
2819 << "Unable to read temporary file"
2820 " for table " << index->table->name;
2821 goto corruption;
2822 }
2823
2824 if (log_tmp_is_encrypted()) {
2825 if (!log_tmp_block_decrypt(
2826 buf, srv_sort_buf_size,
2827 index->online_log->crypt_head,
2828 ofs, index->table->space->id)) {
2829 error = DB_DECRYPTION_FAILED;
2830 goto func_exit;
2831 }
2832
2833 srv_stats.n_rowlog_blocks_decrypted.inc();
2834 memcpy(buf, index->online_log->crypt_head,
2835 srv_sort_buf_size);
2836 }
2837
2838#ifdef POSIX_FADV_DONTNEED
2839 /* Each block is read exactly once. Free up the file cache. */
2840 posix_fadvise(index->online_log->fd,
2841 ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
2842#endif /* POSIX_FADV_DONTNEED */
2843
2844 next_mrec = index->online_log->head.block;
2845 next_mrec_end = next_mrec + srv_sort_buf_size;
2846 }
2847
2848 /* This read is not protected by index->online_log->mutex for
2849 performance reasons. We will eventually notice any error that
2850 was flagged by a DML thread. */
2851 error = index->online_log->error;
2852
2853 if (error != DB_SUCCESS) {
2854 goto func_exit;
2855 }
2856
2857 if (mrec) {
2858 /* A partial record was read from the previous block.
2859 Copy the temporary buffer full, as we do not know the
2860 length of the record. Parse subsequent records from
2861 the bigger buffer index->online_log->head.block
2862 or index->online_log->tail.block. */
2863
2864 ut_ad(mrec == index->online_log->head.buf);
2865 ut_ad(mrec_end > mrec);
2866 ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
2867
2868 memcpy((mrec_t*) mrec_end, next_mrec,
2869 ulint((&index->online_log->head.buf)[1] - mrec_end));
2870 mrec = row_log_table_apply_op(
2871 thr, new_trx_id_col,
2872 dup, &error, offsets_heap, heap,
2873 index->online_log->head.buf,
2874 (&index->online_log->head.buf)[1], offsets);
2875 if (error != DB_SUCCESS) {
2876 goto func_exit;
2877 } else if (UNIV_UNLIKELY(mrec == NULL)) {
2878 /* The record was not reassembled properly. */
2879 goto corruption;
2880 }
2881 /* The record was previously found out to be
2882 truncated. Now that the parse buffer was extended,
2883 it should proceed beyond the old end of the buffer. */
2884 ut_a(mrec > mrec_end);
2885
2886 index->online_log->head.bytes = ulint(mrec - mrec_end);
2887 next_mrec += index->online_log->head.bytes;
2888 }
2889
2890 ut_ad(next_mrec <= next_mrec_end);
2891 /* The following loop must not be parsing the temporary
2892 buffer, but head.block or tail.block. */
2893
2894 /* mrec!=NULL means that the next record starts from the
2895 middle of the block */
2896 ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
2897
2898#ifdef UNIV_DEBUG
2899 if (next_mrec_end == index->online_log->head.block
2900 + srv_sort_buf_size) {
2901 /* If tail.bytes == 0, next_mrec_end can also be at
2902 the end of tail.block. */
2903 if (index->online_log->tail.bytes == 0) {
2904 ut_ad(next_mrec == next_mrec_end);
2905 ut_ad(index->online_log->tail.blocks == 0);
2906 ut_ad(index->online_log->head.blocks == 0);
2907 ut_ad(index->online_log->head.bytes == 0);
2908 } else {
2909 ut_ad(next_mrec == index->online_log->head.block
2910 + index->online_log->head.bytes);
2911 ut_ad(index->online_log->tail.blocks
2912 > index->online_log->head.blocks);
2913 }
2914 } else if (next_mrec_end == index->online_log->tail.block
2915 + index->online_log->tail.bytes) {
2916 ut_ad(next_mrec == index->online_log->tail.block
2917 + index->online_log->head.bytes);
2918 ut_ad(index->online_log->tail.blocks == 0);
2919 ut_ad(index->online_log->head.blocks == 0);
2920 ut_ad(index->online_log->head.bytes
2921 <= index->online_log->tail.bytes);
2922 } else {
2923 ut_error;
2924 }
2925#endif /* UNIV_DEBUG */
2926
2927 mrec_end = next_mrec_end;
2928
2929 while (!trx_is_interrupted(trx)) {
2930 mrec = next_mrec;
2931 ut_ad(mrec <= mrec_end);
2932
2933 if (mrec == mrec_end) {
2934 /* We are at the end of the log.
2935 Mark the replay all_done. */
2936 if (has_index_lock) {
2937 goto all_done;
2938 }
2939 }
2940
2941 if (!has_index_lock) {
2942 /* We are applying operations from a different
2943 block than the one that is being written to.
2944 We do not hold index->lock in order to
2945 allow other threads to concurrently buffer
2946 modifications. */
2947 ut_ad(mrec >= index->online_log->head.block);
2948 ut_ad(mrec_end == index->online_log->head.block
2949 + srv_sort_buf_size);
2950 ut_ad(index->online_log->head.bytes
2951 < srv_sort_buf_size);
2952
2953 /* Take the opportunity to do a redo log
2954 checkpoint if needed. */
2955 log_free_check();
2956 } else {
2957 /* We are applying operations from the last block.
2958 Do not allow other threads to buffer anything,
2959 so that we can finally catch up and synchronize. */
2960 ut_ad(index->online_log->head.blocks == 0);
2961 ut_ad(index->online_log->tail.blocks == 0);
2962 ut_ad(mrec_end == index->online_log->tail.block
2963 + index->online_log->tail.bytes);
2964 ut_ad(mrec >= index->online_log->tail.block);
2965 }
2966
2967 /* This read is not protected by index->online_log->mutex
2968 for performance reasons. We will eventually notice any
2969 error that was flagged by a DML thread. */
2970 error = index->online_log->error;
2971
2972 if (error != DB_SUCCESS) {
2973 goto func_exit;
2974 }
2975
2976 next_mrec = row_log_table_apply_op(
2977 thr, new_trx_id_col,
2978 dup, &error, offsets_heap, heap,
2979 mrec, mrec_end, offsets);
2980
2981 if (error != DB_SUCCESS) {
2982 goto func_exit;
2983 } else if (next_mrec == next_mrec_end) {
2984 /* The record happened to end on a block boundary.
2985 Do we have more blocks left? */
2986 if (has_index_lock) {
2987 /* The index will be locked while
2988 applying the last block. */
2989 goto all_done;
2990 }
2991
2992 mrec = NULL;
2993process_next_block:
2994 rw_lock_x_lock(dict_index_get_lock(index));
2995 has_index_lock = true;
2996
2997 index->online_log->head.bytes = 0;
2998 index->online_log->head.blocks++;
2999 goto next_block;
3000 } else if (next_mrec != NULL) {
3001 ut_ad(next_mrec < next_mrec_end);
3002 index->online_log->head.bytes
3003 += ulint(next_mrec - mrec);
3004 } else if (has_index_lock) {
3005 /* When mrec is within tail.block, it should
3006 be a complete record, because we are holding
3007 index->lock and thus excluding the writer. */
3008 ut_ad(index->online_log->tail.blocks == 0);
3009 ut_ad(mrec_end == index->online_log->tail.block
3010 + index->online_log->tail.bytes);
3011 ut_ad(0);
3012 goto unexpected_eof;
3013 } else {
3014 memcpy(index->online_log->head.buf, mrec,
3015 ulint(mrec_end - mrec));
3016 mrec_end += ulint(index->online_log->head.buf - mrec);
3017 mrec = index->online_log->head.buf;
3018 goto process_next_block;
3019 }
3020 }
3021
3022interrupted:
3023 error = DB_INTERRUPTED;
3024func_exit:
3025 if (!has_index_lock) {
3026 rw_lock_x_lock(dict_index_get_lock(index));
3027 }
3028
3029 mem_heap_free(offsets_heap);
3030 mem_heap_free(heap);
3031 row_log_block_free(index->online_log->head);
3032 ut_free(offsets);
3033 return(error);
3034}
3035
3036/** Apply the row_log_table log to a table upon completing rebuild.
3037@param[in] thr query graph
3038@param[in] old_table old table
3039@param[in,out] table MySQL table (for reporting duplicates)
3040@param[in,out] stage performance schema accounting object, used by
3041ALTER TABLE. stage->begin_phase_log_table() will be called initially and then
3042stage->inc() will be called for each block of log that is applied.
3043@return DB_SUCCESS, or error code on failure */
3044dberr_t
3045row_log_table_apply(
3046 que_thr_t* thr,
3047 dict_table_t* old_table,
3048 struct TABLE* table,
3049 ut_stage_alter_t* stage)
3050{
3051 dberr_t error;
3052 dict_index_t* clust_index;
3053
3054 thr_get_trx(thr)->error_key_num = 0;
3055 DBUG_EXECUTE_IF("innodb_trx_duplicates",
3056 thr_get_trx(thr)->duplicates = TRX_DUP_REPLACE;);
3057
3058 stage->begin_phase_log_table();
3059
3060 ut_ad(!rw_lock_own(dict_operation_lock, RW_LOCK_S));
3061 clust_index = dict_table_get_first_index(old_table);
3062
3063 rw_lock_x_lock(dict_index_get_lock(clust_index));
3064
3065 if (!clust_index->online_log) {
3066 ut_ad(dict_index_get_online_status(clust_index)
3067 == ONLINE_INDEX_COMPLETE);
3068 /* This function should not be called unless
3069 rebuilding a table online. Build in some fault
3070 tolerance. */
3071 ut_ad(0);
3072 error = DB_ERROR;
3073 } else {
3074 row_merge_dup_t dup = {
3075 clust_index, table,
3076 clust_index->online_log->col_map, 0
3077 };
3078
3079 error = row_log_table_apply_ops(thr, &dup, stage);
3080
3081 ut_ad(error != DB_SUCCESS
3082 || clust_index->online_log->head.total
3083 == clust_index->online_log->tail.total);
3084 }
3085
3086 rw_lock_x_unlock(dict_index_get_lock(clust_index));
3087 DBUG_EXECUTE_IF("innodb_trx_duplicates",
3088 thr_get_trx(thr)->duplicates = 0;);
3089
3090 return(error);
3091}
3092
3093/******************************************************//**
3094Allocate the row log for an index and flag the index
3095for online creation.
3096@retval true if success, false if not */
3097bool
3098row_log_allocate(
3099/*=============*/
3100 const trx_t* trx, /*!< in: the ALTER TABLE transaction */
3101 dict_index_t* index, /*!< in/out: index */
3102 dict_table_t* table, /*!< in/out: new table being rebuilt,
3103 or NULL when creating a secondary index */
3104 bool same_pk,/*!< in: whether the definition of the
3105 PRIMARY KEY has remained the same */
3106 const dtuple_t* defaults,
3107 /*!< in: default values of
3108 added, changed columns, or NULL */
3109 const ulint* col_map,/*!< in: mapping of old column
3110 numbers to new ones, or NULL if !table */
3111 const char* path, /*!< in: where to create temporary file */
3112 const bool ignore) /*!< in: alter ignore issued */
3113{
3114 row_log_t* log;
3115 DBUG_ENTER("row_log_allocate");
3116
3117 ut_ad(!dict_index_is_online_ddl(index));
3118 ut_ad(dict_index_is_clust(index) == !!table);
3119 ut_ad(!table || index->table != table);
3120 ut_ad(same_pk || table);
3121 ut_ad(!table || col_map);
3122 ut_ad(!defaults || col_map);
3123 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
3124 ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
3125 ut_ad(trx->id);
3126
3127 log = static_cast<row_log_t*>(ut_malloc_nokey(sizeof *log));
3128
3129 if (log == NULL) {
3130 DBUG_RETURN(false);
3131 }
3132
3133 log->fd = OS_FILE_CLOSED;
3134 mutex_create(LATCH_ID_INDEX_ONLINE_LOG, &log->mutex);
3135
3136 log->blobs = NULL;
3137 log->table = table;
3138 log->same_pk = same_pk;
3139 log->defaults = defaults;
3140 log->col_map = col_map;
3141 log->error = DB_SUCCESS;
3142 log->min_trx = trx->id;
3143 log->max_trx = 0;
3144 log->tail.blocks = log->tail.bytes = 0;
3145 log->tail.total = 0;
3146 log->tail.block = log->head.block = NULL;
3147 log->crypt_tail = log->crypt_head = NULL;
3148 log->head.blocks = log->head.bytes = 0;
3149 log->head.total = 0;
3150 log->path = path;
3151 log->n_core_fields = index->n_core_fields;
3152 ut_ad(!table || log->is_instant(index) == index->is_instant());
3153 log->ignore=ignore;
3154
3155 dict_index_set_online_status(index, ONLINE_INDEX_CREATION);
3156 index->online_log = log;
3157
3158 if (log_tmp_is_encrypted()) {
3159 ulint size = srv_sort_buf_size;
3160 log->crypt_head = static_cast<byte *>(os_mem_alloc_large(&size));
3161 log->crypt_tail = static_cast<byte *>(os_mem_alloc_large(&size));
3162
3163 if (!log->crypt_head || !log->crypt_tail) {
3164 row_log_free(log);
3165 DBUG_RETURN(false);
3166 }
3167 }
3168
3169 /* While we might be holding an exclusive data dictionary lock
3170 here, in row_log_abort_sec() we will not always be holding it. Use
3171 atomic operations in both cases. */
3172 MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX);
3173
3174 DBUG_RETURN(true);
3175}
3176
3177/******************************************************//**
3178Free the row log for an index that was being created online. */
3179void
3180row_log_free(
3181/*=========*/
3182 row_log_t*& log) /*!< in,own: row log */
3183{
3184 MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX);
3185
3186 UT_DELETE(log->blobs);
3187 row_log_block_free(log->tail);
3188 row_log_block_free(log->head);
3189 row_merge_file_destroy_low(log->fd);
3190
3191 if (log->crypt_head) {
3192 os_mem_free_large(log->crypt_head, srv_sort_buf_size);
3193 }
3194
3195 if (log->crypt_tail) {
3196 os_mem_free_large(log->crypt_tail, srv_sort_buf_size);
3197 }
3198
3199 mutex_free(&log->mutex);
3200 ut_free(log);
3201 log = NULL;
3202}
3203
3204/******************************************************//**
3205Get the latest transaction ID that has invoked row_log_online_op()
3206during online creation.
3207@return latest transaction ID, or 0 if nothing was logged */
3208trx_id_t
3209row_log_get_max_trx(
3210/*================*/
3211 dict_index_t* index) /*!< in: index, must be locked */
3212{
3213 ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_CREATION);
3214
3215 ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_S)
3216 && mutex_own(&index->online_log->mutex))
3217 || rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
3218
3219 return(index->online_log->max_trx);
3220}
3221
/******************************************************//**
Applies an operation to a secondary index that was being created.

The entry is first searched for in the index, and then:
- ROW_OP_DELETE removes the record only if a record matching all
fields of the entry is found; otherwise nothing is done.
- ROW_OP_INSERT does nothing if a record matching all fields already
exists. If only the unique key columns match (and the NULL rules do
not exempt the entry), the duplicate is reported through dup and
*error is set to DB_DUPLICATE_KEY. Otherwise the entry is inserted.

For both deletes and inserts, the optimistic B-tree operation is tried
first; the pessimistic one is the fallback, after re-positioning the
cursor with BTR_MODIFY_TREE if index->lock is not held exclusively.

NOTE: the "nothing to do" paths jump to func_exit without assigning
*error. The caller row_log_apply_op() sets *error = DB_SUCCESS before
invoking this function. */
static MY_ATTRIBUTE((nonnull))
void
row_log_apply_op_low(
/*=================*/
	dict_index_t*	index,		/*!< in/out: index */
	row_merge_dup_t*dup,		/*!< in/out: for reporting
					duplicate key errors */
	dberr_t*	error,		/*!< out: DB_SUCCESS or error code */
	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap for
					allocating offsets; can be emptied */
	bool		has_index_lock, /*!< in: true if holding index->lock
					in exclusive mode */
	enum row_op	op,		/*!< in: operation being applied */
	trx_id_t	trx_id,		/*!< in: transaction identifier */
	const dtuple_t*	entry)		/*!< in: row */
{
	mtr_t		mtr;
	btr_cur_t	cursor;
	ulint*		offsets = NULL;

	ut_ad(!dict_index_is_clust(index));

	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)
	      == has_index_lock);

	ut_ad(!index->is_corrupted());
	/* Only ROW_OP_DELETE may be applied without a transaction ID. */
	ut_ad(trx_id != 0 || op == ROW_OP_DELETE);

	DBUG_LOG("ib_create_index",
		 (op == ROW_OP_INSERT ? "insert " : "delete ")
		 << (has_index_lock ? "locked index " : "unlocked index ")
		 << index->id << ',' << ib::hex(trx_id) << ": "
		 << rec_printer(entry).str());

	mtr_start(&mtr);
	index->set_modified(mtr);

	/* We perform the pessimistic variant of the operations if we
	already hold index->lock exclusively. First, search the
	record. The operation may already have been performed,
	depending on when the row in the clustered index was
	scanned. */
	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
				    has_index_lock
				    ? BTR_MODIFY_TREE
				    : BTR_MODIFY_LEAF,
				    &cursor, 0, __FILE__, __LINE__,
				    &mtr);

	ut_ad(dict_index_get_n_unique(index) > 0);
	/* This test is somewhat similar to row_ins_must_modify_rec(),
	but not identical for unique secondary indexes. */
	if (cursor.low_match >= dict_index_get_n_unique(index)
	    && !page_rec_is_infimum(btr_cur_get_rec(&cursor))) {
		/* We have a matching record. It is an exact match
		("exists") only if all fields of the entry matched,
		not just the unique key columns. */
		bool	exists	= (cursor.low_match
				   == dict_index_get_n_fields(index));
#ifdef UNIV_DEBUG
		rec_t*	rec	= btr_cur_get_rec(&cursor);
		ut_ad(page_rec_is_user_rec(rec));
		ut_ad(!rec_get_deleted_flag(rec, page_rec_is_comp(rec)));
#endif /* UNIV_DEBUG */

		ut_ad(exists || dict_index_is_unique(index));

		switch (op) {
		case ROW_OP_DELETE:
			if (!exists) {
				/* The existing record matches the
				unique secondary index key, but the
				PRIMARY KEY columns differ. So, this
				exact record does not exist. For
				example, we could detect a duplicate
				key error in some old index before
				logging an ROW_OP_INSERT for our
				index. This ROW_OP_DELETE could have
				been logged for rolling back
				TRX_UNDO_INSERT_REC. */
				goto func_exit;
			}

			/* Try the cheap delete first. */
			if (btr_cur_optimistic_delete(
				    &cursor, BTR_CREATE_FLAG, &mtr)) {
				*error = DB_SUCCESS;
				break;
			}

			if (!has_index_lock) {
				/* This needs a pessimistic operation.
				Lock the index tree exclusively. */
				mtr_commit(&mtr);
				mtr_start(&mtr);
				index->set_modified(mtr);
				btr_cur_search_to_nth_level(
					index, 0, entry, PAGE_CUR_LE,
					BTR_MODIFY_TREE, &cursor, 0,
					__FILE__, __LINE__, &mtr);

				/* No other thread than the current one
				is allowed to modify the index tree.
				Thus, the record should still exist. */
				ut_ad(cursor.low_match
				      >= dict_index_get_n_fields(index));
				ut_ad(page_rec_is_user_rec(
					      btr_cur_get_rec(&cursor)));
			}

			/* As there are no externally stored fields in
			a secondary index record, the parameter
			rollback=false will be ignored. */

			btr_cur_pessimistic_delete(
				error, FALSE, &cursor,
				BTR_CREATE_FLAG, false, &mtr);
			break;
		case ROW_OP_INSERT:
			if (exists) {
				/* The record already exists. There
				is nothing to be inserted.
				This could happen when processing
				TRX_UNDO_DEL_MARK_REC in statement
				rollback:

				UPDATE of PRIMARY KEY can lead to
				statement rollback if the updated
				value of the PRIMARY KEY already
				exists. In this case, the UPDATE would
				be mapped to DELETE;INSERT, and we
				only wrote undo log for the DELETE
				part. The duplicate key error would be
				triggered before logging the INSERT
				part.

				Theoretically, we could also get a
				similar situation when a DELETE operation
				is blocked by a FOREIGN KEY constraint. */
				goto func_exit;
			}

			if (dtuple_contains_null(entry)) {
				/* The UNIQUE KEY columns match, but
				there is a NULL value in the key, and
				NULL!=NULL. */
				goto insert_the_rec;
			}

			/* The unique key columns match a different
			record: jump into the duplicate key handling
			of the other switch below. */
			goto duplicate;
		}
	} else {
		/* No record matches the unique key columns of the
		entry on the low side of the cursor. */
		switch (op) {
			rec_t*		rec;
			big_rec_t*	big_rec;
		case ROW_OP_DELETE:
			/* The record does not exist. For example, we
			could detect a duplicate key error in some old
			index before logging an ROW_OP_INSERT for our
			index. This ROW_OP_DELETE could be logged for
			rolling back TRX_UNDO_INSERT_REC. */
			goto func_exit;
		case ROW_OP_INSERT:
			if (dict_index_is_unique(index)
			    && (cursor.up_match
				>= dict_index_get_n_unique(index)
				|| cursor.low_match
				>= dict_index_get_n_unique(index))
			    && (!index->n_nullable
				|| !dtuple_contains_null(entry))) {
duplicate:
				/* Duplicate key */
				ut_ad(dict_index_is_unique(index));
				row_merge_dup_report(dup, entry->fields);
				*error = DB_DUPLICATE_KEY;
				goto func_exit;
			}
insert_the_rec:
			/* Insert the record. As we are inserting into
			a secondary index, there cannot be externally
			stored columns (!big_rec). */
			*error = btr_cur_optimistic_insert(
				BTR_NO_UNDO_LOG_FLAG
				| BTR_NO_LOCKING_FLAG
				| BTR_CREATE_FLAG,
				&cursor, &offsets, &offsets_heap,
				const_cast<dtuple_t*>(entry),
				&rec, &big_rec, 0, NULL, &mtr);
			ut_ad(!big_rec);
			/* Only DB_FAIL leads to the pessimistic
			retry below; any other result is final. */
			if (*error != DB_FAIL) {
				break;
			}

			if (!has_index_lock) {
				/* This needs a pessimistic operation.
				Lock the index tree exclusively. */
				mtr_commit(&mtr);
				mtr_start(&mtr);
				index->set_modified(mtr);
				btr_cur_search_to_nth_level(
					index, 0, entry, PAGE_CUR_LE,
					BTR_MODIFY_TREE, &cursor, 0,
					__FILE__, __LINE__, &mtr);
			}

			/* We already determined that the
			record did not exist. No other thread
			than the current one is allowed to
			modify the index tree. Thus, the
			record should still not exist. */

			*error = btr_cur_pessimistic_insert(
				BTR_NO_UNDO_LOG_FLAG
				| BTR_NO_LOCKING_FLAG
				| BTR_CREATE_FLAG,
				&cursor, &offsets, &offsets_heap,
				const_cast<dtuple_t*>(entry),
				&rec, &big_rec,
				0, NULL, &mtr);
			ut_ad(!big_rec);
			break;
		}
		/* Discard the offsets that the insert may have
		allocated from offsets_heap. */
		mem_heap_empty(offsets_heap);
	}

	/* After a successful modification on behalf of a known
	transaction, update the maximum transaction ID on the page.
	The func_exit jumps above skip this step. */
	if (*error == DB_SUCCESS && trx_id) {
		page_update_max_trx_id(btr_cur_get_block(&cursor),
				       btr_cur_get_page_zip(&cursor),
				       trx_id, &mtr);
	}

func_exit:
	mtr_commit(&mtr);
}
3455
3456/******************************************************//**
3457Applies an operation to a secondary index that was being created.
3458@return NULL on failure (mrec corruption) or when out of data;
3459pointer to next record on success */
3460static MY_ATTRIBUTE((nonnull, warn_unused_result))
3461const mrec_t*
3462row_log_apply_op(
3463/*=============*/
3464 dict_index_t* index, /*!< in/out: index */
3465 row_merge_dup_t*dup, /*!< in/out: for reporting
3466 duplicate key errors */
3467 dberr_t* error, /*!< out: DB_SUCCESS or error code */
3468 mem_heap_t* offsets_heap, /*!< in/out: memory heap for
3469 allocating offsets; can be emptied */
3470 mem_heap_t* heap, /*!< in/out: memory heap for
3471 allocating data tuples */
3472 bool has_index_lock, /*!< in: true if holding index->lock
3473 in exclusive mode */
3474 const mrec_t* mrec, /*!< in: merge record */
3475 const mrec_t* mrec_end, /*!< in: end of buffer */
3476 ulint* offsets) /*!< in/out: work area for
3477 rec_init_offsets_temp() */
3478
3479{
3480 enum row_op op;
3481 ulint extra_size;
3482 ulint data_size;
3483 ulint n_ext;
3484 dtuple_t* entry;
3485 trx_id_t trx_id;
3486
3487 /* Online index creation is only used for secondary indexes. */
3488 ut_ad(!dict_index_is_clust(index));
3489
3490 ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)
3491 == has_index_lock);
3492
3493 if (index->is_corrupted()) {
3494 *error = DB_INDEX_CORRUPT;
3495 return(NULL);
3496 }
3497
3498 *error = DB_SUCCESS;
3499
3500 if (mrec + ROW_LOG_HEADER_SIZE >= mrec_end) {
3501 return(NULL);
3502 }
3503
3504 switch (*mrec) {
3505 case ROW_OP_INSERT:
3506 if (ROW_LOG_HEADER_SIZE + DATA_TRX_ID_LEN + mrec >= mrec_end) {
3507 return(NULL);
3508 }
3509
3510 op = static_cast<enum row_op>(*mrec++);
3511 trx_id = trx_read_trx_id(mrec);
3512 mrec += DATA_TRX_ID_LEN;
3513 break;
3514 case ROW_OP_DELETE:
3515 op = static_cast<enum row_op>(*mrec++);
3516 trx_id = 0;
3517 break;
3518 default:
3519corrupted:
3520 ut_ad(0);
3521 *error = DB_CORRUPTION;
3522 return(NULL);
3523 }
3524
3525 extra_size = *mrec++;
3526
3527 ut_ad(mrec < mrec_end);
3528
3529 if (extra_size >= 0x80) {
3530 /* Read another byte of extra_size. */
3531
3532 extra_size = (extra_size & 0x7f) << 8;
3533 extra_size |= *mrec++;
3534 }
3535
3536 mrec += extra_size;
3537
3538 if (mrec > mrec_end) {
3539 return(NULL);
3540 }
3541
3542 rec_init_offsets_temp(mrec, index, offsets);
3543
3544 if (rec_offs_any_extern(offsets)) {
3545 /* There should never be any externally stored fields
3546 in a secondary index, which is what online index
3547 creation is used for. Therefore, the log file must be
3548 corrupted. */
3549 goto corrupted;
3550 }
3551
3552 data_size = rec_offs_data_size(offsets);
3553
3554 mrec += data_size;
3555
3556 if (mrec > mrec_end) {
3557 return(NULL);
3558 }
3559
3560 entry = row_rec_to_index_entry_low(
3561 mrec - data_size, index, offsets, &n_ext, heap);
3562 /* Online index creation is only implemented for secondary
3563 indexes, which never contain off-page columns. */
3564 ut_ad(n_ext == 0);
3565
3566 row_log_apply_op_low(index, dup, error, offsets_heap,
3567 has_index_lock, op, trx_id, entry);
3568 return(mrec);
3569}
3570
/** Applies operations to a secondary index that was being created.

The log is consumed block by block. Blocks that have already been
written to the temporary file are read back into head.block and
applied without holding index->lock, so that other threads can keep
appending to the log. The last block (tail.block, still in memory) is
applied while holding index->lock exclusively. A record that straddles
two blocks is reassembled in the scratch buffer head.buf.

The function is entered and left with index->lock held in exclusive
mode. On any error other than DB_SUCCESS the index is flagged
DICT_CORRUPT.
@param[in]	trx	transaction (for checking if the operation was
interrupted)
@param[in,out]	index	index
@param[in,out]	dup	for reporting duplicate key errors
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. stage->inc() is called unconditionally for each block
of log that is applied, so this must not be NULL.
@return DB_SUCCESS, or error code on failure */
static
dberr_t
row_log_apply_ops(
	const trx_t*		trx,
	dict_index_t*		index,
	row_merge_dup_t*	dup,
	ut_stage_alter_t*	stage)
{
	dberr_t		error;
	/* Non-NULL while a partial record is pending in head.buf */
	const mrec_t*	mrec	= NULL;
	const mrec_t*	next_mrec;
	const mrec_t*	mrec_end= NULL; /* silence bogus warning */
	const mrec_t*	next_mrec_end;
	mem_heap_t*	offsets_heap;
	mem_heap_t*	heap;
	ulint*		offsets;
	bool		has_index_lock;
	/* Number of elements in the offsets work area */
	const ulint	i	= 1 + REC_OFFS_HEADER_SIZE
		+ dict_index_get_n_fields(index);

	ut_ad(dict_index_is_online_ddl(index));
	ut_ad(!index->is_committed());
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
	ut_ad(index->online_log);
	UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);

	offsets = static_cast<ulint*>(ut_malloc_nokey(i * sizeof *offsets));
	/* Header of the offsets array: allocated size and field count */
	offsets[0] = i;
	offsets[1] = dict_index_get_n_fields(index);

	offsets_heap = mem_heap_create(srv_page_size);
	heap = mem_heap_create(srv_page_size);
	has_index_lock = true;

next_block:
	/* Start processing the block number head.blocks. We always
	arrive here holding index->lock exclusively. */
	ut_ad(has_index_lock);
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
	ut_ad(index->online_log->head.bytes == 0);

	stage->inc(row_log_progress_inc_per_block());

	if (trx_is_interrupted(trx)) {
		goto interrupted;
	}

	/* An error recorded in the log itself aborts the apply. */
	error = index->online_log->error;
	if (error != DB_SUCCESS) {
		goto func_exit;
	}

	if (index->is_corrupted()) {
		error = DB_INDEX_CORRUPT;
		goto func_exit;
	}

	/* The reader must never be ahead of the writer. */
	if (UNIV_UNLIKELY(index->online_log->head.blocks
			  > index->online_log->tail.blocks)) {
unexpected_eof:
		ib::error() << "Unexpected end of temporary file for index "
			<< index->name;
corruption:
		error = DB_CORRUPTION;
		goto func_exit;
	}

	if (index->online_log->head.blocks
	    == index->online_log->tail.blocks) {
		/* We caught up with the writer: everything that was
		flushed to the file has been applied, and only the
		in-memory tail block remains. */
		if (index->online_log->head.blocks) {
#ifdef HAVE_FTRUNCATE
			/* Truncate the file in order to save space. */
			if (index->online_log->fd > 0
			    && ftruncate(index->online_log->fd, 0) == -1) {
				ib::error()
					<< "\'" << index->name + 1
					<< "\' failed with error "
					<< errno << ":" << strerror(errno);

				goto corruption;
			}
#endif /* HAVE_FTRUNCATE */
			/* Restart block numbering from the beginning. */
			index->online_log->head.blocks
				= index->online_log->tail.blocks = 0;
		}

		next_mrec = index->online_log->tail.block;
		next_mrec_end = next_mrec + index->online_log->tail.bytes;

		if (next_mrec_end == next_mrec) {
			/* End of log reached. */
all_done:
			ut_ad(has_index_lock);
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->tail.blocks == 0);
			error = DB_SUCCESS;
			goto func_exit;
		}
	} else {
		/* Read the next full block from the temporary file.
		Its byte offset is the block number times the block
		size. */
		os_offset_t	ofs = static_cast<os_offset_t>(
			index->online_log->head.blocks)
			* srv_sort_buf_size;
		IORequest	request(IORequest::READ);

		/* Release index->lock while reading and applying a
		block from the file. */
		ut_ad(has_index_lock);
		has_index_lock = false;
		rw_lock_x_unlock(dict_index_get_lock(index));

		log_free_check();

		if (!row_log_block_allocate(index->online_log->head)) {
			error = DB_OUT_OF_MEMORY;
			goto func_exit;
		}

		byte*	buf = index->online_log->head.block;

		if (!os_file_read_no_error_handling(
			    request, index->online_log->fd,
			    buf, ofs, srv_sort_buf_size, 0)) {
			ib::error()
				<< "Unable to read temporary file"
				" for index " << index->name;
			goto corruption;
		}

		if (log_tmp_is_encrypted()) {
			/* Decrypt into crypt_head, then copy the
			plaintext back over the block that was read. */
			if (!log_tmp_block_decrypt(
				    buf, srv_sort_buf_size,
				    index->online_log->crypt_head,
				    ofs, index->table->space->id)) {
				error = DB_DECRYPTION_FAILED;
				goto func_exit;
			}

			srv_stats.n_rowlog_blocks_decrypted.inc();
			memcpy(buf, index->online_log->crypt_head, srv_sort_buf_size);
		}

#ifdef POSIX_FADV_DONTNEED
		/* Each block is read exactly once. Free up the file cache. */
		posix_fadvise(index->online_log->fd,
			      ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */

		next_mrec = index->online_log->head.block;
		next_mrec_end = next_mrec + srv_sort_buf_size;
	}

	if (mrec) {
		/* A partial record was read from the previous block.
		Copy the temporary buffer full, as we do not know the
		length of the record. Parse subsequent records from
		the bigger buffer index->online_log->head.block
		or index->online_log->tail.block. */

		/* NOTE(review): (&head.buf)[1] is used as the address
		one past the end of head.buf; this presumes that
		head.buf is an array member -- confirm against the
		declaration of row_log_buf_t. */
		ut_ad(mrec == index->online_log->head.buf);
		ut_ad(mrec_end > mrec);
		ut_ad(mrec_end < (&index->online_log->head.buf)[1]);

		memcpy((mrec_t*) mrec_end, next_mrec,
		       ulint((&index->online_log->head.buf)[1] - mrec_end));
		mrec = row_log_apply_op(
			index, dup, &error, offsets_heap, heap,
			has_index_lock, index->online_log->head.buf,
			(&index->online_log->head.buf)[1], offsets);
		if (error != DB_SUCCESS) {
			goto func_exit;
		} else if (UNIV_UNLIKELY(mrec == NULL)) {
			/* The record was not reassembled properly. */
			goto corruption;
		}
		/* The record was previously found out to be
		truncated. Now that the parse buffer was extended,
		it should proceed beyond the old end of the buffer. */
		ut_a(mrec > mrec_end);

		/* The bytes consumed beyond the old end of the buffer
		came from the new block; skip them there. */
		index->online_log->head.bytes = ulint(mrec - mrec_end);
		next_mrec += index->online_log->head.bytes;
	}

	ut_ad(next_mrec <= next_mrec_end);
	/* The following loop must not be parsing the temporary
	buffer, but head.block or tail.block. */

	/* mrec!=NULL means that the next record starts from the
	middle of the block */
	ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));

#ifdef UNIV_DEBUG
	/* Check that the parse window is consistent with the block
	(head.block or tail.block) that it was derived from. */
	if (next_mrec_end == index->online_log->head.block
	    + srv_sort_buf_size) {
		/* If tail.bytes == 0, next_mrec_end can also be at
		the end of tail.block. */
		if (index->online_log->tail.bytes == 0) {
			ut_ad(next_mrec == next_mrec_end);
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->head.bytes == 0);
		} else {
			ut_ad(next_mrec == index->online_log->head.block
			      + index->online_log->head.bytes);
			ut_ad(index->online_log->tail.blocks
			      > index->online_log->head.blocks);
		}
	} else if (next_mrec_end == index->online_log->tail.block
		   + index->online_log->tail.bytes) {
		ut_ad(next_mrec == index->online_log->tail.block
		      + index->online_log->head.bytes);
		ut_ad(index->online_log->tail.blocks == 0);
		ut_ad(index->online_log->head.blocks == 0);
		ut_ad(index->online_log->head.bytes
		      <= index->online_log->tail.bytes);
	} else {
		ut_error;
	}
#endif /* UNIV_DEBUG */

	mrec_end = next_mrec_end;

	/* Apply the records of the current block one by one. */
	while (!trx_is_interrupted(trx)) {
		mrec = next_mrec;
		ut_ad(mrec < mrec_end);

		if (!has_index_lock) {
			/* We are applying operations from a different
			block than the one that is being written to.
			We do not hold index->lock in order to
			allow other threads to concurrently buffer
			modifications. */
			ut_ad(mrec >= index->online_log->head.block);
			ut_ad(mrec_end == index->online_log->head.block
			      + srv_sort_buf_size);
			ut_ad(index->online_log->head.bytes
			      < srv_sort_buf_size);

			/* Take the opportunity to do a redo log
			checkpoint if needed. */
			log_free_check();
		} else {
			/* We are applying operations from the last block.
			Do not allow other threads to buffer anything,
			so that we can finally catch up and synchronize. */
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(mrec_end == index->online_log->tail.block
			      + index->online_log->tail.bytes);
			ut_ad(mrec >= index->online_log->tail.block);
		}

		next_mrec = row_log_apply_op(
			index, dup, &error, offsets_heap, heap,
			has_index_lock, mrec, mrec_end, offsets);

		if (error != DB_SUCCESS) {
			goto func_exit;
		} else if (next_mrec == next_mrec_end) {
			/* The record happened to end on a block boundary.
			Do we have more blocks left? */
			if (has_index_lock) {
				/* The index will be locked while
				applying the last block. */
				goto all_done;
			}

			/* No partial record is pending. */
			mrec = NULL;
process_next_block:
			/* Re-acquire index->lock and advance to the
			next block of the log. */
			rw_lock_x_lock(dict_index_get_lock(index));
			has_index_lock = true;

			index->online_log->head.bytes = 0;
			index->online_log->head.blocks++;
			goto next_block;
		} else if (next_mrec != NULL) {
			/* A complete record was applied; account for
			the bytes that it occupied. */
			ut_ad(next_mrec < next_mrec_end);
			index->online_log->head.bytes
				+= ulint(next_mrec - mrec);
		} else if (has_index_lock) {
			/* When mrec is within tail.block, it should
			be a complete record, because we are holding
			index->lock and thus excluding the writer. */
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(mrec_end == index->online_log->tail.block
			      + index->online_log->tail.bytes);
			ut_ad(0);
			goto unexpected_eof;
		} else {
			/* The record is truncated at the end of this
			block. Save its beginning in head.buf, and make
			mrec and mrec_end point into that copy, so that
			the rest can be appended from the next block. */
			memcpy(index->online_log->head.buf, mrec,
			       ulint(mrec_end - mrec));
			mrec_end += ulint(index->online_log->head.buf - mrec);
			mrec = index->online_log->head.buf;
			goto process_next_block;
		}
	}

interrupted:
	error = DB_INTERRUPTED;
func_exit:
	/* Always return with index->lock held exclusively. */
	if (!has_index_lock) {
		rw_lock_x_lock(dict_index_get_lock(index));
	}

	switch (error) {
	case DB_SUCCESS:
		break;
	case DB_INDEX_CORRUPT:
		/* If one more block would reach srv_online_max_size,
		report the more specific error code instead. */
		if (((os_offset_t) index->online_log->tail.blocks + 1)
		    * srv_sort_buf_size >= srv_online_max_size) {
			/* The log file grew too big. */
			error = DB_ONLINE_LOG_TOO_BIG;
		}
		/* fall through */
	default:
		/* We set the flag directly instead of invoking
		dict_set_corrupted_index_cache_only(index) here,
		because the index is not "public" yet. */
		index->type |= DICT_CORRUPT;
	}

	mem_heap_free(heap);
	mem_heap_free(offsets_heap);
	row_log_block_free(index->online_log->head);
	ut_free(offsets);
	return(error);
}
3903
3904/** Apply the row log to the index upon completing index creation.
3905@param[in] trx transaction (for checking if the operation was
3906interrupted)
3907@param[in,out] index secondary index
3908@param[in,out] table MySQL table (for reporting duplicates)
3909@param[in,out] stage performance schema accounting object, used by
3910ALTER TABLE. stage->begin_phase_log_index() will be called initially and then
3911stage->inc() will be called for each block of log that is applied.
3912@return DB_SUCCESS, or error code on failure */
3913dberr_t
3914row_log_apply(
3915 const trx_t* trx,
3916 dict_index_t* index,
3917 struct TABLE* table,
3918 ut_stage_alter_t* stage)
3919{
3920 dberr_t error;
3921 row_log_t* log;
3922 row_merge_dup_t dup = { index, table, NULL, 0 };
3923 DBUG_ENTER("row_log_apply");
3924
3925 ut_ad(dict_index_is_online_ddl(index));
3926 ut_ad(!dict_index_is_clust(index));
3927
3928 stage->begin_phase_log_index();
3929
3930 log_free_check();
3931
3932 rw_lock_x_lock(dict_index_get_lock(index));
3933
3934 if (!dict_table_is_corrupted(index->table)) {
3935 error = row_log_apply_ops(trx, index, &dup, stage);
3936 } else {
3937 error = DB_SUCCESS;
3938 }
3939
3940 if (error != DB_SUCCESS) {
3941 ut_ad(index->table->space);
3942 /* We set the flag directly instead of invoking
3943 dict_set_corrupted_index_cache_only(index) here,
3944 because the index is not "public" yet. */
3945 index->type |= DICT_CORRUPT;
3946 index->table->drop_aborted = TRUE;
3947
3948 dict_index_set_online_status(index, ONLINE_INDEX_ABORTED);
3949 } else {
3950 ut_ad(dup.n_dup == 0);
3951 dict_index_set_online_status(index, ONLINE_INDEX_COMPLETE);
3952 }
3953
3954 log = index->online_log;
3955 index->online_log = NULL;
3956 rw_lock_x_unlock(dict_index_get_lock(index));
3957
3958 row_log_free(log);
3959
3960 DBUG_RETURN(error);
3961}
3962