1/*****************************************************************************
2
3Copyright (c) 2005, 2017, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2014, 2018, MariaDB Corporation.
5
6This program is free software; you can redistribute it and/or modify it under
7the terms of the GNU General Public License as published by the Free Software
8Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful, but WITHOUT
11ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License along with
15this program; if not, write to the Free Software Foundation, Inc.,
1651 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
17
18*****************************************************************************/
19
20/**************************************************//**
21@file row/row0merge.cc
22New index creation routines using a merge sort
23
24Created 12/4/2005 Jan Lindstrom
25Completed by Sunny Bains and Marko Makela
26*******************************************************/
27#include <my_global.h>
28#include <log.h>
29#include <sql_class.h>
30
31#include <math.h>
32
33#include "ha_prototypes.h"
34
35#include "row0merge.h"
36#include "row0ext.h"
37#include "row0log.h"
38#include "row0ins.h"
39#include "row0row.h"
40#include "row0sel.h"
41#include "log0crypt.h"
42#include "dict0crea.h"
43#include "trx0purge.h"
44#include "lock0lock.h"
45#include "pars0pars.h"
46#include "ut0sort.h"
47#include "row0ftsort.h"
48#include "row0import.h"
49#include "row0vers.h"
50#include "handler0alter.h"
51#include "btr0bulk.h"
52#include "fsp0sysspace.h"
53#include "ut0new.h"
54#include "ut0stage.h"
55#include "fil0crypt.h"
56
/** Compute the base-2 logarithm of a value.
@param n	value whose logarithm to take
@return log2(n), computed in double precision and rounded to float */
float my_log2f(float n)
{
	/* Change of base: log2(n) = ln(n) / ln(2). */
	const double	ln_n = log((double) n);
	const double	ln_2 = log((double) 2);

	return (float) (ln_n / ln_2);
}
62
/* Ignore posix_fadvise() on those platforms where it does not exist;
on Windows the macro expands to nothing, so call sites need no #ifdef */
#if defined _WIN32
# define posix_fadvise(fd, offset, len, advice) /* nothing */
#endif /* _WIN32 */

/* Whether to disable the file system cache for the merge-sort
temporary files (presumably driven by a server configuration
variable of the same name — confirm against srv0srv) */
char	srv_disable_sort_file_cache;
70
71/** Class that caches index row tuples made from a single cluster
72index page scan, and then insert into corresponding index tree */
73class index_tuple_info_t {
74public:
75 /** constructor
76 @param[in] heap memory heap
77 @param[in] index index to be created */
78 index_tuple_info_t(
79 mem_heap_t* heap,
80 dict_index_t* index) UNIV_NOTHROW
81 {
82 m_heap = heap;
83 m_index = index;
84 m_dtuple_vec = UT_NEW_NOKEY(idx_tuple_vec());
85 }
86
87 /** destructor */
88 ~index_tuple_info_t()
89 {
90 UT_DELETE(m_dtuple_vec);
91 }
92
93 /** Get the index object
94 @return the index object */
95 dict_index_t* get_index() UNIV_NOTHROW
96 {
97 return(m_index);
98 }
99
100 /** Caches an index row into index tuple vector
101 @param[in] row table row
102 @param[in] ext externally stored column
103 prefixes, or NULL */
104 void add(
105 const dtuple_t* row,
106 const row_ext_t* ext) UNIV_NOTHROW
107 {
108 dtuple_t* dtuple;
109
110 dtuple = row_build_index_entry(row, ext, m_index, m_heap);
111
112 ut_ad(dtuple);
113
114 m_dtuple_vec->push_back(dtuple);
115 }
116
117 /** Insert spatial index rows cached in vector into spatial index
118 @param[in] trx_id transaction id
119 @param[in,out] row_heap memory heap
120 @param[in] pcur cluster index scanning cursor
121 @param[in,out] scan_mtr mini-transaction for pcur
122 @param[out] mtr_committed whether scan_mtr got committed
123 @return DB_SUCCESS if successful, else error number */
124 dberr_t insert(
125 trx_id_t trx_id,
126 mem_heap_t* row_heap,
127 btr_pcur_t* pcur,
128 mtr_t* scan_mtr,
129 bool* mtr_committed)
130 {
131 big_rec_t* big_rec;
132 rec_t* rec;
133 btr_cur_t ins_cur;
134 mtr_t mtr;
135 rtr_info_t rtr_info;
136 ulint* ins_offsets = NULL;
137 dberr_t error = DB_SUCCESS;
138 dtuple_t* dtuple;
139 ulint count = 0;
140 const ulint flag = BTR_NO_UNDO_LOG_FLAG
141 | BTR_NO_LOCKING_FLAG
142 | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG;
143
144 ut_ad(dict_index_is_spatial(m_index));
145
146 DBUG_EXECUTE_IF("row_merge_instrument_log_check_flush",
147 log_sys.check_flush_or_checkpoint = true;
148 );
149
150 for (idx_tuple_vec::iterator it = m_dtuple_vec->begin();
151 it != m_dtuple_vec->end();
152 ++it) {
153 dtuple = *it;
154 ut_ad(dtuple);
155
156 if (log_sys.check_flush_or_checkpoint) {
157 if (!(*mtr_committed)) {
158 btr_pcur_move_to_prev_on_page(pcur);
159 btr_pcur_store_position(pcur, scan_mtr);
160 mtr_commit(scan_mtr);
161 *mtr_committed = true;
162 }
163
164 log_free_check();
165 }
166
167 mtr.start();
168 m_index->set_modified(mtr);
169
170 ins_cur.index = m_index;
171 rtr_init_rtr_info(&rtr_info, false, &ins_cur, m_index,
172 false);
173 rtr_info_update_btr(&ins_cur, &rtr_info);
174
175 btr_cur_search_to_nth_level(m_index, 0, dtuple,
176 PAGE_CUR_RTREE_INSERT,
177 BTR_MODIFY_LEAF, &ins_cur,
178 0, __FILE__, __LINE__,
179 &mtr);
180
181 /* It need to update MBR in parent entry,
182 so change search mode to BTR_MODIFY_TREE */
183 if (rtr_info.mbr_adj) {
184 mtr_commit(&mtr);
185 rtr_clean_rtr_info(&rtr_info, true);
186 rtr_init_rtr_info(&rtr_info, false, &ins_cur,
187 m_index, false);
188 rtr_info_update_btr(&ins_cur, &rtr_info);
189 mtr_start(&mtr);
190 m_index->set_modified(mtr);
191 btr_cur_search_to_nth_level(
192 m_index, 0, dtuple,
193 PAGE_CUR_RTREE_INSERT,
194 BTR_MODIFY_TREE, &ins_cur, 0,
195 __FILE__, __LINE__, &mtr);
196 }
197
198 error = btr_cur_optimistic_insert(
199 flag, &ins_cur, &ins_offsets, &row_heap,
200 dtuple, &rec, &big_rec, 0, NULL, &mtr);
201
202 if (error == DB_FAIL) {
203 ut_ad(!big_rec);
204 mtr.commit();
205 mtr.start();
206 m_index->set_modified(mtr);
207
208 rtr_clean_rtr_info(&rtr_info, true);
209 rtr_init_rtr_info(&rtr_info, false,
210 &ins_cur, m_index, false);
211
212 rtr_info_update_btr(&ins_cur, &rtr_info);
213 btr_cur_search_to_nth_level(
214 m_index, 0, dtuple,
215 PAGE_CUR_RTREE_INSERT,
216 BTR_MODIFY_TREE,
217 &ins_cur, 0,
218 __FILE__, __LINE__, &mtr);
219
220
221 error = btr_cur_pessimistic_insert(
222 flag, &ins_cur, &ins_offsets,
223 &row_heap, dtuple, &rec,
224 &big_rec, 0, NULL, &mtr);
225 }
226
227 DBUG_EXECUTE_IF(
228 "row_merge_ins_spatial_fail",
229 error = DB_FAIL;
230 );
231
232 if (error == DB_SUCCESS) {
233 if (rtr_info.mbr_adj) {
234 error = rtr_ins_enlarge_mbr(
235 &ins_cur, &mtr);
236 }
237
238 if (error == DB_SUCCESS) {
239 page_update_max_trx_id(
240 btr_cur_get_block(&ins_cur),
241 btr_cur_get_page_zip(&ins_cur),
242 trx_id, &mtr);
243 }
244 }
245
246 mtr_commit(&mtr);
247
248 rtr_clean_rtr_info(&rtr_info, true);
249 count++;
250 }
251
252 m_dtuple_vec->clear();
253
254 return(error);
255 }
256
257private:
258 /** Cache index rows made from a cluster index scan. Usually
259 for rows on single cluster index page */
260 typedef std::vector<dtuple_t*, ut_allocator<dtuple_t*> >
261 idx_tuple_vec;
262
263 /** vector used to cache index rows made from cluster index scan */
264 idx_tuple_vec* m_dtuple_vec;
265
266 /** the index being built */
267 dict_index_t* m_index;
268
269 /** memory heap for creating index tuples */
270 mem_heap_t* m_heap;
271};
272
273/* Maximum pending doc memory limit in bytes for a fts tokenization thread */
274#define FTS_PENDING_DOC_MEMORY_LIMIT 1000000
275
276/** Insert sorted data tuples to the index.
277@param[in] index index to be inserted
278@param[in] old_table old table
279@param[in] fd file descriptor
280@param[in,out] block file buffer
281@param[in] row_buf row_buf the sorted data tuples,
282or NULL if fd, block will be used instead
283@param[in,out] btr_bulk btr bulk instance
284@param[in,out] stage performance schema accounting object, used by
285ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
286and then stage->inc() will be called for each record that is processed.
287@return DB_SUCCESS or error number */
288static MY_ATTRIBUTE((warn_unused_result))
289dberr_t
290row_merge_insert_index_tuples(
291 dict_index_t* index,
292 const dict_table_t* old_table,
293 const pfs_os_file_t& fd,
294 row_merge_block_t* block,
295 const row_merge_buf_t* row_buf,
296 BtrBulk* btr_bulk,
297 const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
298 const double pct_progress, /*!< in: total progress
299 percent until now */
300 const double pct_cost, /*!< in: current progress percent
301 */
302 row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
303 ulint space, /*!< in: space id */
304 ut_stage_alter_t* stage = NULL);
305
306/******************************************************//**
307Encode an index record. */
308static MY_ATTRIBUTE((nonnull))
309void
310row_merge_buf_encode(
311/*=================*/
312 byte** b, /*!< in/out: pointer to
313 current end of output buffer */
314 const dict_index_t* index, /*!< in: index */
315 const mtuple_t* entry, /*!< in: index fields
316 of the record to encode */
317 ulint n_fields) /*!< in: number of fields
318 in the entry */
319{
320 ulint size;
321 ulint extra_size;
322
323 size = rec_get_converted_size_temp(
324 index, entry->fields, n_fields, &extra_size);
325 ut_ad(size >= extra_size);
326
327 /* Encode extra_size + 1 */
328 if (extra_size + 1 < 0x80) {
329 *(*b)++ = (byte) (extra_size + 1);
330 } else {
331 ut_ad((extra_size + 1) < 0x8000);
332 *(*b)++ = (byte) (0x80 | ((extra_size + 1) >> 8));
333 *(*b)++ = (byte) (extra_size + 1);
334 }
335
336 rec_convert_dtuple_to_temp(*b + extra_size, index,
337 entry->fields, n_fields);
338
339 *b += size;
340}
341
342/******************************************************//**
343Allocate a sort buffer.
344@return own: sort buffer */
345static MY_ATTRIBUTE((malloc, nonnull))
346row_merge_buf_t*
347row_merge_buf_create_low(
348/*=====================*/
349 mem_heap_t* heap, /*!< in: heap where allocated */
350 dict_index_t* index, /*!< in: secondary index */
351 ulint max_tuples, /*!< in: maximum number of
352 data tuples */
353 ulint buf_size) /*!< in: size of the buffer,
354 in bytes */
355{
356 row_merge_buf_t* buf;
357
358 ut_ad(max_tuples > 0);
359
360 ut_ad(max_tuples <= srv_sort_buf_size);
361
362 buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
363 buf->heap = heap;
364 buf->index = index;
365 buf->max_tuples = max_tuples;
366 buf->tuples = static_cast<mtuple_t*>(
367 ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
368 buf->tmp_tuples = buf->tuples + max_tuples;
369
370 return(buf);
371}
372
373/******************************************************//**
374Allocate a sort buffer.
375@return own: sort buffer */
376row_merge_buf_t*
377row_merge_buf_create(
378/*=================*/
379 dict_index_t* index) /*!< in: secondary index */
380{
381 row_merge_buf_t* buf;
382 ulint max_tuples;
383 ulint buf_size;
384 mem_heap_t* heap;
385
386 max_tuples = srv_sort_buf_size
387 / ut_max(static_cast<ulint>(1),
388 dict_index_get_min_size(index));
389
390 buf_size = (sizeof *buf);
391
392 heap = mem_heap_create(buf_size);
393
394 buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
395
396 return(buf);
397}
398
399/******************************************************//**
400Empty a sort buffer.
401@return sort buffer */
402row_merge_buf_t*
403row_merge_buf_empty(
404/*================*/
405 row_merge_buf_t* buf) /*!< in,own: sort buffer */
406{
407 ulint buf_size = sizeof *buf;
408 ulint max_tuples = buf->max_tuples;
409 mem_heap_t* heap = buf->heap;
410 dict_index_t* index = buf->index;
411 mtuple_t* tuples = buf->tuples;
412
413 mem_heap_empty(heap);
414
415 buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
416 buf->heap = heap;
417 buf->index = index;
418 buf->max_tuples = max_tuples;
419 buf->tuples = tuples;
420 buf->tmp_tuples = buf->tuples + max_tuples;
421
422 return(buf);
423}
424
425/******************************************************//**
426Deallocate a sort buffer. */
427void
428row_merge_buf_free(
429/*===============*/
430 row_merge_buf_t* buf) /*!< in,own: sort buffer to be freed */
431{
432 ut_free(buf->tuples);
433 mem_heap_free(buf->heap);
434}
435
/** Convert the field data from compact to redundant format.
@param[in]	row_field	field to copy from
@param[out]	field		field to copy to
@param[in]	len		target length of the field data in the
				redundant format; shorter source data
				is padded with 0x20 (space)
@param[in]	page_size	page size descriptor used when reading
				an externally stored column prefix
@param[in,out]	heap		memory heap where the converted data
				is allocated (dereferenced
				unconditionally; must not be NULL) */
static
void
row_merge_buf_redundant_convert(
	const dfield_t*		row_field,
	dfield_t*		field,
	ulint			len,
	const page_size_t&	page_size,
	mem_heap_t*		heap)
{
	/* Only variable-length character data needs this conversion. */
	ut_ad(field->type.mbminlen == 1);
	ut_ad(field->type.mbmaxlen > 1);

	byte*	buf = (byte*) mem_heap_alloc(heap, len);
	ulint	field_len = row_field->len;
	ut_ad(field_len <= len);

	if (row_field->ext) {
		const byte*	field_data = static_cast<byte*>(
			dfield_get_data(row_field));
		ulint		ext_len;

		/* The in-record prefix must end in a non-zero
		BLOB reference. */
		ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
		ut_a(memcmp(field_data + field_len - BTR_EXTERN_FIELD_REF_SIZE,
			    field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));

		byte*	data = btr_copy_externally_stored_field(
			&ext_len, field_data, page_size, field_len, heap);

		ut_ad(ext_len < len);

		memcpy(buf, data, ext_len);
		field_len = ext_len;
	} else {
		memcpy(buf, row_field->data, field_len);
	}

	/* Space-pad to the full length: ROW_FORMAT=REDUNDANT CHAR
	columns are fixed-length. */
	memset(buf + field_len, 0x20, len - field_len);

	dfield_set_data(field, buf, len);
}
486
/** Insert a data tuple into a sort buffer.
@param[in,out]	buf		sort buffer
@param[in]	fts_index	fts index to be created
@param[in]	old_table	original table
@param[in]	new_table	new table
@param[in,out]	psort_info	parallel sort info
@param[in]	row		table row
@param[in]	ext		cache of externally stored
				column prefixes, or NULL
@param[in,out]	doc_id		Doc ID if we are creating
				FTS index
@param[in,out]	conv_heap	memory heap where to allocate data when
				converting to ROW_FORMAT=REDUNDANT, or NULL
				when not to invoke
				row_merge_buf_redundant_convert()
@param[in,out]	err		set if error occurs
@param[in,out]	v_heap		heap memory to process data for virtual column
@param[in,out]	my_table	mysql table object
@param[in]	trx		transaction object
@return number of rows added, 0 if out of space */
static
ulint
row_merge_buf_add(
	row_merge_buf_t*	buf,
	dict_index_t*		fts_index,
	const dict_table_t*	old_table,
	const dict_table_t*	new_table,
	fts_psort_t*		psort_info,
	const dtuple_t*		row,
	const row_ext_t*	ext,
	doc_id_t*		doc_id,
	mem_heap_t*		conv_heap,
	dberr_t*		err,
	mem_heap_t**		v_heap,
	TABLE*			my_table,
	trx_t*			trx)
{
	ulint			i;
	const dict_index_t*	index;
	mtuple_t*		entry;
	dfield_t*		field;
	const dict_field_t*	ifield;
	ulint			n_fields;
	ulint			data_size;
	ulint			extra_size;
	ulint			bucket = 0;
	doc_id_t		write_doc_id;
	ulint			n_row_added = 0;
	DBUG_ENTER("row_merge_buf_add");

	/* Out of tuple slots: caller must flush the buffer first. */
	if (buf->n_tuples >= buf->max_tuples) {
		DBUG_RETURN(0);
	}

	DBUG_EXECUTE_IF(
		"ib_row_merge_buf_add_two",
		if (buf->n_tuples >= 2) DBUG_RETURN(0););

	UNIV_PREFETCH_R(row->fields);

	/* If we are building FTS index, buf->index points to
	the 'fts_sort_idx', and real FTS index is stored in
	fts_index */
	index = (buf->index->type & DICT_FTS) ? fts_index : buf->index;

	/* create spatial index should not come here */
	ut_ad(!dict_index_is_spatial(index));

	n_fields = dict_index_get_n_fields(index);

	entry = &buf->tuples[buf->n_tuples];
	field = entry->fields = static_cast<dfield_t*>(
		mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));

	data_size = 0;
	/* Start with the null-flag bitmap: one bit per nullable column. */
	extra_size = UT_BITS_IN_BYTES(unsigned(index->n_nullable));

	ifield = dict_index_get_nth_field(index, 0);

	for (i = 0; i < n_fields; i++, field++, ifield++) {
		ulint			len;
		const dict_col_t*	col;
		ulint			col_no;
		ulint			fixed_len;
		const dfield_t*		row_field;

		col = ifield->col;
		const dict_v_col_t*	v_col = NULL;
		if (col->is_virtual()) {
			v_col = reinterpret_cast<const dict_v_col_t*>(col);
		}

		col_no = dict_col_get_no(col);

		/* Process the Doc ID column */
		if (*doc_id > 0
		    && col_no == index->table->fts->doc_col
		    && !col->is_virtual()) {
			fts_write_doc_id((byte*) &write_doc_id, *doc_id);

			/* Note: field->data now points to a value on the
			stack: &write_doc_id after dfield_set_data(). Because
			there is only one doc_id per row, it shouldn't matter.
			We allocate a new buffer before we leave the function
			later below. */

			dfield_set_data(
				field, &write_doc_id, sizeof(write_doc_id));

			field->type.mtype = ifield->col->mtype;
			field->type.prtype = ifield->col->prtype;
			field->type.mbminlen = 0;
			field->type.mbmaxlen = 0;
			field->type.len = ifield->col->len;
		} else {
			/* Use callback to get the virtual column value */
			if (col->is_virtual()) {
				dict_index_t*	clust_index
					= dict_table_get_first_index(new_table);

				row_field = innobase_get_computed_value(
					row, v_col, clust_index,
					v_heap, NULL, ifield, trx->mysql_thd,
					my_table, old_table, NULL, NULL);

				if (row_field == NULL) {
					*err = DB_COMPUTE_VALUE_FAILED;
					DBUG_RETURN(0);
				}
				dfield_copy(field, row_field);
			} else {
				row_field = dtuple_get_nth_field(row, col_no);
				dfield_copy(field, row_field);
			}


			/* Tokenize and process data for FTS */
			if (index->type & DICT_FTS) {
				fts_doc_item_t*	doc_item;
				byte*		value;
				void*		ptr;
				const ulint	max_trial_count = 10000;
				ulint		trial_count = 0;

				/* fetch Doc ID if it already exists
				in the row, and not supplied by the
				caller. Even if the value column is
				NULL, we still need to get the Doc
				ID so to maintain the correct max
				Doc ID */
				if (*doc_id == 0) {
					const dfield_t*	doc_field;
					doc_field = dtuple_get_nth_field(
						row,
						index->table->fts->doc_col);
					*doc_id = (doc_id_t) mach_read_from_8(
						static_cast<byte*>(
						dfield_get_data(doc_field)));

					if (*doc_id == 0) {
						ib::warn() << "FTS Doc ID is"
							" zero. Record"
							" skipped";
						DBUG_RETURN(0);
					}
				}

				if (dfield_is_null(field)) {
					n_row_added = 1;
					continue;
				}

				/* Copy the field value into one
				allocation together with its queue
				item; the parallel sort thread frees
				it after tokenizing. */
				ptr = ut_malloc_nokey(sizeof(*doc_item)
						      + field->len);

				doc_item = static_cast<fts_doc_item_t*>(ptr);
				value = static_cast<byte*>(ptr)
					+ sizeof(*doc_item);
				memcpy(value, field->data, field->len);
				field->data = value;

				doc_item->field = field;
				doc_item->doc_id = *doc_id;

				bucket = *doc_id % fts_sort_pll_degree;

				/* Add doc item to fts_doc_list */
				mutex_enter(&psort_info[bucket].mutex);

				if (psort_info[bucket].error == DB_SUCCESS) {
					UT_LIST_ADD_LAST(
						psort_info[bucket].fts_doc_list,
						doc_item);
					psort_info[bucket].memory_used +=
						sizeof(*doc_item) + field->len;
				} else {
					ut_free(doc_item);
				}

				mutex_exit(&psort_info[bucket].mutex);

				/* Sleep when memory used exceeds limit*/
				while (psort_info[bucket].memory_used
				       > FTS_PENDING_DOC_MEMORY_LIMIT
				       && trial_count++ < max_trial_count) {
					os_thread_sleep(1000);
				}

				n_row_added = 1;
				continue;
			}

			/* CHAR value whose stored length differs from
			the column length: pad it back to fixed length
			when rebuilding to ROW_FORMAT=REDUNDANT. */
			if (field->len != UNIV_SQL_NULL
			    && col->mtype == DATA_MYSQL
			    && col->len != field->len) {
				if (conv_heap != NULL) {
					row_merge_buf_redundant_convert(
						row_field, field, col->len,
						dict_table_page_size(old_table),
						conv_heap);
				} else {
					/* Field length mismatch should not
					happen when rebuilding redundant row
					format table. */
					ut_ad(dict_table_is_comp(index->table));
				}
			}
		}

		len = dfield_get_len(field);

		if (dfield_is_null(field)) {
			ut_ad(!(col->prtype & DATA_NOT_NULL));
			continue;
		} else if (!ext) {
		} else if (dict_index_is_clust(index)) {
			/* Flag externally stored fields. */
			const byte*	buf = row_ext_lookup(ext, col_no,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				if (i < dict_index_get_n_unique(index)) {
					dfield_set_data(field, buf, len);
				} else {
					dfield_set_ext(field);
					len = dfield_get_len(field);
				}
			}
		} else if (!col->is_virtual()) {
			/* Only non-virtual column are stored externally */
			const byte*	buf = row_ext_lookup(ext, col_no,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				dfield_set_data(field, buf, len);
			}
		}

		/* If a column prefix index, take only the prefix */

		if (ifield->prefix_len) {
			len = dtype_get_at_most_n_mbchars(
				col->prtype,
				col->mbminlen, col->mbmaxlen,
				ifield->prefix_len,
				len,
				static_cast<char*>(dfield_get_data(field)));
			dfield_set_len(field, len);
		}

		ut_ad(len <= col->len
		      || DATA_LARGE_MTYPE(col->mtype));

		fixed_len = ifield->fixed_len;
		if (fixed_len && !dict_table_is_comp(index->table)
		    && col->mbminlen != col->mbmaxlen) {
			/* CHAR in ROW_FORMAT=REDUNDANT is always
			fixed-length, but in the temporary file it is
			variable-length for variable-length character
			sets. */
			fixed_len = 0;
		}

		/* Account for the length bytes in the temporary-file
		record header (see row_merge_buf_encode()). */
		if (fixed_len) {
#ifdef UNIV_DEBUG
			/* len should be between size calculated based on
			mbmaxlen and mbminlen */
			ut_ad(len <= fixed_len);
			ut_ad(!col->mbmaxlen || len >= col->mbminlen
			      * (fixed_len / col->mbmaxlen));

			ut_ad(!dfield_is_ext(field));
#endif /* UNIV_DEBUG */
		} else if (dfield_is_ext(field)) {
			extra_size += 2;
		} else if (len < 128
			   || (!DATA_BIG_COL(col))) {
			extra_size++;
		} else {
			/* For variable-length columns, we look up the
			maximum length from the column itself. If this
			is a prefix index column shorter than 256 bytes,
			this will waste one byte. */
			extra_size += 2;
		}
		data_size += len;
	}

	/* If this is FTS index, we already populated the sort buffer, return
	here */
	if (index->type & DICT_FTS) {
		DBUG_RETURN(n_row_added);
	}

#ifdef UNIV_DEBUG
	{
		ulint	size;
		ulint	extra;

		size = rec_get_converted_size_temp(
			index, entry->fields, n_fields, &extra);

		ut_ad(data_size + extra_size == size);
		ut_ad(extra_size == extra);
	}
#endif /* UNIV_DEBUG */

	/* Add to the total size of the record in row_merge_block_t
	the encoded length of extra_size and the extra bytes (extra_size).
	See row_merge_buf_write() for the variable-length encoding
	of extra_size. */
	data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);

	/* Record size can exceed page size while converting to
	redundant row format. But there is assert
	ut_ad(size < srv_page_size) in rec_offs_data_size().
	It may hit the assert before attempting to insert the row. */
	if (conv_heap != NULL && data_size > srv_page_size) {
		*err = DB_TOO_BIG_RECORD;
	}

	ut_ad(data_size < srv_sort_buf_size);

	/* Reserve bytes for the end marker of row_merge_block_t. */
	if (buf->total_size + data_size >= srv_sort_buf_size) {
		DBUG_RETURN(0);
	}

	buf->total_size += data_size;
	buf->n_tuples++;
	n_row_added++;

	field = entry->fields;

	/* Copy the data fields. */

	do {
		dfield_dup(field++, buf->heap);
	} while (--n_fields);

	if (conv_heap != NULL) {
		mem_heap_empty(conv_heap);
	}

	DBUG_RETURN(n_row_added);
}
853
854/*************************************************************//**
855Report a duplicate key. */
856void
857row_merge_dup_report(
858/*=================*/
859 row_merge_dup_t* dup, /*!< in/out: for reporting duplicates */
860 const dfield_t* entry) /*!< in: duplicate index entry */
861{
862 if (!dup->n_dup++) {
863 /* Only report the first duplicate record,
864 but count all duplicate records. */
865 innobase_fields_to_mysql(dup->table, dup->index, entry);
866 }
867}
868
869/*************************************************************//**
870Compare two tuples.
871@return positive, 0, negative if a is greater, equal, less, than b,
872respectively */
873static MY_ATTRIBUTE((warn_unused_result))
874int
875row_merge_tuple_cmp(
876/*================*/
877 ulint n_uniq, /*!< in: number of unique fields */
878 ulint n_field,/*!< in: number of fields */
879 const mtuple_t& a, /*!< in: first tuple to be compared */
880 const mtuple_t& b, /*!< in: second tuple to be compared */
881 row_merge_dup_t* dup) /*!< in/out: for reporting duplicates,
882 NULL if non-unique index */
883{
884 int cmp;
885 const dfield_t* af = a.fields;
886 const dfield_t* bf = b.fields;
887 ulint n = n_uniq;
888
889 ut_ad(n_uniq > 0);
890 ut_ad(n_uniq <= n_field);
891
892 /* Compare the fields of the tuples until a difference is
893 found or we run out of fields to compare. If !cmp at the
894 end, the tuples are equal. */
895 do {
896 cmp = cmp_dfield_dfield(af++, bf++);
897 } while (!cmp && --n);
898
899 if (cmp) {
900 return(cmp);
901 }
902
903 if (dup) {
904 /* Report a duplicate value error if the tuples are
905 logically equal. NULL columns are logically inequal,
906 although they are equal in the sorting order. Find
907 out if any of the fields are NULL. */
908 for (const dfield_t* df = a.fields; df != af; df++) {
909 if (dfield_is_null(df)) {
910 goto no_report;
911 }
912 }
913
914 row_merge_dup_report(dup, a.fields);
915 }
916
917no_report:
918 /* The n_uniq fields were equal, but we compare all fields so
919 that we will get the same (internal) order as in the B-tree. */
920 for (n = n_field - n_uniq + 1; --n; ) {
921 cmp = cmp_dfield_dfield(af++, bf++);
922 if (cmp) {
923 return(cmp);
924 }
925 }
926
927 /* This should never be reached, except in a secondary index
928 when creating a secondary index and a PRIMARY KEY, and there
929 is a duplicate in the PRIMARY KEY that has not been detected
930 yet. Internally, an index must never contain duplicates. */
931 return(cmp);
932}
933
934/** Wrapper for row_merge_tuple_sort() to inject some more context to
935UT_SORT_FUNCTION_BODY().
936@param tuples array of tuples that being sorted
937@param aux work area, same size as tuples[]
938@param low lower bound of the sorting area, inclusive
@param high upper bound of the sorting area, exclusive */
940#define row_merge_tuple_sort_ctx(tuples, aux, low, high) \
941 row_merge_tuple_sort(n_uniq, n_field, dup, tuples, aux, low, high)
942/** Wrapper for row_merge_tuple_cmp() to inject some more context to
943UT_SORT_FUNCTION_BODY().
944@param a first tuple to be compared
945@param b second tuple to be compared
946@return positive, 0, negative, if a is greater, equal, less, than b,
947respectively */
948#define row_merge_tuple_cmp_ctx(a,b) \
949 row_merge_tuple_cmp(n_uniq, n_field, a, b, dup)
950
951/**********************************************************************//**
952Merge sort the tuple buffer in main memory. */
953static
954void
955row_merge_tuple_sort(
956/*=================*/
957 ulint n_uniq, /*!< in: number of unique fields */
958 ulint n_field,/*!< in: number of fields */
959 row_merge_dup_t* dup, /*!< in/out: reporter of duplicates
960 (NULL if non-unique index) */
961 mtuple_t* tuples, /*!< in/out: tuples */
962 mtuple_t* aux, /*!< in/out: work area */
963 ulint low, /*!< in: lower bound of the
964 sorting area, inclusive */
965 ulint high) /*!< in: upper bound of the
966 sorting area, exclusive */
967{
968 ut_ad(n_field > 0);
969 ut_ad(n_uniq <= n_field);
970
971 UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
972 tuples, aux, low, high, row_merge_tuple_cmp_ctx);
973}
974
975/******************************************************//**
976Sort a buffer. */
977void
978row_merge_buf_sort(
979/*===============*/
980 row_merge_buf_t* buf, /*!< in/out: sort buffer */
981 row_merge_dup_t* dup) /*!< in/out: reporter of duplicates
982 (NULL if non-unique index) */
983{
984 ut_ad(!dict_index_is_spatial(buf->index));
985
986 row_merge_tuple_sort(dict_index_get_n_unique(buf->index),
987 dict_index_get_n_fields(buf->index),
988 dup,
989 buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
990}
991
992/******************************************************//**
993Write a buffer to a block. */
994void
995row_merge_buf_write(
996/*================*/
997 const row_merge_buf_t* buf, /*!< in: sorted buffer */
998 const merge_file_t* of UNIV_UNUSED,
999 /*!< in: output file */
1000 row_merge_block_t* block) /*!< out: buffer for writing to file */
1001{
1002 const dict_index_t* index = buf->index;
1003 ulint n_fields= dict_index_get_n_fields(index);
1004 byte* b = &block[0];
1005
1006 DBUG_ENTER("row_merge_buf_write");
1007
1008 for (ulint i = 0; i < buf->n_tuples; i++) {
1009 const mtuple_t* entry = &buf->tuples[i];
1010
1011 row_merge_buf_encode(&b, index, entry, n_fields);
1012 ut_ad(b < &block[srv_sort_buf_size]);
1013
1014 DBUG_LOG("ib_merge_sort",
1015 reinterpret_cast<const void*>(b) << ','
1016 << of->fd << ',' << of->offset << ' ' <<
1017 i << ": " <<
1018 rec_printer(entry->fields, n_fields).str());
1019 }
1020
1021 /* Write an "end-of-chunk" marker. */
1022 ut_a(b < &block[srv_sort_buf_size]);
1023 ut_a(b == &block[0] + buf->total_size);
1024 *b++ = 0;
1025#ifdef UNIV_DEBUG_VALGRIND
1026 /* The rest of the block is uninitialized. Initialize it
1027 to avoid bogus warnings. */
1028 memset(b, 0xff, &block[srv_sort_buf_size] - b);
1029#endif /* UNIV_DEBUG_VALGRIND */
1030 DBUG_LOG("ib_merge_sort",
1031 "write " << reinterpret_cast<const void*>(b) << ','
1032 << of->fd << ',' << of->offset << " EOF");
1033 DBUG_VOID_RETURN;
1034}
1035
1036/******************************************************//**
1037Create a memory heap and allocate space for row_merge_rec_offsets()
1038and mrec_buf_t[3].
1039@return memory heap */
1040static
1041mem_heap_t*
1042row_merge_heap_create(
1043/*==================*/
1044 const dict_index_t* index, /*!< in: record descriptor */
1045 mrec_buf_t** buf, /*!< out: 3 buffers */
1046 ulint** offsets1, /*!< out: offsets */
1047 ulint** offsets2) /*!< out: offsets */
1048{
1049 ulint i = 1 + REC_OFFS_HEADER_SIZE
1050 + dict_index_get_n_fields(index);
1051 mem_heap_t* heap = mem_heap_create(2 * i * sizeof **offsets1
1052 + 3 * sizeof **buf);
1053
1054 *buf = static_cast<mrec_buf_t*>(
1055 mem_heap_alloc(heap, 3 * sizeof **buf));
1056 *offsets1 = static_cast<ulint*>(
1057 mem_heap_alloc(heap, i * sizeof **offsets1));
1058 *offsets2 = static_cast<ulint*>(
1059 mem_heap_alloc(heap, i * sizeof **offsets2));
1060
1061 (*offsets1)[0] = (*offsets2)[0] = i;
1062 (*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
1063
1064 return(heap);
1065}
1066
1067/** Read a merge block from the file system.
1068@return whether the request was completed successfully */
1069bool
1070row_merge_read(
1071/*===========*/
1072 const pfs_os_file_t& fd, /*!< in: file descriptor */
1073 ulint offset, /*!< in: offset where to read
1074 in number of row_merge_block_t
1075 elements */
1076 row_merge_block_t* buf, /*!< out: data */
1077 row_merge_block_t* crypt_buf, /*!< in: crypt buf or NULL */
1078 ulint space) /*!< in: space id */
1079{
1080 os_offset_t ofs = ((os_offset_t) offset) * srv_sort_buf_size;
1081
1082 DBUG_ENTER("row_merge_read");
1083 DBUG_LOG("ib_merge_sort", "fd=" << fd << " ofs=" << ofs);
1084 DBUG_EXECUTE_IF("row_merge_read_failure", DBUG_RETURN(FALSE););
1085
1086 IORequest request(IORequest::READ);
1087 const bool success = os_file_read_no_error_handling(
1088 request, fd, buf, ofs, srv_sort_buf_size, 0);
1089
1090 /* If encryption is enabled decrypt buffer */
1091 if (success && log_tmp_is_encrypted()) {
1092 if (!log_tmp_block_decrypt(buf, srv_sort_buf_size,
1093 crypt_buf, ofs, space)) {
1094 return (FALSE);
1095 }
1096
1097 srv_stats.n_merge_blocks_decrypted.inc();
1098 memcpy(buf, crypt_buf, srv_sort_buf_size);
1099 }
1100
1101#ifdef POSIX_FADV_DONTNEED
1102 /* Each block is read exactly once. Free up the file cache. */
1103 posix_fadvise(fd, ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
1104#endif /* POSIX_FADV_DONTNEED */
1105
1106 if (!success) {
1107 ib::error() << "Failed to read merge block at " << ofs;
1108 }
1109
1110 DBUG_RETURN(success);
1111}
1112
1113/********************************************************************//**
1114Write a merge block to the file system.
1115@return whether the request was completed successfully */
1116UNIV_INTERN
1117bool
1118row_merge_write(
1119/*============*/
1120 const pfs_os_file_t& fd, /*!< in: file descriptor */
1121 ulint offset, /*!< in: offset where to write,
1122 in number of row_merge_block_t elements */
1123 const void* buf, /*!< in: data */
1124 void* crypt_buf, /*!< in: crypt buf or NULL */
1125 ulint space) /*!< in: space id */
1126{
1127 size_t buf_len = srv_sort_buf_size;
1128 os_offset_t ofs = buf_len * (os_offset_t) offset;
1129 void* out_buf = (void *)buf;
1130
1131 DBUG_ENTER("row_merge_write");
1132 DBUG_LOG("ib_merge_sort", "fd=" << fd << " ofs=" << ofs);
1133 DBUG_EXECUTE_IF("row_merge_write_failure", DBUG_RETURN(FALSE););
1134
1135 /* For encrypted tables, encrypt data before writing */
1136 if (log_tmp_is_encrypted()) {
1137 if (!log_tmp_block_encrypt(static_cast<const byte*>(buf),
1138 buf_len,
1139 static_cast<byte*>(crypt_buf),
1140 ofs, space)) {
1141 return false;
1142 }
1143
1144 srv_stats.n_merge_blocks_encrypted.inc();
1145 out_buf = crypt_buf;
1146 }
1147
1148 IORequest request(IORequest::WRITE);
1149 const bool success = os_file_write(
1150 request, "(merge)", fd, out_buf, ofs, buf_len);
1151
1152#ifdef POSIX_FADV_DONTNEED
1153 /* The block will be needed on the next merge pass,
1154 but it can be evicted from the file cache meanwhile. */
1155 posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
1156#endif /* POSIX_FADV_DONTNEED */
1157
1158 DBUG_RETURN(success);
1159}
1160
1161/********************************************************************//**
1162Read a merge record.
1163@return pointer to next record, or NULL on I/O error or end of list */
1164const byte*
1165row_merge_read_rec(
1166/*===============*/
1167 row_merge_block_t* block, /*!< in/out: file buffer */
1168 mrec_buf_t* buf, /*!< in/out: secondary buffer */
1169 const byte* b, /*!< in: pointer to record */
1170 const dict_index_t* index, /*!< in: index of the record */
1171 const pfs_os_file_t& fd, /*!< in: file descriptor */
1172 ulint* foffs, /*!< in/out: file offset */
1173 const mrec_t** mrec, /*!< out: pointer to merge record,
1174 or NULL on end of list
1175 (non-NULL on I/O error) */
1176 ulint* offsets,/*!< out: offsets of mrec */
1177 row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
1178 ulint space) /*!< in: space id */
1179{
1180 ulint extra_size;
1181 ulint data_size;
1182 ulint avail_size;
1183
1184 ut_ad(b >= &block[0]);
1185 ut_ad(b < &block[srv_sort_buf_size]);
1186
1187 ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
1188 + dict_index_get_n_fields(index));
1189
1190 DBUG_ENTER("row_merge_read_rec");
1191
1192 extra_size = *b++;
1193
1194 if (UNIV_UNLIKELY(!extra_size)) {
1195 /* End of list */
1196 *mrec = NULL;
1197 DBUG_LOG("ib_merge_sort",
1198 "read " << reinterpret_cast<const void*>(b) << ',' <<
1199 reinterpret_cast<const void*>(block) << ',' <<
1200 fd << ',' << *foffs << " EOF");
1201 DBUG_RETURN(NULL);
1202 }
1203
1204 if (extra_size >= 0x80) {
1205 /* Read another byte of extra_size. */
1206
1207 if (UNIV_UNLIKELY(b >= &block[srv_sort_buf_size])) {
1208 if (!row_merge_read(fd, ++(*foffs), block,
1209 crypt_block,
1210 space)) {
1211err_exit:
1212 /* Signal I/O error. */
1213 *mrec = b;
1214 DBUG_RETURN(NULL);
1215 }
1216
1217 /* Wrap around to the beginning of the buffer. */
1218 b = &block[0];
1219 }
1220
1221 extra_size = (extra_size & 0x7f) << 8;
1222 extra_size |= *b++;
1223 }
1224
1225 /* Normalize extra_size. Above, value 0 signals "end of list". */
1226 extra_size--;
1227
1228 /* Read the extra bytes. */
1229
1230 if (UNIV_UNLIKELY(b + extra_size >= &block[srv_sort_buf_size])) {
1231 /* The record spans two blocks. Copy the entire record
1232 to the auxiliary buffer and handle this as a special
1233 case. */
1234
1235 avail_size = ulint(&block[srv_sort_buf_size] - b);
1236 ut_ad(avail_size < sizeof *buf);
1237 memcpy(*buf, b, avail_size);
1238
1239 if (!row_merge_read(fd, ++(*foffs), block,
1240 crypt_block,
1241 space)) {
1242
1243 goto err_exit;
1244 }
1245
1246 /* Wrap around to the beginning of the buffer. */
1247 b = &block[0];
1248
1249 /* Copy the record. */
1250 memcpy(*buf + avail_size, b, extra_size - avail_size);
1251 b += extra_size - avail_size;
1252
1253 *mrec = *buf + extra_size;
1254
1255 rec_init_offsets_temp(*mrec, index, offsets);
1256
1257 data_size = rec_offs_data_size(offsets);
1258
1259 /* These overflows should be impossible given that
1260 records are much smaller than either buffer, and
1261 the record starts near the beginning of each buffer. */
1262 ut_a(extra_size + data_size < sizeof *buf);
1263 ut_a(b + data_size < &block[srv_sort_buf_size]);
1264
1265 /* Copy the data bytes. */
1266 memcpy(*buf + extra_size, b, data_size);
1267 b += data_size;
1268
1269 goto func_exit;
1270 }
1271
1272 *mrec = b + extra_size;
1273
1274 rec_init_offsets_temp(*mrec, index, offsets);
1275
1276 data_size = rec_offs_data_size(offsets);
1277 ut_ad(extra_size + data_size < sizeof *buf);
1278
1279 b += extra_size + data_size;
1280
1281 if (UNIV_LIKELY(b < &block[srv_sort_buf_size])) {
1282 /* The record fits entirely in the block.
1283 This is the normal case. */
1284 goto func_exit;
1285 }
1286
1287 /* The record spans two blocks. Copy it to buf. */
1288
1289 b -= extra_size + data_size;
1290 avail_size = ulint(&block[srv_sort_buf_size] - b);
1291 memcpy(*buf, b, avail_size);
1292 *mrec = *buf + extra_size;
1293
1294 /* We cannot invoke rec_offs_make_valid() here, because there
1295 are no REC_N_NEW_EXTRA_BYTES between extra_size and data_size.
1296 Similarly, rec_offs_validate() would fail, because it invokes
1297 rec_get_status(). */
1298 ut_d(offsets[2] = (ulint) *mrec);
1299 ut_d(offsets[3] = (ulint) index);
1300
1301 if (!row_merge_read(fd, ++(*foffs), block,
1302 crypt_block,
1303 space)) {
1304
1305 goto err_exit;
1306 }
1307
1308 /* Wrap around to the beginning of the buffer. */
1309 b = &block[0];
1310
1311 /* Copy the rest of the record. */
1312 memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
1313 b += extra_size + data_size - avail_size;
1314
1315func_exit:
1316 DBUG_LOG("ib_merge_sort",
1317 reinterpret_cast<const void*>(b) << ',' <<
1318 reinterpret_cast<const void*>(block)
1319 << ",fd=" << fd << ',' << *foffs << ": "
1320 << rec_printer(*mrec, 0, offsets).str());
1321 DBUG_RETURN(b);
1322}
1323
1324/********************************************************************//**
1325Write a merge record. */
1326static
1327void
1328row_merge_write_rec_low(
1329/*====================*/
1330 byte* b, /*!< out: buffer */
1331 ulint e, /*!< in: encoded extra_size */
1332#ifndef DBUG_OFF
1333 ulint size, /*!< in: total size to write */
1334 const pfs_os_file_t& fd, /*!< in: file descriptor */
1335 ulint foffs, /*!< in: file offset */
1336#endif /* !DBUG_OFF */
1337 const mrec_t* mrec, /*!< in: record to write */
1338 const ulint* offsets)/*!< in: offsets of mrec */
1339#ifdef DBUG_OFF
1340# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets) \
1341 row_merge_write_rec_low(b, e, mrec, offsets)
1342#endif /* DBUG_OFF */
1343{
1344 DBUG_ENTER("row_merge_write_rec_low");
1345
1346#ifndef DBUG_OFF
1347 const byte* const end = b + size;
1348#endif /* DBUG_OFF */
1349 DBUG_ASSERT(e == rec_offs_extra_size(offsets) + 1);
1350
1351 DBUG_LOG("ib_merge_sort",
1352 reinterpret_cast<const void*>(b) << ",fd=" << fd << ','
1353 << foffs << ": " << rec_printer(mrec, 0, offsets).str());
1354
1355 if (e < 0x80) {
1356 *b++ = (byte) e;
1357 } else {
1358 *b++ = (byte) (0x80 | (e >> 8));
1359 *b++ = (byte) e;
1360 }
1361
1362 memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
1363 DBUG_SLOW_ASSERT(b + rec_offs_size(offsets) == end);
1364 DBUG_VOID_RETURN;
1365}
1366
1367/********************************************************************//**
1368Write a merge record.
1369@return pointer to end of block, or NULL on error */
1370static
1371byte*
1372row_merge_write_rec(
1373/*================*/
1374 row_merge_block_t* block, /*!< in/out: file buffer */
1375 mrec_buf_t* buf, /*!< in/out: secondary buffer */
1376 byte* b, /*!< in: pointer to end of block */
1377 const pfs_os_file_t& fd, /*!< in: file descriptor */
1378 ulint* foffs, /*!< in/out: file offset */
1379 const mrec_t* mrec, /*!< in: record to write */
1380 const ulint* offsets,/*!< in: offsets of mrec */
1381 row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
1382 ulint space) /*!< in: space id */
1383{
1384 ulint extra_size;
1385 ulint size;
1386 ulint avail_size;
1387
1388 ut_ad(block);
1389 ut_ad(buf);
1390 ut_ad(b >= &block[0]);
1391 ut_ad(b < &block[srv_sort_buf_size]);
1392 ut_ad(mrec);
1393 ut_ad(foffs);
1394 ut_ad(mrec < &block[0] || mrec > &block[srv_sort_buf_size]);
1395 ut_ad(mrec < buf[0] || mrec > buf[1]);
1396
1397 /* Normalize extra_size. Value 0 signals "end of list". */
1398 extra_size = rec_offs_extra_size(offsets) + 1;
1399
1400 size = extra_size + (extra_size >= 0x80)
1401 + rec_offs_data_size(offsets);
1402
1403 if (UNIV_UNLIKELY(b + size >= &block[srv_sort_buf_size])) {
1404 /* The record spans two blocks.
1405 Copy it to the temporary buffer first. */
1406 avail_size = ulint(&block[srv_sort_buf_size] - b);
1407
1408 row_merge_write_rec_low(buf[0],
1409 extra_size, size, fd, *foffs,
1410 mrec, offsets);
1411
1412 /* Copy the head of the temporary buffer, write
1413 the completed block, and copy the tail of the
1414 record to the head of the new block. */
1415 memcpy(b, buf[0], avail_size);
1416
1417 if (!row_merge_write(fd, (*foffs)++, block,
1418 crypt_block,
1419 space)) {
1420 return(NULL);
1421 }
1422
1423 UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);
1424
1425 /* Copy the rest. */
1426 b = &block[0];
1427 memcpy(b, buf[0] + avail_size, size - avail_size);
1428 b += size - avail_size;
1429 } else {
1430 row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
1431 mrec, offsets);
1432 b += size;
1433 }
1434
1435 return(b);
1436}
1437
1438/********************************************************************//**
1439Write an end-of-list marker.
1440@return pointer to end of block, or NULL on error */
1441static
1442byte*
1443row_merge_write_eof(
1444/*================*/
1445 row_merge_block_t* block, /*!< in/out: file buffer */
1446 byte* b, /*!< in: pointer to end of block */
1447 const pfs_os_file_t& fd, /*!< in: file descriptor */
1448 ulint* foffs, /*!< in/out: file offset */
1449 row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
1450 ulint space) /*!< in: space id */
1451{
1452 ut_ad(block);
1453 ut_ad(b >= &block[0]);
1454 ut_ad(b < &block[srv_sort_buf_size]);
1455 ut_ad(foffs);
1456
1457 DBUG_ENTER("row_merge_write_eof");
1458 DBUG_LOG("ib_merge_sort",
1459 reinterpret_cast<const void*>(b) << ',' <<
1460 reinterpret_cast<const void*>(block) <<
1461 ",fd=" << fd << ',' << *foffs);
1462
1463 *b++ = 0;
1464 UNIV_MEM_ASSERT_RW(&block[0], b - &block[0]);
1465 UNIV_MEM_ASSERT_W(&block[0], srv_sort_buf_size);
1466
1467#ifdef UNIV_DEBUG_VALGRIND
1468 /* The rest of the block is uninitialized. Initialize it
1469 to avoid bogus warnings. */
1470 memset(b, 0xff, ulint(&block[srv_sort_buf_size] - b));
1471#endif /* UNIV_DEBUG_VALGRIND */
1472
1473 if (!row_merge_write(fd, (*foffs)++, block, crypt_block, space)) {
1474 DBUG_RETURN(NULL);
1475 }
1476
1477 UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);
1478 DBUG_RETURN(&block[0]);
1479}
1480
1481/** Create a temporary file if it has not been created already.
1482@param[in,out] tmpfd temporary file handle
1483@param[in] path location for creating temporary file
1484@return true on success, false on error */
1485static MY_ATTRIBUTE((warn_unused_result))
1486bool
1487row_merge_tmpfile_if_needed(
1488 pfs_os_file_t* tmpfd,
1489 const char* path)
1490{
1491 if (*tmpfd == OS_FILE_CLOSED) {
1492 *tmpfd = row_merge_file_create_low(path);
1493 if (*tmpfd != OS_FILE_CLOSED) {
1494 MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
1495 }
1496 }
1497
1498 return(*tmpfd != OS_FILE_CLOSED);
1499}
1500
1501/** Create a temporary file for merge sort if it was not created already.
1502@param[in,out] file merge file structure
1503@param[in] nrec number of records in the file
1504@param[in] path location for creating temporary file
1505@return true on success, false on error */
1506static MY_ATTRIBUTE((warn_unused_result))
1507bool
1508row_merge_file_create_if_needed(
1509 merge_file_t* file,
1510 pfs_os_file_t* tmpfd,
1511 ulint nrec,
1512 const char* path)
1513{
1514 ut_ad(file->fd == OS_FILE_CLOSED || *tmpfd != OS_FILE_CLOSED);
1515 if (file->fd == OS_FILE_CLOSED && row_merge_file_create(file, path)!= OS_FILE_CLOSED) {
1516 MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
1517 if (!row_merge_tmpfile_if_needed(tmpfd, path) ) {
1518 return(false);
1519 }
1520
1521 file->n_rec = nrec;
1522 }
1523
1524 ut_ad(file->fd == OS_FILE_CLOSED || *tmpfd != OS_FILE_CLOSED);
1525 return(file->fd != OS_FILE_CLOSED);
1526}
1527
1528/** Copy the merge data tuple from another merge data tuple.
1529@param[in] mtuple source merge data tuple
1530@param[in,out] prev_mtuple destination merge data tuple
1531@param[in] n_unique number of unique fields exist in the mtuple
1532@param[in,out] heap memory heap where last_mtuple allocated */
1533static
1534void
1535row_mtuple_create(
1536 const mtuple_t* mtuple,
1537 mtuple_t* prev_mtuple,
1538 ulint n_unique,
1539 mem_heap_t* heap)
1540{
1541 memcpy(prev_mtuple->fields, mtuple->fields,
1542 n_unique * sizeof *mtuple->fields);
1543
1544 dfield_t* field = prev_mtuple->fields;
1545
1546 for (ulint i = 0; i < n_unique; i++) {
1547 dfield_dup(field++, heap);
1548 }
1549}
1550
1551/** Compare two merge data tuples.
1552@param[in] prev_mtuple merge data tuple
1553@param[in] current_mtuple merge data tuple
1554@param[in,out] dup reporter of duplicates
1555@retval positive, 0, negative if current_mtuple is greater, equal, less, than
1556last_mtuple. */
1557static
1558int
1559row_mtuple_cmp(
1560 const mtuple_t* prev_mtuple,
1561 const mtuple_t* current_mtuple,
1562 row_merge_dup_t* dup)
1563{
1564 ut_ad(dict_index_is_clust(dup->index));
1565 const ulint n_unique = dict_index_get_n_unique(dup->index);
1566
1567 return(row_merge_tuple_cmp(
1568 n_unique, n_unique, *current_mtuple, *prev_mtuple, dup));
1569}
1570
1571/** Insert cached spatial index rows.
1572@param[in] trx_id transaction id
1573@param[in] sp_tuples cached spatial rows
1574@param[in] num_spatial number of spatial indexes
1575@param[in,out] row_heap heap for insert
1576@param[in,out] sp_heap heap for tuples
1577@param[in,out] pcur cluster index cursor
1578@param[in,out] mtr mini transaction
1579@param[in,out] mtr_committed whether scan_mtr got committed
1580@return DB_SUCCESS or error number */
1581static
1582dberr_t
1583row_merge_spatial_rows(
1584 trx_id_t trx_id,
1585 index_tuple_info_t** sp_tuples,
1586 ulint num_spatial,
1587 mem_heap_t* row_heap,
1588 mem_heap_t* sp_heap,
1589 btr_pcur_t* pcur,
1590 mtr_t* mtr,
1591 bool* mtr_committed)
1592{
1593 dberr_t err = DB_SUCCESS;
1594
1595 if (sp_tuples == NULL) {
1596 return(DB_SUCCESS);
1597 }
1598
1599 ut_ad(sp_heap != NULL);
1600
1601 for (ulint j = 0; j < num_spatial; j++) {
1602 err = sp_tuples[j]->insert(
1603 trx_id, row_heap,
1604 pcur, mtr, mtr_committed);
1605
1606 if (err != DB_SUCCESS) {
1607 return(err);
1608 }
1609 }
1610
1611 mem_heap_empty(sp_heap);
1612
1613 return(err);
1614}
1615
1616/** Check if the geometry field is valid.
1617@param[in] row the row
1618@param[in] index spatial index
1619@return true if it's valid, false if it's invalid. */
1620static
1621bool
1622row_geo_field_is_valid(
1623 const dtuple_t* row,
1624 dict_index_t* index)
1625{
1626 const dict_field_t* ind_field
1627 = dict_index_get_nth_field(index, 0);
1628 const dict_col_t* col
1629 = ind_field->col;
1630 ulint col_no
1631 = dict_col_get_no(col);
1632 const dfield_t* dfield
1633 = dtuple_get_nth_field(row, col_no);
1634
1635 if (dfield_is_null(dfield)
1636 || dfield_get_len(dfield) < GEO_DATA_HEADER_SIZE) {
1637 return(false);
1638 }
1639
1640 return(true);
1641}
1642
1643/** Reads clustered index of the table and create temporary files
1644containing the index entries for the indexes to be built.
1645@param[in] trx transaction
1646@param[in,out] table MySQL table object, for reporting erroneous
1647 records
1648@param[in] old_table table where rows are read from
1649@param[in] new_table table where indexes are created; identical to
1650 old_table unless creating a PRIMARY KEY
1651@param[in] online true if creating indexes online
1652@param[in] index indexes to be created
1653@param[in] fts_sort_idx full-text index to be created, or NULL
1654@param[in] psort_info parallel sort info for fts_sort_idx creation,
1655 or NULL
1656@param[in] files temporary files
1657@param[in] key_numbers MySQL key numbers to create
1658@param[in] n_index number of indexes to create
1659@param[in] defaults default values of added, changed columns, or NULL
1660@param[in] add_v newly added virtual columns along with indexes
1661@param[in] col_map mapping of old column numbers to new ones, or
1662NULL if old_table == new_table
1663@param[in] add_autoinc number of added AUTO_INCREMENT columns, or
1664ULINT_UNDEFINED if none is added
1665@param[in,out] sequence autoinc sequence
1666@param[in,out] block file buffer
1667@param[in] skip_pk_sort whether the new PRIMARY KEY will follow
1668existing order
1669@param[in,out] tmpfd temporary file handle
1670@param[in,out] stage performance schema accounting object, used by
1671ALTER TABLE. stage->n_pk_recs_inc() will be called for each record read and
1672stage->inc() will be called for each page read.
1673@param[in] pct_cost percent of task weight out of total alter job
1674@param[in,out] crypt_block crypted file buffer
1675@param[in] eval_table mysql table used to evaluate virtual column
1676 value, see innobase_get_computed_value().
1677@return DB_SUCCESS or error */
1678static MY_ATTRIBUTE((warn_unused_result))
1679dberr_t
1680row_merge_read_clustered_index(
1681 trx_t* trx,
1682 struct TABLE* table,
1683 const dict_table_t* old_table,
1684 const dict_table_t* new_table,
1685 bool online,
1686 dict_index_t** index,
1687 dict_index_t* fts_sort_idx,
1688 fts_psort_t* psort_info,
1689 merge_file_t* files,
1690 const ulint* key_numbers,
1691 ulint n_index,
1692 const dtuple_t* defaults,
1693 const dict_add_v_col_t* add_v,
1694 const ulint* col_map,
1695 ulint add_autoinc,
1696 ib_sequence_t& sequence,
1697 row_merge_block_t* block,
1698 bool skip_pk_sort,
1699 pfs_os_file_t* tmpfd,
1700 ut_stage_alter_t* stage,
1701 double pct_cost,
1702 row_merge_block_t* crypt_block,
1703 struct TABLE* eval_table,
1704 bool drop_historical)
1705{
1706 dict_index_t* clust_index; /* Clustered index */
1707 mem_heap_t* row_heap; /* Heap memory to create
1708 clustered index tuples */
1709 row_merge_buf_t** merge_buf; /* Temporary list for records*/
1710 mem_heap_t* v_heap = NULL; /* Heap memory to process large
1711 data for virtual column */
1712 btr_pcur_t pcur; /* Cursor on the clustered
1713 index */
1714 mtr_t mtr; /* Mini transaction */
1715 dberr_t err = DB_SUCCESS;/* Return code */
1716 ulint n_nonnull = 0; /* number of columns
1717 changed to NOT NULL */
1718 ulint* nonnull = NULL; /* NOT NULL columns */
1719 dict_index_t* fts_index = NULL;/* FTS index */
1720 doc_id_t doc_id = 0;
1721 doc_id_t max_doc_id = 0;
1722 ibool add_doc_id = FALSE;
1723 os_event_t fts_parallel_sort_event = NULL;
1724 ibool fts_pll_sort = FALSE;
1725 int64_t sig_count = 0;
1726 index_tuple_info_t** sp_tuples = NULL;
1727 mem_heap_t* sp_heap = NULL;
1728 ulint num_spatial = 0;
1729 BtrBulk* clust_btr_bulk = NULL;
1730 bool clust_temp_file = false;
1731 mem_heap_t* mtuple_heap = NULL;
1732 mtuple_t prev_mtuple;
1733 mem_heap_t* conv_heap = NULL;
1734 FlushObserver* observer = trx->flush_observer;
1735 double curr_progress = 0.0;
1736 ib_uint64_t read_rows = 0;
1737 ib_uint64_t table_total_rows = 0;
1738 char new_sys_trx_start[8];
1739 char new_sys_trx_end[8];
1740 byte any_autoinc_data[8] = {0};
1741 bool vers_update_trt = false;
1742
1743 DBUG_ENTER("row_merge_read_clustered_index");
1744
1745 ut_ad((old_table == new_table) == !col_map);
1746 ut_ad(!defaults || col_map);
1747 ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
1748 ut_ad(trx->id);
1749
1750 table_total_rows = dict_table_get_n_rows(old_table);
1751 if(table_total_rows == 0) {
1752 /* We don't know total row count */
1753 table_total_rows = 1;
1754 }
1755
1756 trx->op_info = "reading clustered index";
1757
1758#ifdef FTS_INTERNAL_DIAG_PRINT
1759 DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n");
1760#endif
1761
1762 /* Create and initialize memory for record buffers */
1763
1764 merge_buf = static_cast<row_merge_buf_t**>(
1765 ut_malloc_nokey(n_index * sizeof *merge_buf));
1766
1767 row_merge_dup_t clust_dup = {index[0], table, col_map, 0};
1768 dfield_t* prev_fields;
1769 const ulint n_uniq = dict_index_get_n_unique(index[0]);
1770
1771 ut_ad(trx->mysql_thd != NULL);
1772
1773 const char* path = thd_innodb_tmpdir(trx->mysql_thd);
1774
1775 ut_ad(!skip_pk_sort || dict_index_is_clust(index[0]));
1776 /* There is no previous tuple yet. */
1777 prev_mtuple.fields = NULL;
1778
1779 for (ulint i = 0; i < n_index; i++) {
1780 if (index[i]->type & DICT_FTS) {
1781
1782 /* We are building a FT index, make sure
1783 we have the temporary 'fts_sort_idx' */
1784 ut_a(fts_sort_idx);
1785
1786 fts_index = index[i];
1787
1788 merge_buf[i] = row_merge_buf_create(fts_sort_idx);
1789
1790 add_doc_id = DICT_TF2_FLAG_IS_SET(
1791 new_table, DICT_TF2_FTS_ADD_DOC_ID);
1792
1793 /* If Doc ID does not exist in the table itself,
1794 fetch the first FTS Doc ID */
1795 if (add_doc_id) {
1796 fts_get_next_doc_id(
1797 (dict_table_t*) new_table,
1798 &doc_id);
1799 ut_ad(doc_id > 0);
1800 }
1801
1802 fts_pll_sort = TRUE;
1803 row_fts_start_psort(psort_info);
1804 fts_parallel_sort_event =
1805 psort_info[0].psort_common->sort_event;
1806 } else {
1807 if (dict_index_is_spatial(index[i])) {
1808 num_spatial++;
1809 }
1810
1811 merge_buf[i] = row_merge_buf_create(index[i]);
1812 }
1813 }
1814
1815 if (num_spatial > 0) {
1816 ulint count = 0;
1817
1818 sp_heap = mem_heap_create(512);
1819
1820 sp_tuples = static_cast<index_tuple_info_t**>(
1821 ut_malloc_nokey(num_spatial
1822 * sizeof(*sp_tuples)));
1823
1824 for (ulint i = 0; i < n_index; i++) {
1825 if (dict_index_is_spatial(index[i])) {
1826 sp_tuples[count]
1827 = UT_NEW_NOKEY(
1828 index_tuple_info_t(
1829 sp_heap,
1830 index[i]));
1831 count++;
1832 }
1833 }
1834
1835 ut_ad(count == num_spatial);
1836 }
1837
1838 mtr_start(&mtr);
1839
1840 /* Find the clustered index and create a persistent cursor
1841 based on that. */
1842
1843 clust_index = dict_table_get_first_index(old_table);
1844 const ulint old_trx_id_col = DATA_TRX_ID - DATA_N_SYS_COLS
1845 + ulint(old_table->n_cols);
1846 ut_ad(old_table->cols[old_trx_id_col].mtype == DATA_SYS);
1847 ut_ad(old_table->cols[old_trx_id_col].prtype
1848 == (DATA_TRX_ID | DATA_NOT_NULL));
1849 ut_ad(old_table->cols[old_trx_id_col + 1].mtype == DATA_SYS);
1850 ut_ad(old_table->cols[old_trx_id_col + 1].prtype
1851 == (DATA_ROLL_PTR | DATA_NOT_NULL));
1852 const ulint new_trx_id_col = col_map
1853 ? col_map[old_trx_id_col] : old_trx_id_col;
1854
1855 btr_pcur_open_at_index_side(
1856 true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
1857 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1858 if (rec_is_default_row(btr_pcur_get_rec(&pcur), clust_index)) {
1859 ut_ad(btr_pcur_is_on_user_rec(&pcur));
1860 /* Skip the 'default row' pseudo-record. */
1861 } else {
1862 ut_ad(!clust_index->is_instant());
1863 btr_pcur_move_to_prev_on_page(&pcur);
1864 }
1865
1866 if (old_table != new_table) {
1867 /* The table is being rebuilt. Identify the columns
1868 that were flagged NOT NULL in the new table, so that
1869 we can quickly check that the records in the old table
1870 do not violate the added NOT NULL constraints. */
1871
1872 nonnull = static_cast<ulint*>(
1873 ut_malloc_nokey(dict_table_get_n_cols(new_table)
1874 * sizeof *nonnull));
1875
1876 for (ulint i = 0; i < dict_table_get_n_cols(old_table); i++) {
1877 if (dict_table_get_nth_col(old_table, i)->prtype
1878 & DATA_NOT_NULL) {
1879 continue;
1880 }
1881
1882 const ulint j = col_map[i];
1883
1884 if (j == ULINT_UNDEFINED) {
1885 /* The column was dropped. */
1886 continue;
1887 }
1888
1889 if (dict_table_get_nth_col(new_table, j)->prtype
1890 & DATA_NOT_NULL) {
1891 nonnull[n_nonnull++] = j;
1892 }
1893 }
1894
1895 if (!n_nonnull) {
1896 ut_free(nonnull);
1897 nonnull = NULL;
1898 }
1899 }
1900
1901 row_heap = mem_heap_create(sizeof(mrec_buf_t));
1902
1903 if (dict_table_is_comp(old_table)
1904 && !dict_table_is_comp(new_table)) {
1905 conv_heap = mem_heap_create(sizeof(mrec_buf_t));
1906 }
1907
1908 if (skip_pk_sort) {
1909 prev_fields = static_cast<dfield_t*>(
1910 ut_malloc_nokey(n_uniq * sizeof *prev_fields));
1911 mtuple_heap = mem_heap_create(sizeof(mrec_buf_t));
1912 } else {
1913 prev_fields = NULL;
1914 }
1915
1916 mach_write_to_8(new_sys_trx_start, trx->id);
1917 mach_write_to_8(new_sys_trx_end, TRX_ID_MAX);
1918
1919 /* Scan the clustered index. */
1920 for (;;) {
1921 const rec_t* rec;
1922 trx_id_t rec_trx_id;
1923 ulint* offsets;
1924 const dtuple_t* row;
1925 row_ext_t* ext;
1926 page_cur_t* cur = btr_pcur_get_page_cur(&pcur);
1927
1928 mem_heap_empty(row_heap);
1929
1930 /* Do not continue if table pages are still encrypted */
1931 if (!old_table->is_readable() ||
1932 !new_table->is_readable()) {
1933 err = DB_DECRYPTION_FAILED;
1934 trx->error_key_num = 0;
1935 goto func_exit;
1936 }
1937
1938 mem_heap_empty(row_heap);
1939
1940 page_cur_move_to_next(cur);
1941
1942 stage->n_pk_recs_inc();
1943
1944 if (page_cur_is_after_last(cur)) {
1945
1946 stage->inc();
1947
1948 if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
1949 err = DB_INTERRUPTED;
1950 trx->error_key_num = 0;
1951 goto func_exit;
1952 }
1953
1954 if (online && old_table != new_table) {
1955 err = row_log_table_get_error(clust_index);
1956 if (err != DB_SUCCESS) {
1957 trx->error_key_num = 0;
1958 goto func_exit;
1959 }
1960 }
1961
1962 /* Insert the cached spatial index rows. */
1963 bool mtr_committed = false;
1964
1965 err = row_merge_spatial_rows(
1966 trx->id, sp_tuples, num_spatial,
1967 row_heap, sp_heap, &pcur,
1968 &mtr, &mtr_committed);
1969
1970 if (err != DB_SUCCESS) {
1971 goto func_exit;
1972 }
1973
1974 if (mtr_committed) {
1975 goto scan_next;
1976 }
1977
1978 if (my_atomic_load32_explicit(&clust_index->lock.waiters,
1979 MY_MEMORY_ORDER_RELAXED)) {
1980 /* There are waiters on the clustered
1981 index tree lock, likely the purge
1982 thread. Store and restore the cursor
1983 position, and yield so that scanning a
1984 large table will not starve other
1985 threads. */
1986
1987 /* Store the cursor position on the last user
1988 record on the page. */
1989 btr_pcur_move_to_prev_on_page(&pcur);
1990 /* Leaf pages must never be empty, unless
1991 this is the only page in the index tree. */
1992 ut_ad(btr_pcur_is_on_user_rec(&pcur)
1993 || btr_pcur_get_block(
1994 &pcur)->page.id.page_no()
1995 == clust_index->page);
1996
1997 btr_pcur_store_position(&pcur, &mtr);
1998 mtr_commit(&mtr);
1999
2000 /* Give the waiters a chance to proceed. */
2001 os_thread_yield();
2002scan_next:
2003 mtr_start(&mtr);
2004 /* Restore position on the record, or its
2005 predecessor if the record was purged
2006 meanwhile. */
2007 btr_pcur_restore_position(
2008 BTR_SEARCH_LEAF, &pcur, &mtr);
2009 /* Move to the successor of the
2010 original record. */
2011 if (!btr_pcur_move_to_next_user_rec(
2012 &pcur, &mtr)) {
2013end_of_index:
2014 row = NULL;
2015 mtr_commit(&mtr);
2016 mem_heap_free(row_heap);
2017 ut_free(nonnull);
2018 goto write_buffers;
2019 }
2020 } else {
2021 ulint next_page_no;
2022 buf_block_t* block;
2023
2024 next_page_no = btr_page_get_next(
2025 page_cur_get_page(cur), &mtr);
2026
2027 if (next_page_no == FIL_NULL) {
2028 goto end_of_index;
2029 }
2030
2031 block = page_cur_get_block(cur);
2032 block = btr_block_get(
2033 page_id_t(block->page.id.space(),
2034 next_page_no),
2035 block->page.size,
2036 BTR_SEARCH_LEAF,
2037 clust_index, &mtr);
2038
2039 btr_leaf_page_release(page_cur_get_block(cur),
2040 BTR_SEARCH_LEAF, &mtr);
2041 page_cur_set_before_first(block, cur);
2042 page_cur_move_to_next(cur);
2043
2044 ut_ad(!page_cur_is_after_last(cur));
2045 }
2046 }
2047
2048 rec = page_cur_get_rec(cur);
2049
2050 if (online) {
2051 offsets = rec_get_offsets(rec, clust_index, NULL, true,
2052 ULINT_UNDEFINED, &row_heap);
2053 rec_trx_id = row_get_rec_trx_id(rec, clust_index,
2054 offsets);
2055
2056 /* Perform a REPEATABLE READ.
2057
2058 When rebuilding the table online,
2059 row_log_table_apply() must not see a newer
2060 state of the table when applying the log.
2061 This is mainly to prevent false duplicate key
2062 errors, because the log will identify records
2063 by the PRIMARY KEY, and also to prevent unsafe
2064 BLOB access.
2065
2066 When creating a secondary index online, this
2067 table scan must not see records that have only
2068 been inserted to the clustered index, but have
2069 not been written to the online_log of
2070 index[]. If we performed READ UNCOMMITTED, it
2071 could happen that the ADD INDEX reaches
2072 ONLINE_INDEX_COMPLETE state between the time
2073 the DML thread has updated the clustered index
2074 but has not yet accessed secondary index. */
2075 ut_ad(trx->read_view.is_open());
2076 ut_ad(rec_trx_id != trx->id);
2077
2078 if (!trx->read_view.changes_visible(
2079 rec_trx_id, old_table->name)) {
2080 rec_t* old_vers;
2081
2082 row_vers_build_for_consistent_read(
2083 rec, &mtr, clust_index, &offsets,
2084 &trx->read_view, &row_heap,
2085 row_heap, &old_vers, NULL);
2086
2087 if (!old_vers) {
2088 continue;
2089 }
2090
2091 /* The old version must necessarily be
2092 in the "prehistory", because the
2093 exclusive lock in
2094 ha_innobase::prepare_inplace_alter_table()
2095 forced the completion of any transactions
2096 that accessed this table. */
2097 ut_ad(row_get_rec_trx_id(old_vers, clust_index,
2098 offsets) < trx->id);
2099
2100 rec = old_vers;
2101 rec_trx_id = 0;
2102 }
2103
2104 if (rec_get_deleted_flag(
2105 rec,
2106 dict_table_is_comp(old_table))) {
2107 /* In delete-marked records, DB_TRX_ID must
2108 always refer to an existing undo log record.
2109 Above, we did reset rec_trx_id = 0
2110 for rec = old_vers.*/
2111 ut_ad(rec == page_cur_get_rec(cur)
2112 ? rec_trx_id
2113 : !rec_trx_id);
2114 /* This record was deleted in the latest
2115 committed version, or it was deleted and
2116 then reinserted-by-update before purge
2117 kicked in. Skip it. */
2118 continue;
2119 }
2120
2121 ut_ad(!rec_offs_any_null_extern(rec, offsets));
2122 } else if (rec_get_deleted_flag(
2123 rec, dict_table_is_comp(old_table))) {
2124 /* In delete-marked records, DB_TRX_ID must
2125 always refer to an existing undo log record. */
2126 ut_d(rec_trx_id = rec_get_trx_id(rec, clust_index));
2127 ut_ad(rec_trx_id);
2128 /* This must be a purgeable delete-marked record,
2129 and the transaction that delete-marked the record
2130 must have been committed before this
2131 !online ALTER TABLE transaction. */
2132 ut_ad(rec_trx_id < trx->id);
2133 /* Skip delete-marked records.
2134
2135 Skipping delete-marked records will make the
2136 created indexes unuseable for transactions
2137 whose read views were created before the index
2138 creation completed, but an attempt to preserve
2139 the history would make it tricky to detect
2140 duplicate keys. */
2141 continue;
2142 } else {
2143 offsets = rec_get_offsets(rec, clust_index, NULL, true,
2144 ULINT_UNDEFINED, &row_heap);
2145 /* This is a locking ALTER TABLE.
2146
2147 If we are not rebuilding the table, the
2148 DB_TRX_ID does not matter, as it is not being
2149 written to any secondary indexes; see
2150 if (old_table == new_table) below.
2151
2152 If we are rebuilding the table, the
2153 DB_TRX_ID,DB_ROLL_PTR should be reset, because
2154 there will be no history available. */
2155 ut_ad(rec_get_trx_id(rec, clust_index) < trx->id);
2156 rec_trx_id = 0;
2157 }
2158
2159 /* When !online, we are holding a lock on old_table, preventing
2160 any inserts that could have written a record 'stub' before
2161 writing out off-page columns. */
2162 ut_ad(!rec_offs_any_null_extern(rec, offsets));
2163
2164 /* Build a row based on the clustered index. */
2165
2166 row = row_build_w_add_vcol(ROW_COPY_POINTERS, clust_index,
2167 rec, offsets, new_table,
2168 defaults, add_v, col_map, &ext,
2169 row_heap);
2170 ut_ad(row);
2171
2172 for (ulint i = 0; i < n_nonnull; i++) {
2173 dfield_t* field = &row->fields[nonnull[i]];
2174
2175 ut_ad(dfield_get_type(field)->prtype & DATA_NOT_NULL);
2176
2177 if (dfield_is_null(field)) {
2178 const dfield_t& default_field
2179 = defaults->fields[nonnull[i]];
2180
2181 if (default_field.data == NULL) {
2182 err = DB_INVALID_NULL;
2183 trx->error_key_num = 0;
2184 goto func_exit;
2185 }
2186
2187 *field = default_field;
2188 }
2189 }
2190
2191 /* Get the next Doc ID */
2192 if (add_doc_id) {
2193 doc_id++;
2194 } else {
2195 doc_id = 0;
2196 }
2197
2198 ut_ad(row->fields[new_trx_id_col].type.mtype == DATA_SYS);
2199 ut_ad(row->fields[new_trx_id_col].type.prtype
2200 == (DATA_TRX_ID | DATA_NOT_NULL));
2201 ut_ad(row->fields[new_trx_id_col].len == DATA_TRX_ID_LEN);
2202 ut_ad(row->fields[new_trx_id_col + 1].type.mtype == DATA_SYS);
2203 ut_ad(row->fields[new_trx_id_col + 1].type.prtype
2204 == (DATA_ROLL_PTR | DATA_NOT_NULL));
2205 ut_ad(row->fields[new_trx_id_col + 1].len == DATA_ROLL_PTR_LEN);
2206
2207 if (old_table == new_table) {
2208 /* Do not bother touching DB_TRX_ID,DB_ROLL_PTR
2209 because they are not going to be written into
2210 secondary indexes. */
2211 } else if (rec_trx_id < trx->id) {
2212 /* Reset the DB_TRX_ID,DB_ROLL_PTR of old rows
2213 for which history is not going to be
2214 available after the rebuild operation.
2215 This essentially mimics row_purge_reset_trx_id(). */
2216 row->fields[new_trx_id_col].data
2217 = const_cast<byte*>(reset_trx_id);
2218 row->fields[new_trx_id_col + 1].data
2219 = const_cast<byte*>(reset_trx_id
2220 + DATA_TRX_ID_LEN);
2221 }
2222
2223 if (add_autoinc != ULINT_UNDEFINED) {
2224
2225 ut_ad(add_autoinc
2226 < dict_table_get_n_user_cols(new_table));
2227
2228 bool history_row = false;
2229 if (new_table->versioned()) {
2230 const dfield_t* dfield = dtuple_get_nth_field(
2231 row, new_table->vers_end);
2232 history_row = dfield->vers_history_row();
2233 }
2234
2235 dfield_t* dfield;
2236
2237 dfield = dtuple_get_nth_field(row, add_autoinc);
2238
2239 if (new_table->versioned()) {
2240 if (history_row) {
2241 if (dfield_get_type(dfield)->prtype & DATA_NOT_NULL) {
2242 err = DB_UNSUPPORTED;
2243 my_error(ER_UNSUPPORTED_EXTENSION, MYF(0),
2244 old_table->name.m_name);
2245 goto func_exit;
2246 }
2247 dfield_set_null(dfield);
2248 } else {
2249 // set not null
2250 ulint len = dfield_get_type(dfield)->len;
2251 dfield_set_data(dfield, any_autoinc_data, len);
2252 }
2253 }
2254
2255 if (dfield_is_null(dfield)) {
2256 goto write_buffers;
2257 }
2258
2259 const dtype_t* dtype = dfield_get_type(dfield);
2260 byte* b = static_cast<byte*>(dfield_get_data(dfield));
2261
2262 if (sequence.eof()) {
2263 err = DB_ERROR;
2264 trx->error_key_num = 0;
2265
2266 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
2267 ER_AUTOINC_READ_FAILED, "[NULL]");
2268
2269 goto func_exit;
2270 }
2271
2272 ulonglong value = sequence++;
2273
2274 switch (dtype_get_mtype(dtype)) {
2275 case DATA_INT: {
2276 ibool usign;
2277 ulint len = dfield_get_len(dfield);
2278
2279 usign = dtype_get_prtype(dtype) & DATA_UNSIGNED;
2280 mach_write_ulonglong(b, value, len, usign);
2281
2282 break;
2283 }
2284
2285 case DATA_FLOAT:
2286 mach_float_write(
2287 b, static_cast<float>(value));
2288 break;
2289
2290 case DATA_DOUBLE:
2291 mach_double_write(
2292 b, static_cast<double>(value));
2293 break;
2294
2295 default:
2296 ut_ad(0);
2297 }
2298 }
2299
2300 if (old_table->versioned()) {
2301 if ((!new_table->versioned() || drop_historical)
2302 && clust_index->vers_history_row(rec, offsets)) {
2303 continue;
2304 }
2305 } else if (new_table->versioned()) {
2306 dfield_t* start =
2307 dtuple_get_nth_field(row, new_table->vers_start);
2308 dfield_t* end =
2309 dtuple_get_nth_field(row, new_table->vers_end);
2310 dfield_set_data(start, new_sys_trx_start, 8);
2311 dfield_set_data(end, new_sys_trx_end, 8);
2312 vers_update_trt = true;
2313 }
2314
2315write_buffers:
2316 /* Build all entries for all the indexes to be created
2317 in a single scan of the clustered index. */
2318
2319 ulint s_idx_cnt = 0;
2320 bool skip_sort = skip_pk_sort
2321 && dict_index_is_clust(merge_buf[0]->index);
2322
2323 for (ulint i = 0; i < n_index; i++, skip_sort = false) {
2324 row_merge_buf_t* buf = merge_buf[i];
2325 merge_file_t* file = &files[i];
2326 ulint rows_added = 0;
2327
2328 if (dict_index_is_spatial(buf->index)) {
2329 if (!row) {
2330 continue;
2331 }
2332
2333 ut_ad(sp_tuples[s_idx_cnt]->get_index()
2334 == buf->index);
2335
2336 /* If the geometry field is invalid, report
2337 error. */
2338 if (!row_geo_field_is_valid(row, buf->index)) {
2339 err = DB_CANT_CREATE_GEOMETRY_OBJECT;
2340 break;
2341 }
2342
2343 sp_tuples[s_idx_cnt]->add(row, ext);
2344 s_idx_cnt++;
2345
2346 continue;
2347 }
2348
2349 ut_ad(!row
2350 || !dict_index_is_clust(buf->index)
2351 || trx_id_check(row->fields[new_trx_id_col].data,
2352 trx->id));
2353
2354 if (UNIV_LIKELY
2355 (row && (rows_added = row_merge_buf_add(
2356 buf, fts_index, old_table, new_table,
2357 psort_info, row, ext, &doc_id,
2358 conv_heap, &err,
2359 &v_heap, eval_table, trx)))) {
2360
2361 /* If we are creating FTS index,
2362 a single row can generate more
2363 records for tokenized word */
2364 file->n_rec += rows_added;
2365
2366 if (err != DB_SUCCESS) {
2367 ut_ad(err == DB_TOO_BIG_RECORD);
2368 break;
2369 }
2370
2371 if (doc_id > max_doc_id) {
2372 max_doc_id = doc_id;
2373 }
2374
2375 if (buf->index->type & DICT_FTS) {
2376 /* Check if error occurs in child thread */
2377 for (ulint j = 0;
2378 j < fts_sort_pll_degree; j++) {
2379 if (psort_info[j].error
2380 != DB_SUCCESS) {
2381 err = psort_info[j].error;
2382 trx->error_key_num = i;
2383 break;
2384 }
2385 }
2386
2387 if (err != DB_SUCCESS) {
2388 break;
2389 }
2390 }
2391
2392 if (skip_sort) {
2393 ut_ad(buf->n_tuples > 0);
2394 const mtuple_t* curr =
2395 &buf->tuples[buf->n_tuples - 1];
2396
2397 ut_ad(i == 0);
2398 ut_ad(dict_index_is_clust(merge_buf[0]->index));
2399 /* Detect duplicates by comparing the
2400 current record with previous record.
2401 When temp file is not used, records
2402 should be in sorted order. */
2403 if (prev_mtuple.fields != NULL
2404 && (row_mtuple_cmp(
2405 &prev_mtuple, curr,
2406 &clust_dup) == 0)) {
2407
2408 err = DB_DUPLICATE_KEY;
2409 trx->error_key_num
2410 = key_numbers[0];
2411 goto func_exit;
2412 }
2413
2414 prev_mtuple.fields = curr->fields;
2415 }
2416
2417 continue;
2418 }
2419
2420 if (err == DB_COMPUTE_VALUE_FAILED) {
2421 trx->error_key_num = i;
2422 goto func_exit;
2423 }
2424
2425 if (buf->index->type & DICT_FTS) {
2426 if (!row || !doc_id) {
2427 continue;
2428 }
2429 }
2430
2431 /* The buffer must be sufficiently large
2432 to hold at least one record. It may only
2433 be empty when we reach the end of the
2434 clustered index. row_merge_buf_add()
2435 must not have been called in this loop. */
2436 ut_ad(buf->n_tuples || row == NULL);
2437
2438 /* We have enough data tuples to form a block.
2439 Sort them and write to disk if temp file is used
2440 or insert into index if temp file is not used. */
2441 ut_ad(old_table == new_table
2442 ? !dict_index_is_clust(buf->index)
2443 : (i == 0) == dict_index_is_clust(buf->index));
2444
2445 /* We have enough data tuples to form a block.
2446 Sort them (if !skip_sort) and write to disk. */
2447
2448 if (buf->n_tuples) {
2449 if (skip_sort) {
2450 /* Temporary File is not used.
2451 so insert sorted block to the index */
2452 if (row != NULL) {
2453 bool mtr_committed = false;
2454
2455 /* We have to do insert the
2456 cached spatial index rows, since
2457 after the mtr_commit, the cluster
2458 index page could be updated, then
2459 the data in cached rows become
2460 invalid. */
2461 err = row_merge_spatial_rows(
2462 trx->id, sp_tuples,
2463 num_spatial,
2464 row_heap, sp_heap,
2465 &pcur, &mtr,
2466 &mtr_committed);
2467
2468 if (err != DB_SUCCESS) {
2469 goto func_exit;
2470 }
2471
2472 /* We are not at the end of
2473 the scan yet. We must
2474 mtr_commit() in order to be
2475 able to call log_free_check()
2476 in row_merge_insert_index_tuples().
2477 Due to mtr_commit(), the
2478 current row will be invalid, and
2479 we must reread it on the next
2480 loop iteration. */
2481 if (!mtr_committed) {
2482 btr_pcur_move_to_prev_on_page(
2483 &pcur);
2484 btr_pcur_store_position(
2485 &pcur, &mtr);
2486
2487 mtr_commit(&mtr);
2488 }
2489 }
2490
2491 mem_heap_empty(mtuple_heap);
2492 prev_mtuple.fields = prev_fields;
2493
2494 row_mtuple_create(
2495 &buf->tuples[buf->n_tuples - 1],
2496 &prev_mtuple, n_uniq,
2497 mtuple_heap);
2498
2499 if (clust_btr_bulk == NULL) {
2500 clust_btr_bulk = UT_NEW_NOKEY(
2501 BtrBulk(index[i],
2502 trx->id,
2503 observer));
2504
2505 clust_btr_bulk->init();
2506 } else {
2507 clust_btr_bulk->latch();
2508 }
2509
2510 err = row_merge_insert_index_tuples(
2511 index[i], old_table,
2512 OS_FILE_CLOSED, NULL, buf, clust_btr_bulk,
2513 table_total_rows,
2514 curr_progress,
2515 pct_cost,
2516 crypt_block,
2517 new_table->space->id);
2518
2519 if (row == NULL) {
2520 err = clust_btr_bulk->finish(
2521 err);
2522 UT_DELETE(clust_btr_bulk);
2523 clust_btr_bulk = NULL;
2524 } else {
2525 /* Release latches for possible
2526 log_free_chck in spatial index
2527 build. */
2528 clust_btr_bulk->release();
2529 }
2530
2531 if (err != DB_SUCCESS) {
2532 break;
2533 }
2534
2535 if (row != NULL) {
2536 /* Restore the cursor on the
2537 previous clustered index record,
2538 and empty the buffer. The next
2539 iteration of the outer loop will
2540 advance the cursor and read the
2541 next record (the one which we
2542 had to ignore due to the buffer
2543 overflow). */
2544 mtr_start(&mtr);
2545 btr_pcur_restore_position(
2546 BTR_SEARCH_LEAF, &pcur,
2547 &mtr);
2548 buf = row_merge_buf_empty(buf);
2549 /* Restart the outer loop on the
2550 record. We did not insert it
2551 into any index yet. */
2552 ut_ad(i == 0);
2553 break;
2554 }
2555 } else if (dict_index_is_unique(buf->index)) {
2556 row_merge_dup_t dup = {
2557 buf->index, table, col_map, 0};
2558
2559 row_merge_buf_sort(buf, &dup);
2560
2561 if (dup.n_dup) {
2562 err = DB_DUPLICATE_KEY;
2563 trx->error_key_num
2564 = key_numbers[i];
2565 break;
2566 }
2567 } else {
2568 row_merge_buf_sort(buf, NULL);
2569 }
2570 } else if (online && new_table == old_table) {
2571 /* Note the newest transaction that
2572 modified this index when the scan was
2573 completed. We prevent older readers
2574 from accessing this index, to ensure
2575 read consistency. */
2576
2577 trx_id_t max_trx_id;
2578
2579 ut_a(row == NULL);
2580 rw_lock_x_lock(
2581 dict_index_get_lock(buf->index));
2582 ut_a(dict_index_get_online_status(buf->index)
2583 == ONLINE_INDEX_CREATION);
2584
2585 max_trx_id = row_log_get_max_trx(buf->index);
2586
2587 if (max_trx_id > buf->index->trx_id) {
2588 buf->index->trx_id = max_trx_id;
2589 }
2590
2591 rw_lock_x_unlock(
2592 dict_index_get_lock(buf->index));
2593 }
2594
2595 /* Secondary index and clustered index which is
2596 not in sorted order can use the temporary file.
2597 Fulltext index should not use the temporary file. */
2598 if (!skip_sort && !(buf->index->type & DICT_FTS)) {
2599 /* In case we can have all rows in sort buffer,
2600 we can insert directly into the index without
2601 temporary file if clustered index does not uses
2602 temporary file. */
2603 if (row == NULL && file->fd == OS_FILE_CLOSED
2604 && !clust_temp_file) {
2605 DBUG_EXECUTE_IF(
2606 "row_merge_write_failure",
2607 err = DB_TEMP_FILE_WRITE_FAIL;
2608 trx->error_key_num = i;
2609 goto all_done;);
2610
2611 DBUG_EXECUTE_IF(
2612 "row_merge_tmpfile_fail",
2613 err = DB_OUT_OF_MEMORY;
2614 trx->error_key_num = i;
2615 goto all_done;);
2616
2617 BtrBulk btr_bulk(index[i], trx->id,
2618 observer);
2619 btr_bulk.init();
2620
2621 err = row_merge_insert_index_tuples(
2622 index[i], old_table,
2623 OS_FILE_CLOSED, NULL, buf, &btr_bulk,
2624 table_total_rows,
2625 curr_progress,
2626 pct_cost,
2627 crypt_block,
2628 new_table->space->id);
2629
2630 err = btr_bulk.finish(err);
2631
2632 DBUG_EXECUTE_IF(
2633 "row_merge_insert_big_row",
2634 err = DB_TOO_BIG_RECORD;);
2635
2636 if (err != DB_SUCCESS) {
2637 break;
2638 }
2639 } else {
2640 if (!row_merge_file_create_if_needed(
2641 file, tmpfd,
2642 buf->n_tuples, path)) {
2643 err = DB_OUT_OF_MEMORY;
2644 trx->error_key_num = i;
2645 goto func_exit;
2646 }
2647
2648 /* Ensure that duplicates in the
2649 clustered index will be detected before
2650 inserting secondary index records. */
2651 if (dict_index_is_clust(buf->index)) {
2652 clust_temp_file = true;
2653 }
2654
2655 ut_ad(file->n_rec > 0);
2656
2657 row_merge_buf_write(buf, file, block);
2658
2659 if (!row_merge_write(
2660 file->fd, file->offset++,
2661 block, crypt_block,
2662 new_table->space->id)) {
2663 err = DB_TEMP_FILE_WRITE_FAIL;
2664 trx->error_key_num = i;
2665 break;
2666 }
2667
2668 UNIV_MEM_INVALID(
2669 &block[0], srv_sort_buf_size);
2670 }
2671 }
2672 merge_buf[i] = row_merge_buf_empty(buf);
2673
2674 if (UNIV_LIKELY(row != NULL)) {
2675 /* Try writing the record again, now
2676 that the buffer has been written out
2677 and emptied. */
2678
2679 if (UNIV_UNLIKELY
2680 (!(rows_added = row_merge_buf_add(
2681 buf, fts_index, old_table,
2682 new_table, psort_info, row, ext,
2683 &doc_id, conv_heap,
2684 &err, &v_heap, table, trx)))) {
2685 /* An empty buffer should have enough
2686 room for at least one record. */
2687 ut_error;
2688 }
2689
2690 if (err != DB_SUCCESS) {
2691 break;
2692 }
2693
2694 file->n_rec += rows_added;
2695 }
2696 }
2697
2698 if (row == NULL) {
2699 goto all_done;
2700 }
2701
2702 if (err != DB_SUCCESS) {
2703 goto func_exit;
2704 }
2705
2706 if (v_heap) {
2707 mem_heap_empty(v_heap);
2708 }
2709
2710 /* Increment innodb_onlineddl_pct_progress status variable */
2711 read_rows++;
2712 if(read_rows % 1000 == 0) {
2713 /* Update progress for each 1000 rows */
2714 curr_progress = (read_rows >= table_total_rows) ?
2715 pct_cost :
2716 ((pct_cost * read_rows) / table_total_rows);
2717 /* presenting 10.12% as 1012 integer */
2718 onlineddl_pct_progress = (ulint) (curr_progress * 100);
2719 }
2720 }
2721
2722func_exit:
2723 /* row_merge_spatial_rows may have committed
2724 the mtr before an error occurs. */
2725 if (mtr.is_active()) {
2726 mtr_commit(&mtr);
2727 }
2728 mem_heap_free(row_heap);
2729 ut_free(nonnull);
2730
2731all_done:
2732 if (clust_btr_bulk != NULL) {
2733 ut_ad(err != DB_SUCCESS);
2734 clust_btr_bulk->latch();
2735 err = clust_btr_bulk->finish(
2736 err);
2737 UT_DELETE(clust_btr_bulk);
2738 }
2739
2740 if (prev_fields != NULL) {
2741 ut_free(prev_fields);
2742 mem_heap_free(mtuple_heap);
2743 }
2744
2745 if (v_heap) {
2746 mem_heap_free(v_heap);
2747 }
2748
2749 if (conv_heap != NULL) {
2750 mem_heap_free(conv_heap);
2751 }
2752
2753#ifdef FTS_INTERNAL_DIAG_PRINT
2754 DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n");
2755#endif
2756 if (fts_pll_sort) {
2757 bool all_exit = false;
2758 ulint trial_count = 0;
2759 const ulint max_trial_count = 10000;
2760
2761wait_again:
2762 /* Check if error occurs in child thread */
2763 for (ulint j = 0; j < fts_sort_pll_degree; j++) {
2764 if (psort_info[j].error != DB_SUCCESS) {
2765 err = psort_info[j].error;
2766 trx->error_key_num = j;
2767 break;
2768 }
2769 }
2770
2771 /* Tell all children that parent has done scanning */
2772 for (ulint i = 0; i < fts_sort_pll_degree; i++) {
2773 if (err == DB_SUCCESS) {
2774 psort_info[i].state = FTS_PARENT_COMPLETE;
2775 } else {
2776 psort_info[i].state = FTS_PARENT_EXITING;
2777 }
2778 }
2779
2780 /* Now wait all children to report back to be completed */
2781 os_event_wait_time_low(fts_parallel_sort_event,
2782 1000000, sig_count);
2783
2784 for (ulint i = 0; i < fts_sort_pll_degree; i++) {
2785 if (psort_info[i].child_status != FTS_CHILD_COMPLETE
2786 && psort_info[i].child_status != FTS_CHILD_EXITING) {
2787 sig_count = os_event_reset(
2788 fts_parallel_sort_event);
2789 goto wait_again;
2790 }
2791 }
2792
2793 /* Now all children should complete, wait a bit until
2794 they all finish setting the event, before we free everything.
2795 This has a 10 second timeout */
2796 do {
2797 all_exit = true;
2798
2799 for (ulint j = 0; j < fts_sort_pll_degree; j++) {
2800 if (psort_info[j].child_status
2801 != FTS_CHILD_EXITING) {
2802 all_exit = false;
2803 os_thread_sleep(1000);
2804 break;
2805 }
2806 }
2807 trial_count++;
2808 } while (!all_exit && trial_count < max_trial_count);
2809
2810 if (!all_exit) {
2811 ib::fatal() << "Not all child sort threads exited"
2812 " when creating FTS index '"
2813 << fts_sort_idx->name << "'";
2814 }
2815 }
2816
2817#ifdef FTS_INTERNAL_DIAG_PRINT
2818 DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Tokenization\n");
2819#endif
2820 for (ulint i = 0; i < n_index; i++) {
2821 row_merge_buf_free(merge_buf[i]);
2822 }
2823
2824 row_fts_free_pll_merge_buf(psort_info);
2825
2826 ut_free(merge_buf);
2827
2828 btr_pcur_close(&pcur);
2829
2830 if (sp_tuples != NULL) {
2831 for (ulint i = 0; i < num_spatial; i++) {
2832 UT_DELETE(sp_tuples[i]);
2833 }
2834 ut_free(sp_tuples);
2835
2836 if (sp_heap) {
2837 mem_heap_free(sp_heap);
2838 }
2839 }
2840
2841 /* Update the next Doc ID we used. Table should be locked, so
2842 no concurrent DML */
2843 if (max_doc_id && err == DB_SUCCESS) {
2844 /* Sync fts cache for other fts indexes to keep all
2845 fts indexes consistent in sync_doc_id. */
2846 err = fts_sync_table(const_cast<dict_table_t*>(new_table),
2847 false, true, false);
2848
2849 if (err == DB_SUCCESS) {
2850 fts_update_next_doc_id(
2851 0, new_table,
2852 old_table->name.m_name, max_doc_id);
2853 }
2854 }
2855
2856 if (vers_update_trt) {
2857 trx_mod_table_time_t& time =
2858 trx->mod_tables
2859 .insert(trx_mod_tables_t::value_type(
2860 const_cast<dict_table_t*>(new_table), 0))
2861 .first->second;
2862 time.set_versioned(0);
2863 }
2864
2865 trx->op_info = "";
2866
2867 DBUG_RETURN(err);
2868}
2869
/** Write a record via buffer 2 and read the next record to buffer N.
This macro is only usable inside row_merge_blocks() and
row_merge_blocks_copy(): it expects the following names in the
enclosing scope: block[] and (optionally NULL) crypt_block[]
(3 buffers of srv_sort_buf_size bytes each), buf[] (mrec_buf_t[] for
records split across block boundaries), b2 (output write pointer),
of (output file), file (input file), space (tablespace id), and per
input stream N: b##N, mrec##N, offsets##N, foffs##N. On corruption
(failed write, record count overflow, or a truncated record) it
jumps to the label "corrupt", which must exist in the caller.
@param N	number of the buffer (0 or 1)
@param INDEX	record descriptor
@param AT_END	statement to execute at end of input */
#define ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)			\
	do {								\
		b2 = row_merge_write_rec(&block[2 * srv_sort_buf_size], \
					 &buf[2], b2,			\
					 of->fd, &of->offset,		\
					 mrec##N, offsets##N,		\
			crypt_block ? &crypt_block[2 * srv_sort_buf_size] : NULL , \
					 space);			\
		if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {	\
			goto corrupt;					\
		}							\
		b##N = row_merge_read_rec(&block[N * srv_sort_buf_size],\
					  &buf[N], b##N, INDEX,		\
					  file->fd, foffs##N,		\
					  &mrec##N, offsets##N,		\
			crypt_block ? &crypt_block[N * srv_sort_buf_size] : NULL, \
					  space);			\
									\
		if (UNIV_UNLIKELY(!b##N)) {				\
			if (mrec##N) {					\
				goto corrupt;				\
			}						\
			AT_END;						\
		}							\
	} while (0)

#ifdef HAVE_PSI_STAGE_INTERFACE
/** Wrapper of ROW_MERGE_WRITE_GET_NEXT_LOW() that additionally
advances the performance schema stage counter ("stage" in the
enclosing scope) once per record processed, when stage != NULL. */
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)		\
	do {							\
		if (stage != NULL) {				\
			stage->inc();				\
		}						\
		ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END);	\
	} while (0)
#else /* HAVE_PSI_STAGE_INTERFACE */
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)	\
	ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)
#endif /* HAVE_PSI_STAGE_INTERFACE */
2912
/** Merge two blocks of records on disk and write a bigger block.
The two input runs must each be sorted; equal keys across the runs
are reported as DB_DUPLICATE_KEY (the index being created is unique
whenever this can be reached with overlapping key ranges).
@param[in]	dup	descriptor of index being created
@param[in]	file	file containing index entries
@param[in,out]	block	3 buffers
@param[in,out]	foffs0	offset of first source list in the file
@param[in,out]	foffs1	offset of second source list in the file
@param[in,out]	of	output file
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL stage->inc() will be called for each record
processed.
@param[in,out]	crypt_block	encryption buffer
@param[in]	space	tablespace ID for encryption
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_blocks(
	const row_merge_dup_t*	dup,
	const merge_file_t*	file,
	row_merge_block_t*	block,
	ulint*			foffs0,
	ulint*			foffs1,
	merge_file_t*		of,
	ut_stage_alter_t*	stage MY_ATTRIBUTE((unused)),
	row_merge_block_t*	crypt_block,
	ulint			space)
{
	mem_heap_t*	heap;	/*!< memory heap for offsets0, offsets1 */

	mrec_buf_t*	buf;	/*!< buffer for handling
				split mrec in block[] */
	const byte*	b0;	/*!< pointer to block[0] */
	const byte*	b1;	/*!< pointer to block[srv_sort_buf_size] */
	byte*		b2;	/*!< pointer to block[2 * srv_sort_buf_size] */
	const mrec_t*	mrec0;	/*!< merge rec, points to block[0] or buf[0] */
	const mrec_t*	mrec1;	/*!< merge rec, points to
				block[srv_sort_buf_size] or buf[1] */
	ulint*		offsets0;/* offsets of mrec0 */
	ulint*		offsets1;/* offsets of mrec1 */

	DBUG_ENTER("row_merge_blocks");
	DBUG_LOG("ib_merge_sort",
		 "fd=" << file->fd << ',' << *foffs0 << '+' << *foffs1
		 << " to fd=" << of->fd << ',' << of->offset);

	heap = row_merge_heap_create(dup->index, &buf, &offsets0, &offsets1);

	/* Read the first block of each input run. A failed read means
	corruption (or an I/O or decryption failure); the shared
	"corrupt" label below is also the target of the
	ROW_MERGE_WRITE_GET_NEXT() macro invocations further down. */

	if (!row_merge_read(file->fd, *foffs0, &block[0],
			    crypt_block ? &crypt_block[0] : NULL,
			    space) ||
	    !row_merge_read(file->fd, *foffs1, &block[srv_sort_buf_size],
			    crypt_block ? &crypt_block[srv_sort_buf_size] : NULL,
			    space)) {
corrupt:
		mem_heap_free(heap);
		DBUG_RETURN(DB_CORRUPTION);
	}

	b0 = &block[0];
	b1 = &block[srv_sort_buf_size];
	b2 = &block[2 * srv_sort_buf_size];

	/* Fetch the first record of each run. A NULL return with a
	non-NULL mrec means a truncated/corrupted record; a NULL return
	with a NULL mrec means the run is empty. */
	b0 = row_merge_read_rec(
		&block[0], &buf[0], b0, dup->index,
		file->fd, foffs0, &mrec0, offsets0,
		crypt_block ? &crypt_block[0] : NULL,
		space);

	b1 = row_merge_read_rec(
		&block[srv_sort_buf_size],
		&buf[srv_sort_buf_size], b1, dup->index,
		file->fd, foffs1, &mrec1, offsets1,
		crypt_block ? &crypt_block[srv_sort_buf_size] : NULL,
		space);

	if (UNIV_UNLIKELY(!b0 && mrec0)
	    || UNIV_UNLIKELY(!b1 && mrec1)) {

		goto corrupt;
	}

	/* Classic two-way merge: repeatedly emit the smaller of the
	two current records and advance that run. Equal keys mean a
	duplicate on the index being created. */
	while (mrec0 && mrec1) {
		int cmp = cmp_rec_rec_simple(
			mrec0, mrec1, offsets0, offsets1,
			dup->index, dup->table);
		if (cmp < 0) {
			ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto merged);
		} else if (cmp) {
			ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto merged);
		} else {
			mem_heap_free(heap);
			DBUG_RETURN(DB_DUPLICATE_KEY);
		}
	}

merged:
	/* One of the runs is exhausted; drain the remainder of the
	other run (at most one of the loops below executes). */
	if (mrec0) {
		/* append all mrec0 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto done0);
		}
	}
done0:
	if (mrec1) {
		/* append all mrec1 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto done1);
		}
	}
done1:

	mem_heap_free(heap);

	/* Terminate the merged output run with an end-of-list marker. */
	b2 = row_merge_write_eof(
		&block[2 * srv_sort_buf_size],
		b2, of->fd, &of->offset,
		crypt_block ? &crypt_block[2 * srv_sort_buf_size] : NULL,
		space);
	DBUG_RETURN(b2 ? DB_SUCCESS : DB_CORRUPTION);
}
3035
3036/** Copy a block of index entries.
3037@param[in] index index being created
3038@param[in] file input file
3039@param[in,out] block 3 buffers
3040@param[in,out] foffs0 input file offset
3041@param[in,out] of output file
3042@param[in,out] stage performance schema accounting object, used by
3043ALTER TABLE. If not NULL stage->inc() will be called for each record
3044processed.
3045@param[in,out] crypt_block encryption buffer
3046@param[in] space tablespace ID for encryption
3047@return TRUE on success, FALSE on failure */
3048static MY_ATTRIBUTE((warn_unused_result))
3049ibool
3050row_merge_blocks_copy(
3051 const dict_index_t* index,
3052 const merge_file_t* file,
3053 row_merge_block_t* block,
3054 ulint* foffs0,
3055 merge_file_t* of,
3056 ut_stage_alter_t* stage MY_ATTRIBUTE((unused)),
3057 row_merge_block_t* crypt_block,
3058 ulint space)
3059{
3060 mem_heap_t* heap; /*!< memory heap for offsets0, offsets1 */
3061
3062 mrec_buf_t* buf; /*!< buffer for handling
3063 split mrec in block[] */
3064 const byte* b0; /*!< pointer to block[0] */
3065 byte* b2; /*!< pointer to block[2 * srv_sort_buf_size] */
3066 const mrec_t* mrec0; /*!< merge rec, points to block[0] */
3067 ulint* offsets0;/* offsets of mrec0 */
3068 ulint* offsets1;/* dummy offsets */
3069
3070 DBUG_ENTER("row_merge_blocks_copy");
3071 DBUG_LOG("ib_merge_sort",
3072 "fd=" << file->fd << ',' << foffs0
3073 << " to fd=" << of->fd << ',' << of->offset);
3074
3075 heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
3076
3077 /* Write a record and read the next record. Split the output
3078 file in two halves, which can be merged on the following pass. */
3079
3080 if (!row_merge_read(file->fd, *foffs0, &block[0],
3081 crypt_block ? &crypt_block[0] : NULL,
3082 space)) {
3083corrupt:
3084 mem_heap_free(heap);
3085 DBUG_RETURN(FALSE);
3086 }
3087
3088 b0 = &block[0];
3089
3090 b2 = &block[2 * srv_sort_buf_size];
3091
3092 b0 = row_merge_read_rec(&block[0], &buf[0], b0, index,
3093 file->fd, foffs0, &mrec0, offsets0,
3094 crypt_block ? &crypt_block[0] : NULL,
3095 space);
3096
3097 if (UNIV_UNLIKELY(!b0 && mrec0)) {
3098
3099 goto corrupt;
3100 }
3101
3102 if (mrec0) {
3103 /* append all mrec0 to output */
3104 for (;;) {
3105 ROW_MERGE_WRITE_GET_NEXT(0, index, goto done0);
3106 }
3107 }
3108done0:
3109
3110 /* The file offset points to the beginning of the last page
3111 that has been read. Update it to point to the next block. */
3112 (*foffs0)++;
3113
3114 mem_heap_free(heap);
3115
3116 DBUG_RETURN(row_merge_write_eof(
3117 &block[2 * srv_sort_buf_size],
3118 b2, of->fd, &of->offset,
3119 crypt_block
3120 ? &crypt_block[2 * srv_sort_buf_size]
3121 : NULL, space)
3122 != NULL);
3123}
3124
/** Merge disk files: perform one pass of the merge sort, halving the
number of runs by merging pairs of runs from the two halves of the
input file into the output file, then swapping input and output.
@param[in]	trx	transaction
@param[in]	dup	descriptor of index being created
@param[in,out]	file	file containing index entries
@param[in,out]	block	3 buffers
@param[in,out]	tmpfd	temporary file handle
@param[in,out]	num_run	Number of runs that remain to be merged
@param[in,out]	run_offset	Array that contains the first offset number
for each merge run
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL stage->inc() will be called for each record
processed.
@param[in,out]	crypt_block	encryption buffer
@param[in]	space	tablespace ID for encryption
@return DB_SUCCESS or error code */
static
dberr_t
row_merge(
	trx_t*			trx,
	const row_merge_dup_t*	dup,
	merge_file_t*		file,
	row_merge_block_t*	block,
	pfs_os_file_t*		tmpfd,
	ulint*			num_run,
	ulint*			run_offset,
	ut_stage_alter_t*	stage,
	row_merge_block_t*	crypt_block,
	ulint			space)
{
	ulint		foffs0;	/*!< first input offset */
	ulint		foffs1;	/*!< second input offset */
	dberr_t		error;	/*!< error code */
	merge_file_t	of;	/*!< output file */
	const ulint	ihalf	= run_offset[*num_run / 2];
			/*!< half the input file */
	ulint		n_run	= 0;
			/*!< num of runs generated from this merge */

	UNIV_MEM_ASSERT_W(&block[0], 3 * srv_sort_buf_size);

	if (crypt_block) {
		UNIV_MEM_ASSERT_W(&crypt_block[0], 3 * srv_sort_buf_size);
	}

	ut_ad(ihalf < file->offset);

	/* The output goes to the scratch file; the descriptors are
	swapped with the input file at the end of the pass. */
	of.fd = *tmpfd;
	of.offset = 0;
	of.n_rec = 0;

#ifdef POSIX_FADV_SEQUENTIAL
	/* The input file will be read sequentially, starting from the
	beginning and the middle. In Linux, the POSIX_FADV_SEQUENTIAL
	affects the entire file. Each block will be read exactly once. */
	posix_fadvise(file->fd, 0, 0,
		      POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
#endif /* POSIX_FADV_SEQUENTIAL */

	/* Merge blocks to the output file. */
	foffs0 = 0;
	foffs1 = ihalf;

	UNIV_MEM_INVALID(run_offset, *num_run * sizeof *run_offset);

	/* Pair up runs from the first half with runs from the second
	half and merge each pair into a single output run. */
	for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {

		if (trx_is_interrupted(trx)) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		error = row_merge_blocks(dup, file, block,
					 &foffs0, &foffs1, &of, stage,
					 crypt_block, space);

		if (error != DB_SUCCESS) {
			return(error);
		}

	}

	/* Copy the last blocks, if there are any. */

	while (foffs0 < ihalf) {

		if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		if (!row_merge_blocks_copy(dup->index, file, block,
					   &foffs0, &of, stage,
					   crypt_block, space)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs0 == ihalf);

	/* Copy any unpaired runs left over in the second half. */
	while (foffs1 < file->offset) {

		if (trx_is_interrupted(trx)) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		if (!row_merge_blocks_copy(dup->index, file, block,
					   &foffs1, &of, stage,
					   crypt_block, space)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs1 == file->offset);

	/* Every input record must have been written exactly once. */
	if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
		return(DB_CORRUPTION);
	}

	ut_ad(n_run <= *num_run);

	*num_run = n_run;

	/* Each run can contain one or more offsets. As merge goes on,
	the number of runs (to merge) will reduce until we have one
	single run. So the number of runs will always be smaller than
	the number of offsets in file */
	ut_ad((*num_run) <= file->offset);

	/* The number of offsets in output file is always equal or
	smaller than input file */
	ut_ad(of.offset <= file->offset);

	/* Swap file descriptors for the next pass. */
	*tmpfd = file->fd;
	*file = of;

	UNIV_MEM_INVALID(&block[0], 3 * srv_sort_buf_size);

	return(DB_SUCCESS);
}
3272
/** Merge disk files.
@param[in]	trx	transaction
@param[in]	dup	descriptor of index being created
@param[in,out]	file	file containing index entries
@param[in,out]	block	3 buffers
@param[in,out]	tmpfd	temporary file handle
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL, stage->begin_phase_sort() will be called initially
and then stage->inc() will be called for each record processed.
@return DB_SUCCESS or error code */
dberr_t
row_merge_sort(
	trx_t*			trx,
	const row_merge_dup_t*	dup,
	merge_file_t*		file,
	row_merge_block_t*	block,
	pfs_os_file_t*		tmpfd,
	const bool		update_progress,
				/*!< in: update progress
				status variable or not */
	const double		pct_progress,
				/*!< in: total progress percent
				until now */
	const double		pct_cost, /*!< in: current progress percent */
	row_merge_block_t*	crypt_block, /*!< in: crypt buf or NULL */
	ulint			space,	   /*!< in: space id */
	ut_stage_alter_t*	stage)
{
	/* Initial boundary between the two halves that row_merge()
	will merge on the first pass. */
	const ulint	half	= file->offset / 2;
	ulint		num_runs;
	ulint*		run_offset;
	dberr_t		error	= DB_SUCCESS;
	ulint		merge_count = 0;
	ulint		total_merge_sort_count;
	double		curr_progress = 0;

	DBUG_ENTER("row_merge_sort");

	/* Record the number of merge runs we need to perform */
	num_runs = file->offset;

	if (stage != NULL) {
		/* Estimated number of merge passes is log2(num_runs). */
		stage->begin_phase_sort(log2(num_runs));
	}

	/* Find the number N which 2^N is greater or equal than num_runs */
	/* N is merge sort running count */
	total_merge_sort_count = (ulint) ceil(my_log2f((float)num_runs));
	/* total_merge_sort_count is unsigned, so "<= 0" below can only
	hold when the ceil() result was 0 (that is, num_runs == 1);
	clamp to 1 so the progress arithmetic never divides by zero. */
	if(total_merge_sort_count <= 0) {
		total_merge_sort_count=1;
	}

	/* If num_runs are less than 1, nothing to merge */
	if (num_runs <= 1) {
		DBUG_RETURN(error);
	}

	/* "run_offset" records each run's first offset number */
	run_offset = (ulint*) ut_malloc_nokey(file->offset * sizeof(ulint));

	/* This tells row_merge() where to start for the first round
	of merge. */
	run_offset[half] = half;

	/* The file should always contain at least one byte (the end
	of file marker). Thus, it must be at least one block. */
	ut_ad(file->offset > 0);

	/* These thd_progress* calls will crash on sol10-64 when innodb_plugin
	is used. MDEV-9356: innodb.innodb_bug53290 fails (crashes) on
	sol10-64 in buildbot.
	*/
#ifndef UNIV_SOLARIS
	/* Progress report only for "normal" indexes. */
	if (!(dup->index->type & DICT_FTS)) {
		thd_progress_init(trx->mysql_thd, 1);
	}
#endif /* UNIV_SOLARIS */

	if (global_system_variables.log_warnings > 2) {
		sql_print_information("InnoDB: Online DDL : merge-sorting"
				      " has estimated " ULINTPF " runs",
				      num_runs);
	}

	/* Merge the runs until we have one big run */
	do {
		/* Report progress of merge sort to MySQL for
		show processlist progress field */
		/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
		if (!(dup->index->type & DICT_FTS)) {
			thd_progress_report(trx->mysql_thd, file->offset - num_runs, file->offset);
		}
#endif /* UNIV_SOLARIS */

		/* One merge pass: halves the number of runs (num_runs
		is updated by row_merge()). */
		error = row_merge(trx, dup, file, block, tmpfd,
				  &num_runs, run_offset, stage,
				  crypt_block, space);

		if(update_progress) {
			merge_count++;
			curr_progress = (merge_count >= total_merge_sort_count) ?
				pct_cost :
				((pct_cost * merge_count) / total_merge_sort_count);
			/* presenting 10.12% as 1012 integer */;
			onlineddl_pct_progress = (ulint) ((pct_progress + curr_progress) * 100);
		}

		if (error != DB_SUCCESS) {
			break;
		}

		UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
	} while (num_runs > 1);

	ut_free(run_offset);

	/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
	if (!(dup->index->type & DICT_FTS)) {
		thd_progress_end(trx->mysql_thd);
	}
#endif /* UNIV_SOLARIS */

	DBUG_RETURN(error);
}
3400
/** Copy externally stored columns to the data tuple.
@param[in]	mrec		record containing BLOB pointers,
or NULL to use tuple instead
@param[in]	offsets		offsets of mrec
@param[in]	page_size	page size of the tablespace that holds
the externally stored columns
@param[in,out]	tuple		data tuple
@param[in,out]	heap		memory heap from which the copied BLOB
data is allocated */
static
void
row_merge_copy_blobs(
	const mrec_t*		mrec,
	const ulint*		offsets,
	const page_size_t&	page_size,
	dtuple_t*		tuple,
	mem_heap_t*		heap)
{
	ut_ad(mrec == NULL || rec_offs_any_extern(offsets));

	for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
		ulint		len;
		const void*	data;
		dfield_t*	field = dtuple_get_nth_field(tuple, i);
		ulint		field_len;
		const byte*	field_data;

		/* Only externally stored fields need to be copied. */
		if (!dfield_is_ext(field)) {
			continue;
		}

		ut_ad(!dfield_is_null(field));

		/* During the creation of a PRIMARY KEY, the table is
		X-locked, and we skip copying records that have been
		marked for deletion. Therefore, externally stored
		columns cannot possibly be freed between the time the
		BLOB pointers are read (row_merge_read_clustered_index())
		and dereferenced (below). */
		if (mrec == NULL) {
			field_data
				= static_cast<byte*>(dfield_get_data(field));
			field_len = dfield_get_len(field);

			ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);

			/* The BLOB pointer stored at the end of the
			field must not be the all-zero "unwritten"
			reference. */
			ut_a(memcmp(field_data + field_len
				    - BTR_EXTERN_FIELD_REF_SIZE,
				    field_ref_zero,
				    BTR_EXTERN_FIELD_REF_SIZE));

			data = btr_copy_externally_stored_field(
				&len, field_data, page_size, field_len, heap);
		} else {
			data = btr_rec_copy_externally_stored_field(
				mrec, offsets, page_size, i, &len, heap);
		}

		/* Because we have locked the table, any records
		written by incomplete transactions must have been
		rolled back already. There must not be any incomplete
		BLOB columns. */
		ut_a(data);

		/* Replace the BLOB pointer in the field with the
		copied, inline data. */
		dfield_set_data(field, data, len);
	}
}
3466
/** Convert a merge record to a typed data tuple. Note that externally
stored fields are not copied to heap; the copy is shallow, so the
resulting dtuple shares the field data with the mtuple.
@param[in]	index	index on the table
@param[in,out]	dtuple	data tuple whose field array is overwritten
with the fields of mtuple
@param[in]	mtuple	merge record to convert */
static
void
row_merge_mtuple_to_dtuple(
	dict_index_t*	index,
	dtuple_t*	dtuple,
	const mtuple_t*	mtuple)
{
	ut_ad(!dict_index_is_ibuf(index));

	/* Shallow copy: only the dfield_t descriptors are copied;
	the data they point to remains owned by the mtuple. */
	memcpy(dtuple->fields, mtuple->fields,
	       dtuple->n_fields * sizeof *mtuple->fields);
}
3485
/** Insert sorted data tuples to the index.
@param[in]	index	index to be inserted
@param[in]	old_table	old table
@param[in]	fd	file descriptor, or OS_FILE_CLOSED if row_buf is used
@param[in,out]	block	file buffer, or NULL if row_buf is used
@param[in]	row_buf	row_buf the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out]	btr_bulk	btr bulk instance
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
@return DB_SUCCESS or error number */
static	MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
	dict_index_t*		index,
	const dict_table_t*	old_table,
	const pfs_os_file_t&	fd,
	row_merge_block_t*	block,
	const row_merge_buf_t*	row_buf,
	BtrBulk*		btr_bulk,
	const ib_uint64_t	table_total_rows, /*!< in: total rows of old table */
	const double		pct_progress,	/*!< in: total progress
						percent until now */
	const double		pct_cost, /*!< in: current progress percent
					  */
	row_merge_block_t*	crypt_block, /*!< in: crypt buf or NULL */
	ulint			space,	   /*!< in: space id */
	ut_stage_alter_t*	stage)
{
	const byte*		b;
	mem_heap_t*		heap;
	mem_heap_t*		tuple_heap;
	dberr_t			error = DB_SUCCESS;
	ulint			foffs = 0;
	ulint*			offsets;
	mrec_buf_t*		buf;
	ulint			n_rows = 0;
	dtuple_t*		dtuple;
	ib_uint64_t		inserted_rows = 0;
	double			curr_progress = 0;
	dict_index_t*		old_index = NULL;
	const mrec_t*		mrec  = NULL;
	ulint			n_ext = 0;
	mtr_t			mtr;


	DBUG_ENTER("row_merge_insert_index_tuples");

	ut_ad(!srv_read_only_mode);
	ut_ad(!(index->type & DICT_FTS));
	ut_ad(!dict_index_is_spatial(index));

	if (stage != NULL) {
		stage->begin_phase_insert();
	}

	/* Per-tuple scratch heap; emptied after each insert. */
	tuple_heap = mem_heap_create(1000);

	{
		/* Allocate the rec_get_offsets() array once;
		offsets[0] holds the allocated size and offsets[1]
		the number of fields. */
		ulint i	= 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		heap = mem_heap_create(sizeof *buf + i * sizeof *offsets);
		offsets = static_cast<ulint*>(
			mem_heap_alloc(heap, i * sizeof *offsets));
		offsets[0] = i;
		offsets[1] = dict_index_get_n_fields(index);
	}

	if (row_buf != NULL) {
		/* All tuples fit in memory; no merge file was used. */
		ut_ad(fd == OS_FILE_CLOSED);
		ut_ad(block == NULL);
		DBUG_EXECUTE_IF("row_merge_read_failure",
				error = DB_CORRUPTION;
				goto err_exit;);
		buf = NULL;
		b = NULL;
		dtuple = dtuple_create(
			heap, dict_index_get_n_fields(index));
		dtuple_set_n_fields_cmp(
			dtuple, dict_index_get_n_unique_in_tree(index));
	} else {
		/* Read the first block of the sorted merge file. */
		b = block;
		dtuple = NULL;

		if (!row_merge_read(fd, foffs, block, crypt_block, space)) {
			error = DB_CORRUPTION;
			goto err_exit;
		} else {
			buf = static_cast<mrec_buf_t*>(
				mem_heap_alloc(heap, sizeof *buf));
		}
	}

	/* Fetch tuples one at a time, either from the in-memory
	buffer or from the merge file, and feed them to the bulk
	loader in sorted order. */
	for (;;) {

		if (stage != NULL) {
			stage->inc();
		}

		if (row_buf != NULL) {
			if (n_rows >= row_buf->n_tuples) {
				break;
			}

			/* Convert merge tuple record from
			row buffer to data tuple record */
			row_merge_mtuple_to_dtuple(
				index, dtuple, &row_buf->tuples[n_rows]);

			n_ext = dtuple_get_n_ext(dtuple);
			n_rows++;
			/* BLOB pointers must be copied from dtuple */
			mrec = NULL;
		} else {
			b = row_merge_read_rec(block, buf, b, index,
					       fd, &foffs, &mrec, offsets,
					       crypt_block,
					       space);

			if (UNIV_UNLIKELY(!b)) {
				/* End of list, or I/O error */
				if (mrec) {
					error = DB_CORRUPTION;
				}
				break;
			}

			dtuple = row_rec_to_index_entry_low(
				mrec, index, offsets, &n_ext, tuple_heap);
		}

		old_index = dict_table_get_first_index(old_table);

		if (dict_index_is_clust(index)
		    && dict_index_is_online_ddl(old_index)) {
			/* Stop early if the row log of the table
			rebuild has reported an error. */
			error = row_log_table_get_error(old_index);
			if (error != DB_SUCCESS) {
				break;
			}
		}

		if (!n_ext) {
			/* There are no externally stored columns. */
		} else {
			ut_ad(dict_index_is_clust(index));
			/* Off-page columns can be fetched safely
			when concurrent modifications to the table
			are disabled. (Purge can process delete-marked
			records, but row_merge_read_clustered_index()
			would have skipped them.)

			When concurrent modifications are enabled,
			row_merge_read_clustered_index() will
			only see rows from transactions that were
			committed before the ALTER TABLE started
			(REPEATABLE READ).

			Any modifications after the
			row_merge_read_clustered_index() scan
			will go through row_log_table_apply().
			Any modifications to off-page columns
			will be tracked by
			row_log_table_blob_alloc() and
			row_log_table_blob_free(). */
			row_merge_copy_blobs(
				mrec, offsets,
				dict_table_page_size(old_table),
				dtuple, tuple_heap);
		}

#ifdef UNIV_DEBUG
		static const latch_level_t latches[] = {
			SYNC_INDEX_TREE,	/* index->lock */
			SYNC_LEVEL_VARYING	/* btr_bulk->m_page_bulks */
		};
#endif /* UNIV_DEBUG */

		ut_ad(dtuple_validate(dtuple));
		ut_ad(!sync_check_iterate(sync_allowed_latches(latches,
							       latches + 2)));
		error = btr_bulk->insert(dtuple);

		if (error != DB_SUCCESS) {
			goto err_exit;
		}

		mem_heap_empty(tuple_heap);

		/* Increment innodb_onlineddl_pct_progress status variable */
		inserted_rows++;
		if(inserted_rows % 1000 == 0) {
			/* Update progress for each 1000 rows */
			curr_progress = (inserted_rows >= table_total_rows ||
				table_total_rows <= 0) ?
				pct_cost :
				((pct_cost * inserted_rows) / table_total_rows);

			/* presenting 10.12% as 1012 integer */;
			onlineddl_pct_progress = (ulint) ((pct_progress + curr_progress) * 100);
		}
	}

err_exit:
	mem_heap_free(tuple_heap);
	mem_heap_free(heap);

	DBUG_RETURN(error);
}
3695
3696/*********************************************************************//**
3697Sets an exclusive lock on a table, for the duration of creating indexes.
3698@return error code or DB_SUCCESS */
3699dberr_t
3700row_merge_lock_table(
3701/*=================*/
3702 trx_t* trx, /*!< in/out: transaction */
3703 dict_table_t* table, /*!< in: table to lock */
3704 enum lock_mode mode) /*!< in: LOCK_X or LOCK_S */
3705{
3706 ut_ad(!srv_read_only_mode);
3707 ut_ad(mode == LOCK_X || mode == LOCK_S);
3708
3709 trx->op_info = "setting table lock for creating or dropping index";
3710 trx->ddl = true;
3711
3712 return(lock_table_for_trx(table, trx, mode));
3713}
3714
/*********************************************************************//**
Drop an index that was created before an error occurred.
The data dictionary must have been locked exclusively by the caller,
because the transaction will not be committed. */
static
void
row_merge_drop_index_dict(
/*======================*/
	trx_t*		trx,	/*!< in/out: dictionary transaction */
	index_id_t	index_id)/*!< in: index identifier */
{
	/* Delete both the field definitions (SYS_FIELDS) and the
	index definition (SYS_INDEXES) of the given index. */
	static const char sql[] =
		"PROCEDURE DROP_INDEX_PROC () IS\n"
		"BEGIN\n"
		"DELETE FROM SYS_FIELDS WHERE INDEX_ID=:indexid;\n"
		"DELETE FROM SYS_INDEXES WHERE ID=:indexid;\n"
		"END;\n";
	dberr_t		error;
	pars_info_t*	info;

	ut_ad(!srv_read_only_mode);
	ut_ad(mutex_own(&dict_sys->mutex));
	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));

	info = pars_info_create();
	pars_info_add_ull_literal(info, "indexid", index_id);
	trx->op_info = "dropping index from dictionary";
	error = que_eval_sql(info, sql, FALSE, trx);

	if (error != DB_SUCCESS) {
		/* Even though we ensure that DDL transactions are WAIT
		and DEADLOCK free, we could encounter other errors e.g.,
		DB_TOO_MANY_CONCURRENT_TRXS. */
		trx->error_state = DB_SUCCESS;

		ib::error() << "row_merge_drop_index_dict failed with error "
			<< error;
	}

	trx->op_info = "";
}
3758
/*********************************************************************//**
Drop indexes that were created before an error occurred.
The data dictionary must have been locked exclusively by the caller,
because the transaction will not be committed. */
void
row_merge_drop_indexes_dict(
/*========================*/
	trx_t*		trx,	/*!< in/out: dictionary transaction */
	table_id_t	table_id)/*!< in: table identifier */
{
	/* Cursor over all not-yet-committed indexes of the table:
	their names carry the TEMP_INDEX_PREFIX_STR marker as the
	first character.  For each one, delete its SYS_FIELDS rows
	and its SYS_INDEXES row. */
	static const char sql[] =
		"PROCEDURE DROP_INDEXES_PROC () IS\n"
		"ixid CHAR;\n"
		"found INT;\n"

		"DECLARE CURSOR index_cur IS\n"
		" SELECT ID FROM SYS_INDEXES\n"
		" WHERE TABLE_ID=:tableid AND\n"
		" SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
		"FOR UPDATE;\n"

		"BEGIN\n"
		"found := 1;\n"
		"OPEN index_cur;\n"
		"WHILE found = 1 LOOP\n"
		"  FETCH index_cur INTO ixid;\n"
		"  IF (SQL % NOTFOUND) THEN\n"
		"    found := 0;\n"
		"  ELSE\n"
		"    DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
		"    DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
		"  END IF;\n"
		"END LOOP;\n"
		"CLOSE index_cur;\n"

		"END;\n";
	dberr_t		error;
	pars_info_t*	info;

	ut_ad(!srv_read_only_mode);
	ut_ad(mutex_own(&dict_sys->mutex));
	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));

	/* It is possible that table->n_ref_count > 1 when
	locked=TRUE. In this case, all code that should have an open
	handle to the table be waiting for the next statement to execute,
	or waiting for a meta-data lock.

	A concurrent purge will be prevented by dict_operation_lock. */

	info = pars_info_create();
	pars_info_add_ull_literal(info, "tableid", table_id);
	trx->op_info = "dropping indexes";
	error = que_eval_sql(info, sql, FALSE, trx);

	switch (error) {
	case DB_SUCCESS:
		break;
	default:
		/* Even though we ensure that DDL transactions are WAIT
		and DEADLOCK free, we could encounter other errors e.g.,
		DB_TOO_MANY_CONCURRENT_TRXS. */
		ib::error() << "row_merge_drop_indexes_dict failed with error "
			<< error;
		/* fall through */
	case DB_TOO_MANY_CONCURRENT_TRXS:
		/* Reset the error state so that the transaction can
		continue; DB_TOO_MANY_CONCURRENT_TRXS is not logged. */
		trx->error_state = DB_SUCCESS;
	}

	trx->op_info = "";
}
3832
/*********************************************************************//**
Drop indexes that were created before an error occurred.
The data dictionary must have been locked exclusively by the caller,
because the transaction will not be committed.
If the table is still in use (locked==FALSE and other references or
locks exist), the indexes are only marked as aborted/corrupted so that
they will be dropped lazily later; otherwise they are dropped from the
dictionary and removed from the cache immediately. */
void
row_merge_drop_indexes(
/*===================*/
	trx_t*		trx,	/*!< in/out: dictionary transaction */
	dict_table_t*	table,	/*!< in/out: table containing the indexes */
	ibool		locked)	/*!< in: TRUE=table locked,
				FALSE=may need to do a lazy drop */
{
	dict_index_t*	index;
	dict_index_t*	next_index;

	ut_ad(!srv_read_only_mode);
	ut_ad(mutex_own(&dict_sys->mutex));
	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));

	index = dict_table_get_first_index(table);
	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);

	/* the caller should have an open handle to the table */
	ut_ad(table->get_ref_count() >= 1);

	/* It is possible that table->n_ref_count > 1 when
	locked=TRUE. In this case, all code that should have an open
	handle to the table be waiting for the next statement to execute,
	or waiting for a meta-data lock.

	A concurrent purge will be prevented by dict_operation_lock. */

	if (!locked && (table->get_ref_count() > 1
			|| UT_LIST_GET_FIRST(table->locks))) {
		/* We will have to drop the indexes later, when the
		table is guaranteed to be no longer in use. Mark the
		indexes as incomplete and corrupted, so that other
		threads will stop using them. Let dict_table_close()
		or crash recovery or the next invocation of
		prepare_inplace_alter_table() take care of dropping
		the indexes. */

		while ((index = dict_table_get_next_index(index)) != NULL) {
			ut_ad(!dict_index_is_clust(index));

			switch (dict_index_get_online_status(index)) {
			case ONLINE_INDEX_ABORTED_DROPPED:
				/* Already dropped from the dictionary;
				nothing more to do for this index. */
				continue;
			case ONLINE_INDEX_COMPLETE:
				if (index->is_committed()) {
					/* Do nothing to already
					published indexes. */
				} else if (index->type & DICT_FTS) {
					/* Drop a completed FULLTEXT
					index, due to a timeout during
					MDL upgrade for
					commit_inplace_alter_table().
					Because only concurrent reads
					are allowed (and they are not
					seeing this index yet) we
					are safe to drop the index. */
					dict_index_t* prev = UT_LIST_GET_PREV(
						indexes, index);
					/* At least there should be
					the clustered index before
					this one. */
					ut_ad(prev);
					ut_a(table->fts);
					fts_drop_index(table, index, trx);
					/* Since
					INNOBASE_SHARE::idx_trans_tbl
					is shared between all open
					ha_innobase handles to this
					table, no thread should be
					accessing this dict_index_t
					object. Also, we should be
					holding LOCK=SHARED MDL on the
					table even after the MDL
					upgrade timeout. */

					/* We can remove a DICT_FTS
					index from the cache, because
					we do not allow ADD FULLTEXT INDEX
					with LOCK=NONE. If we allowed that,
					we should exclude FTS entries from
					prebuilt->ins_node->entry_list
					in ins_node_create_entry_list(). */
					dict_index_remove_from_cache(
						table, index);
					index = prev;
				} else {
					rw_lock_x_lock(
						dict_index_get_lock(index));
					dict_index_set_online_status(
						index, ONLINE_INDEX_ABORTED);
					index->type |= DICT_CORRUPT;
					table->drop_aborted = TRUE;
					goto drop_aborted;
				}
				continue;
			case ONLINE_INDEX_CREATION:
				rw_lock_x_lock(dict_index_get_lock(index));
				ut_ad(!index->is_committed());
				row_log_abort_sec(index);
			drop_aborted:
				rw_lock_x_unlock(dict_index_get_lock(index));

				DEBUG_SYNC_C("merge_drop_index_after_abort");
				/* covered by dict_sys->mutex */
				MONITOR_INC(MONITOR_BACKGROUND_DROP_INDEX);
				/* fall through */
			case ONLINE_INDEX_ABORTED:
				/* Drop the index tree from the
				data dictionary and free it from
				the tablespace, but keep the object
				in the data dictionary cache. */
				row_merge_drop_index_dict(trx, index->id);
				rw_lock_x_lock(dict_index_get_lock(index));
				dict_index_set_online_status(
					index, ONLINE_INDEX_ABORTED_DROPPED);
				rw_lock_x_unlock(dict_index_get_lock(index));
				table->drop_aborted = TRUE;
				continue;
			}
			ut_error;
		}

		return;
	}

	/* Immediate drop: the table is not in use by others. */
	row_merge_drop_indexes_dict(trx, table->id);

	/* Invalidate all row_prebuilt_t::ins_graph that are referring
	to this table. That is, force row_get_prebuilt_insert_row() to
	rebuild prebuilt->ins_node->entry_list). */
	ut_ad(table->def_trx_id <= trx->id);
	table->def_trx_id = trx->id;

	next_index = dict_table_get_next_index(index);

	while ((index = next_index) != NULL) {
		/* read the next pointer before freeing the index */
		next_index = dict_table_get_next_index(index);

		ut_ad(!dict_index_is_clust(index));

		if (!index->is_committed()) {
			/* If it is FTS index, drop from table->fts
			and also drop its auxiliary tables */
			if (index->type & DICT_FTS) {
				ut_a(table->fts);
				fts_drop_index(table, index, trx);
			}

			switch (dict_index_get_online_status(index)) {
			case ONLINE_INDEX_CREATION:
				/* This state should only be possible
				when prepare_inplace_alter_table() fails
				after invoking row_merge_create_index().
				In inplace_alter_table(),
				row_merge_build_indexes()
				should never leave the index in this state.
				It would invoke row_log_abort_sec() on
				failure. */
			case ONLINE_INDEX_COMPLETE:
				/* In these cases, we are able to drop
				the index straight. The DROP INDEX was
				never deferred. */
				break;
			case ONLINE_INDEX_ABORTED:
			case ONLINE_INDEX_ABORTED_DROPPED:
				/* covered by dict_sys->mutex */
				MONITOR_DEC(MONITOR_BACKGROUND_DROP_INDEX);
			}

			dict_index_remove_from_cache(table, index);
		}
	}

	table->drop_aborted = FALSE;
	ut_d(dict_table_check_for_dup_indexes(table, CHECK_ALL_COMPLETE));
}
4018
4019/*********************************************************************//**
4020Drop all partially created indexes during crash recovery. */
4021void
4022row_merge_drop_temp_indexes(void)
4023/*=============================*/
4024{
4025 static const char sql[] =
4026 "PROCEDURE DROP_TEMP_INDEXES_PROC () IS\n"
4027 "ixid CHAR;\n"
4028 "found INT;\n"
4029
4030 "DECLARE CURSOR index_cur IS\n"
4031 " SELECT ID FROM SYS_INDEXES\n"
4032 " WHERE SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
4033 "FOR UPDATE;\n"
4034
4035 "BEGIN\n"
4036 "found := 1;\n"
4037 "OPEN index_cur;\n"
4038 "WHILE found = 1 LOOP\n"
4039 " FETCH index_cur INTO ixid;\n"
4040 " IF (SQL % NOTFOUND) THEN\n"
4041 " found := 0;\n"
4042 " ELSE\n"
4043 " DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
4044 " DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
4045 " END IF;\n"
4046 "END LOOP;\n"
4047 "CLOSE index_cur;\n"
4048 "END;\n";
4049 trx_t* trx;
4050 dberr_t error;
4051
4052 /* Load the table definitions that contain partially defined
4053 indexes, so that the data dictionary information can be checked
4054 when accessing the tablename.ibd files. */
4055 trx = trx_create();
4056 trx->op_info = "dropping partially created indexes";
4057 row_mysql_lock_data_dictionary(trx);
4058 /* Ensure that this transaction will be rolled back and locks
4059 will be released, if the server gets killed before the commit
4060 gets written to the redo log. */
4061 trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
4062
4063 trx->op_info = "dropping indexes";
4064 error = que_eval_sql(NULL, sql, FALSE, trx);
4065
4066 if (error != DB_SUCCESS) {
4067 /* Even though we ensure that DDL transactions are WAIT
4068 and DEADLOCK free, we could encounter other errors e.g.,
4069 DB_TOO_MANY_CONCURRENT_TRXS. */
4070 trx->error_state = DB_SUCCESS;
4071
4072 ib::error() << "row_merge_drop_temp_indexes failed with error"
4073 << error;
4074 }
4075
4076 trx_commit_for_mysql(trx);
4077 row_mysql_unlock_data_dictionary(trx);
4078 trx_free(trx);
4079}
4080
4081
/** Create temporary merge files in the given parameter path, and if
UNIV_PFS_IO defined, register the file descriptor with Performance Schema.
@param[in]	path	location for creating temporary merge files, or NULL
@return File descriptor, or OS_FILE_CLOSED on failure */
pfs_os_file_t
row_merge_file_create_low(
	const char*	path)
{
	pfs_os_file_t	fd;
#ifdef UNIV_PFS_IO
	/* This temp file open does not go through normal
	file APIs, add instrumentation to register with
	performance schema */
	struct PSI_file_locker*	locker = NULL;
	PSI_file_locker_state	state;

	register_pfs_file_open_begin(
		&state, locker, innodb_temp_file_key,
		PSI_FILE_CREATE,
		"Innodb Merge Temp File",
		__FILE__, __LINE__);

#endif
	fd = innobase_mysql_tmpfile(path);
#ifdef UNIV_PFS_IO
	/* Pass NULL on failure so that PFS does not record a
	descriptor for a file that was never opened. */
	register_pfs_file_open_end(locker, fd,
			(fd == OS_FILE_CLOSED)?NULL:&fd);
#endif

	if (fd == OS_FILE_CLOSED) {
		ib::error() << "Cannot create temporary merge file";
	}
	return(fd);
}
4116
4117
4118/** Create a merge file in the given location.
4119@param[out] merge_file merge file structure
4120@param[in] path location for creating temporary file, or NULL
4121@return file descriptor, or OS_FILE_CLOSED on error */
4122pfs_os_file_t
4123row_merge_file_create(
4124 merge_file_t* merge_file,
4125 const char* path)
4126{
4127 merge_file->fd = row_merge_file_create_low(path);
4128 merge_file->offset = 0;
4129 merge_file->n_rec = 0;
4130
4131 if (merge_file->fd != OS_FILE_CLOSED) {
4132 if (srv_disable_sort_file_cache) {
4133 os_file_set_nocache(merge_file->fd,
4134 "row0merge.cc", "sort");
4135 }
4136 }
4137 return(merge_file->fd);
4138}
4139
4140/*********************************************************************//**
4141Destroy a merge file. And de-register the file from Performance Schema
4142if UNIV_PFS_IO is defined. */
4143void
4144row_merge_file_destroy_low(
4145/*=======================*/
4146 const pfs_os_file_t& fd) /*!< in: merge file descriptor */
4147{
4148 if (fd != OS_FILE_CLOSED) {
4149 os_file_close(fd);
4150 }
4151}
4152/*********************************************************************//**
4153Destroy a merge file. */
4154void
4155row_merge_file_destroy(
4156/*===================*/
4157 merge_file_t* merge_file) /*!< in/out: merge file structure */
4158{
4159 ut_ad(!srv_read_only_mode);
4160
4161 if (merge_file->fd != OS_FILE_CLOSED) {
4162 row_merge_file_destroy_low(merge_file->fd);
4163 merge_file->fd = OS_FILE_CLOSED;
4164 }
4165}
4166
4167/*********************************************************************//**
4168Rename an index in the dictionary that was created. The data
4169dictionary must have been locked exclusively by the caller, because
4170the transaction will not be committed.
4171@return DB_SUCCESS if all OK */
4172dberr_t
4173row_merge_rename_index_to_add(
4174/*==========================*/
4175 trx_t* trx, /*!< in/out: transaction */
4176 table_id_t table_id, /*!< in: table identifier */
4177 index_id_t index_id) /*!< in: index identifier */
4178{
4179 dberr_t err = DB_SUCCESS;
4180 pars_info_t* info = pars_info_create();
4181
4182 /* We use the private SQL parser of Innobase to generate the
4183 query graphs needed in renaming indexes. */
4184
4185 static const char rename_index[] =
4186 "PROCEDURE RENAME_INDEX_PROC () IS\n"
4187 "BEGIN\n"
4188 "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
4189 "WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
4190 "END;\n";
4191
4192 ut_ad(trx);
4193 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
4194 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
4195
4196 trx->op_info = "renaming index to add";
4197
4198 pars_info_add_ull_literal(info, "tableid", table_id);
4199 pars_info_add_ull_literal(info, "indexid", index_id);
4200
4201 err = que_eval_sql(info, rename_index, FALSE, trx);
4202
4203 if (err != DB_SUCCESS) {
4204 /* Even though we ensure that DDL transactions are WAIT
4205 and DEADLOCK free, we could encounter other errors e.g.,
4206 DB_TOO_MANY_CONCURRENT_TRXS. */
4207 trx->error_state = DB_SUCCESS;
4208
4209 ib::error() << "row_merge_rename_index_to_add failed with"
4210 " error " << err;
4211 }
4212
4213 trx->op_info = "";
4214
4215 return(err);
4216}
4217
/*********************************************************************//**
Rename an index in the dictionary that is to be dropped. The data
dictionary must have been locked exclusively by the caller, because
the transaction will not be committed.
The rename prepends the temporary-prefix character to the index name
in SYS_INDEXES, marking it for a deferred drop.
@return DB_SUCCESS if all OK */
dberr_t
row_merge_rename_index_to_drop(
/*===========================*/
	trx_t*		trx,		/*!< in/out: transaction */
	table_id_t	table_id,	/*!< in: table identifier */
	index_id_t	index_id)	/*!< in: index identifier */
{
	dberr_t		err;
	pars_info_t*	info = pars_info_create();

	ut_ad(!srv_read_only_mode);

	/* We use the private SQL parser of Innobase to generate the
	query graphs needed in renaming indexes. */

	static const char rename_index[] =
		"PROCEDURE RENAME_INDEX_PROC () IS\n"
		"BEGIN\n"
		"UPDATE SYS_INDEXES SET NAME=CONCAT('"
		TEMP_INDEX_PREFIX_STR "',NAME)\n"
		"WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
		"END;\n";

	ut_ad(trx);
	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);

	trx->op_info = "renaming index to drop";

	pars_info_add_ull_literal(info, "tableid", table_id);
	pars_info_add_ull_literal(info, "indexid", index_id);

	err = que_eval_sql(info, rename_index, FALSE, trx);

	if (err != DB_SUCCESS) {
		/* Even though we ensure that DDL transactions are WAIT
		and DEADLOCK free, we could encounter other errors e.g.,
		DB_TOO_MANY_CONCURRENT_TRXS. */
		trx->error_state = DB_SUCCESS;

		ib::error() << "row_merge_rename_index_to_drop failed with"
			" error " << err;
	}

	trx->op_info = "";

	return(err);
}
4271
4272/*********************************************************************//**
4273Provide a new pathname for a table that is being renamed if it belongs to
4274a file-per-table tablespace. The caller is responsible for freeing the
4275memory allocated for the return value.
4276@return new pathname of tablespace file, or NULL if space = 0 */
4277char*
4278row_make_new_pathname(
4279/*==================*/
4280 dict_table_t* table, /*!< in: table to be renamed */
4281 const char* new_name) /*!< in: new name */
4282{
4283 ut_ad(!is_system_tablespace(table->space->id));
4284 return os_file_make_new_pathname(table->space->chain.start->name,
4285 new_name);
4286}
4287
4288/*********************************************************************//**
4289Rename the tables in the data dictionary. The data dictionary must
4290have been locked exclusively by the caller, because the transaction
4291will not be committed.
4292@return error code or DB_SUCCESS */
dberr_t
row_merge_rename_tables_dict(
/*=========================*/
	dict_table_t*	old_table,	/*!< in/out: old table, renamed to
					tmp_name */
	dict_table_t*	new_table,	/*!< in/out: new table, renamed to
					old_table->name */
	const char*	tmp_name,	/*!< in: new name for old_table */
	trx_t*		trx)		/*!< in/out: dictionary transaction */
{
	dberr_t		err	= DB_ERROR;
	pars_info_t*	info;

	ut_ad(!srv_read_only_mode);
	ut_ad(old_table != new_table);
	ut_ad(mutex_own(&dict_sys->mutex));
	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_TABLE
	      || trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);

	trx->op_info = "renaming tables";

	/* We use the private SQL parser of Innobase to generate the query
	graphs needed in updating the dictionary data in system tables. */

	info = pars_info_create();

	pars_info_add_str_literal(info, "new_name", new_table->name.m_name);
	pars_info_add_str_literal(info, "old_name", old_table->name.m_name);
	pars_info_add_str_literal(info, "tmp_name", tmp_name);

	/* Two-step swap inside a single statement batch: first move the
	old table out of the way to tmp_name, then give the new table the
	old table's name.  The order matters: doing it the other way
	around would create a duplicate NAME in SYS_TABLES. */
	err = que_eval_sql(info,
			   "PROCEDURE RENAME_TABLES () IS\n"
			   "BEGIN\n"
			   "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
			   " WHERE NAME = :old_name;\n"
			   "UPDATE SYS_TABLES SET NAME = :old_name\n"
			   " WHERE NAME = :new_name;\n"
			   "END;\n", FALSE, trx);

	/* Update SYS_TABLESPACES and SYS_DATAFILES if the old table being
	renamed is a single-table tablespace, which must be implicitly
	renamed along with the table. */
	if (err == DB_SUCCESS
	    && old_table->space_id) {
		/* NOTE(review): this branch is gated on a non-zero
		space_id while the new-table branch below uses
		dict_table_is_file_per_table(); presumably equivalent
		here, but confirm for tables in shared tablespaces. */

		/* Make pathname to update SYS_DATAFILES. */
		char*	tmp_path = row_make_new_pathname(old_table, tmp_name);

		info = pars_info_create();

		pars_info_add_str_literal(info, "tmp_name", tmp_name);
		pars_info_add_str_literal(info, "tmp_path", tmp_path);
		pars_info_add_int4_literal(info, "old_space",
					   old_table->space_id);

		err = que_eval_sql(info,
				   "PROCEDURE RENAME_OLD_SPACE () IS\n"
				   "BEGIN\n"
				   "UPDATE SYS_TABLESPACES"
				   " SET NAME = :tmp_name\n"
				   " WHERE SPACE = :old_space;\n"
				   "UPDATE SYS_DATAFILES"
				   " SET PATH = :tmp_path\n"
				   " WHERE SPACE = :old_space;\n"
				   "END;\n", FALSE, trx);

		ut_free(tmp_path);
	}

	/* Update SYS_TABLESPACES and SYS_DATAFILES if the new table being
	renamed is a single-table tablespace, which must be implicitly
	renamed along with the table. */
	if (err == DB_SUCCESS
	    && dict_table_is_file_per_table(new_table)) {
		/* Make pathname to update SYS_DATAFILES. */
		char*	old_path = row_make_new_pathname(
			new_table, old_table->name.m_name);

		info = pars_info_create();

		pars_info_add_str_literal(info, "old_name",
					  old_table->name.m_name);
		pars_info_add_str_literal(info, "old_path", old_path);
		pars_info_add_int4_literal(info, "new_space",
					   new_table->space_id);

		err = que_eval_sql(info,
				   "PROCEDURE RENAME_NEW_SPACE () IS\n"
				   "BEGIN\n"
				   "UPDATE SYS_TABLESPACES"
				   " SET NAME = :old_name\n"
				   " WHERE SPACE = :new_space;\n"
				   "UPDATE SYS_DATAFILES"
				   " SET PATH = :old_path\n"
				   " WHERE SPACE = :new_space;\n"
				   "END;\n", FALSE, trx);

		ut_free(old_path);
	}

	/* If the rebuilt table had its tablespace discarded, persist the
	DISCARDED flag for the table that now carries the old name. */
	if (err == DB_SUCCESS && (new_table->flags2 & DICT_TF2_DISCARDED)) {
		err = row_import_update_discarded_flag(
			trx, new_table->id, true);
	}

	trx->op_info = "";

	/* Note: the dictionary transaction is intentionally NOT
	committed here; the caller commits or rolls back the swap. */
	return(err);
}
4402
4403/** Create the index and load in to the dictionary.
4404@param[in,out] table the index is on this table
4405@param[in] index_def the index definition
4406@param[in] add_v new virtual columns added along with add
4407 index call
4408@return index, or NULL on error */
4409dict_index_t*
4410row_merge_create_index(
4411 dict_table_t* table,
4412 const index_def_t* index_def,
4413 const dict_add_v_col_t* add_v)
4414{
4415 dict_index_t* index;
4416 ulint n_fields = index_def->n_fields;
4417 ulint i;
4418
4419 DBUG_ENTER("row_merge_create_index");
4420
4421 ut_ad(!srv_read_only_mode);
4422
4423 /* Create the index prototype, using the passed in def, this is not
4424 a persistent operation. We pass 0 as the space id, and determine at
4425 a lower level the space id where to store the table. */
4426
4427 index = dict_mem_index_create(table, index_def->name,
4428 index_def->ind_type, n_fields);
4429 index->set_committed(index_def->rebuild);
4430
4431 for (i = 0; i < n_fields; i++) {
4432 const char* name;
4433 index_field_t* ifield = &index_def->fields[i];
4434
4435 if (ifield->is_v_col) {
4436 if (ifield->col_no >= table->n_v_def) {
4437 ut_ad(ifield->col_no < table->n_v_def
4438 + add_v->n_v_col);
4439 ut_ad(ifield->col_no >= table->n_v_def);
4440 name = add_v->v_col_name[
4441 ifield->col_no - table->n_v_def];
4442 index->has_new_v_col = true;
4443 } else {
4444 name = dict_table_get_v_col_name(
4445 table, ifield->col_no);
4446 }
4447 } else {
4448 name = dict_table_get_col_name(table, ifield->col_no);
4449 }
4450
4451 dict_mem_index_add_field(index, name, ifield->prefix_len);
4452 }
4453
4454 DBUG_RETURN(index);
4455}
4456
4457/*********************************************************************//**
4458Check if a transaction can use an index. */
4459bool
4460row_merge_is_index_usable(
4461/*======================*/
4462 const trx_t* trx, /*!< in: transaction */
4463 const dict_index_t* index) /*!< in: index to check */
4464{
4465 if (!index->is_primary()
4466 && dict_index_is_online_ddl(index)) {
4467 /* Indexes that are being created are not useable. */
4468 return(false);
4469 }
4470
4471 return(!index->is_corrupted()
4472 && (index->table->is_temporary()
4473 || index->trx_id == 0
4474 || !trx->read_view.is_open()
4475 || trx->read_view.changes_visible(
4476 index->trx_id,
4477 index->table->name)));
4478}
4479
4480/*********************************************************************//**
4481Drop a table. The caller must have ensured that the background stats
4482thread is not processing the table. This can be done by calling
4483dict_stats_wait_bg_to_stop_using_table() after locking the dictionary and
4484before calling this function.
4485@return DB_SUCCESS or error code */
4486dberr_t
4487row_merge_drop_table(
4488/*=================*/
4489 trx_t* trx, /*!< in: transaction */
4490 dict_table_t* table) /*!< in: table to drop */
4491{
4492 ut_ad(!srv_read_only_mode);
4493
4494 /* There must be no open transactions on the table. */
4495 ut_a(table->get_ref_count() == 0);
4496
4497 return(row_drop_table_for_mysql(table->name.m_name,
4498 trx, false, false, false));
4499}
4500
4501/** Write an MLOG_INDEX_LOAD record to indicate in the redo-log
4502that redo-logging of individual index pages was disabled, and
4503the flushing of such pages to the data files was completed.
4504@param[in] index an index tree on which redo logging was disabled */
4505static
4506void
4507row_merge_write_redo(
4508 const dict_index_t* index)
4509{
4510 mtr_t mtr;
4511 byte* log_ptr;
4512
4513 ut_ad(!index->table->is_temporary());
4514 mtr.start();
4515 log_ptr = mlog_open(&mtr, 11 + 8);
4516 log_ptr = mlog_write_initial_log_record_low(
4517 MLOG_INDEX_LOAD,
4518 index->table->space->id, index->page, log_ptr, &mtr);
4519 mach_write_to_8(log_ptr, index->id);
4520 mlog_close(&mtr, log_ptr + 8);
4521 mtr.commit();
4522}
4523
4524/** Build indexes on a table by reading a clustered index, creating a temporary
4525file containing index entries, merge sorting these index entries and inserting
4526sorted index entries to indexes.
4527@param[in] trx transaction
4528@param[in] old_table table where rows are read from
4529@param[in] new_table table where indexes are created; identical to
4530old_table unless creating a PRIMARY KEY
4531@param[in] online true if creating indexes online
4532@param[in] indexes indexes to be created
4533@param[in] key_numbers MySQL key numbers
4534@param[in] n_indexes size of indexes[]
4535@param[in,out] table MySQL table, for reporting erroneous key value
4536if applicable
4537@param[in] defaults default values of added, changed columns, or NULL
4538@param[in] col_map mapping of old column numbers to new ones, or
4539NULL if old_table == new_table
4540@param[in] add_autoinc number of added AUTO_INCREMENT columns, or
4541ULINT_UNDEFINED if none is added
4542@param[in,out] sequence autoinc sequence
4543@param[in] skip_pk_sort whether the new PRIMARY KEY will follow
4544existing order
4545@param[in,out] stage performance schema accounting object, used by
4546ALTER TABLE. stage->begin_phase_read_pk() will be called at the beginning of
4547this function and it will be passed to other functions for further accounting.
4548@param[in] add_v new virtual columns added along with indexes
4549@param[in] eval_table mysql table used to evaluate virtual column
4550 value, see innobase_get_computed_value().
4551@param[in] drop_historical whether to drop historical system rows
4552@return DB_SUCCESS or error code */
dberr_t
row_merge_build_indexes(
	trx_t*			trx,
	dict_table_t*		old_table,
	dict_table_t*		new_table,
	bool			online,
	dict_index_t**		indexes,
	const ulint*		key_numbers,
	ulint			n_indexes,
	struct TABLE*		table,
	const dtuple_t*		defaults,
	const ulint*		col_map,
	ulint			add_autoinc,
	ib_sequence_t&		sequence,
	bool			skip_pk_sort,
	ut_stage_alter_t*	stage,
	const dict_add_v_col_t*	add_v,
	struct TABLE*		eval_table,
	bool			drop_historical)
{
	merge_file_t*		merge_files;
	row_merge_block_t*	block;
	ut_new_pfx_t		block_pfx;
	size_t			block_size;
	ut_new_pfx_t		crypt_pfx;
	row_merge_block_t*	crypt_block = NULL;
	ulint			i;
	ulint			j;
	dberr_t			error;
	pfs_os_file_t		tmpfd = OS_FILE_CLOSED;
	dict_index_t*		fts_sort_idx = NULL;
	fts_psort_t*		psort_info = NULL;
	fts_psort_t*		merge_info = NULL;
	int64_t			sig_count = 0;
	bool			fts_psort_initiated = false;

	/* Rough cost model used only for progress reporting in the
	server log when log_warnings > 2; it does not affect the
	actual index build. */
	double total_static_cost = 0;
	double total_dynamic_cost = 0;
	ulint total_index_blocks = 0;
	double pct_cost=0;
	double pct_progress=0;

	DBUG_ENTER("row_merge_build_indexes");

	ut_ad(!srv_read_only_mode);
	ut_ad((old_table == new_table) == !col_map);
	ut_ad(!defaults || col_map);

	/* When rebuilding the table with a sorted PRIMARY KEY scan,
	the clustered index is built during the scan itself, so it is
	not counted in the read-PK phase. */
	stage->begin_phase_read_pk(skip_pk_sort && new_table != old_table
				   ? n_indexes - 1
				   : n_indexes);

	/* Allocate memory for merge file data structure and initialize
	fields */

	ut_allocator<row_merge_block_t>	alloc(mem_key_row_merge_sort);

	/* This will allocate "3 * srv_sort_buf_size" elements of type
	row_merge_block_t. The latter is defined as byte. */
	block_size = 3 * srv_sort_buf_size;
	block = alloc.allocate_large(block_size, &block_pfx);

	if (block == NULL) {
		DBUG_RETURN(DB_OUT_OF_MEMORY);
	}

	TRASH_ALLOC(&crypt_pfx, sizeof crypt_pfx);

	/* If temporary (sort) files must be encrypted, we need a second
	buffer of the same size holding the encrypted image. */
	if (log_tmp_is_encrypted()) {
		crypt_block = static_cast<row_merge_block_t*>(
			alloc.allocate_large(block_size,
					     &crypt_pfx));

		if (crypt_block == NULL) {
			/* NOTE(review): 'block' allocated above is not
			released on this early-return path — looks like a
			leak of 3 * srv_sort_buf_size bytes; confirm. */
			DBUG_RETURN(DB_OUT_OF_MEMORY);
		}
	}

	trx_start_if_not_started_xa(trx, true);

	/* Check if we need a flush observer to flush dirty pages.
	Since we disable redo logging in bulk load, so we should flush
	dirty pages before online log apply, because online log apply enables
	redo logging(we can do further optimization here).
	1. online add index: flush dirty pages right before row_log_apply().
	2. table rebuild: flush dirty pages before row_log_table_apply().

	we use bulk load to create all types of indexes except spatial index,
	for which redo logging is enabled. If we create only spatial indexes,
	we don't need to flush dirty pages at all. */
	bool	need_flush_observer = (old_table != new_table);

	for (i = 0; i < n_indexes; i++) {
		if (!dict_index_is_spatial(indexes[i])) {
			need_flush_observer = true;
		}
	}

	FlushObserver*	flush_observer = NULL;
	if (need_flush_observer) {
		flush_observer = UT_NEW_NOKEY(
			FlushObserver(new_table->space, trx, stage));

		trx_set_flush_observer(trx, flush_observer);
	}

	merge_files = static_cast<merge_file_t*>(
		ut_malloc_nokey(n_indexes * sizeof *merge_files));

	/* Initialize all the merge file descriptors, so that we
	don't call row_merge_file_destroy() on uninitialized
	merge file descriptor */

	for (i = 0; i < n_indexes; i++) {
		merge_files[i].fd = OS_FILE_CLOSED;
		merge_files[i].offset = 0;
	}

	total_static_cost = COST_BUILD_INDEX_STATIC * n_indexes + COST_READ_CLUSTERED_INDEX;
	total_dynamic_cost = COST_BUILD_INDEX_DYNAMIC * n_indexes;
	for (i = 0; i < n_indexes; i++) {
		if (indexes[i]->type & DICT_FTS) {
			ibool	opt_doc_id_size = FALSE;

			/* To build FTS index, we would need to extract
			doc's word, Doc ID, and word's position, so
			we need to build a "fts sort index" indexing
			on above three 'fields' */
			fts_sort_idx = row_merge_create_fts_sort_index(
				indexes[i], old_table, &opt_doc_id_size);

			/* Ownership of 'dup' passes to the psort
			machinery; it is freed with psort_info. */
			row_merge_dup_t*	dup
				= static_cast<row_merge_dup_t*>(
					ut_malloc_nokey(sizeof *dup));
			dup->index = fts_sort_idx;
			dup->table = table;
			dup->col_map = col_map;
			dup->n_dup = 0;

			/* This can fail e.g. if temporal files can't be
			created */
			if (!row_fts_psort_info_init(
					trx, dup, new_table, opt_doc_id_size,
					&psort_info, &merge_info)) {
				error = DB_CORRUPTION;
				goto func_exit;
			}

			/* We need to ensure that we free the resources
			allocated */
			fts_psort_initiated = true;
		}
	}

	if (global_system_variables.log_warnings > 2) {
		sql_print_information("InnoDB: Online DDL : Start reading"
				      " clustered index of the table"
				      " and create temporary files");
	}

	pct_cost = COST_READ_CLUSTERED_INDEX * 100 / (total_static_cost + total_dynamic_cost);

	/* Do not continue if we can't encrypt table pages */
	if (!old_table->is_readable() ||
	    !new_table->is_readable()) {
		error = DB_DECRYPTION_FAILED;
		ib_push_warning(trx->mysql_thd, DB_DECRYPTION_FAILED,
			"Table %s is encrypted but encryption service or"
			" used key_id is not available. "
			" Can't continue reading table.",
			!old_table->is_readable() ? old_table->name.m_name :
				new_table->name.m_name);
		goto func_exit;
	}

	/* Read clustered index of the table and create files for
	secondary index entries for merge sort */
	error = row_merge_read_clustered_index(
		trx, table, old_table, new_table, online, indexes,
		fts_sort_idx, psort_info, merge_files, key_numbers,
		n_indexes, defaults, add_v, col_map, add_autoinc,
		sequence, block, skip_pk_sort, &tmpfd, stage,
		pct_cost, crypt_block, eval_table, drop_historical);

	stage->end_phase_read_pk();

	pct_progress += pct_cost;

	if (global_system_variables.log_warnings > 2) {
		sql_print_information("InnoDB: Online DDL : End of reading "
				      "clustered index of the table"
				      " and create temporary files");
	}

	/* Sum the temporary-file block counts for the dynamic part
	of the per-index cost estimate below. */
	for (i = 0; i < n_indexes; i++) {
		total_index_blocks += merge_files[i].offset;
	}

	if (error != DB_SUCCESS) {
		goto func_exit;
	}

	DEBUG_SYNC_C("row_merge_after_scan");

	/* Now we have files containing index entries ready for
	sorting and inserting. */

	for (i = 0; i < n_indexes; i++) {
		dict_index_t*	sort_idx = indexes[i];

		/* Spatial indexes were built directly during the
		clustered-index scan; nothing to merge here. */
		if (dict_index_is_spatial(sort_idx)) {
			continue;
		}

		if (indexes[i]->type & DICT_FTS) {
			os_event_t	fts_parallel_merge_event;

			sort_idx = fts_sort_idx;

			fts_parallel_merge_event
				= merge_info[0].psort_common->merge_event;

			if (FTS_PLL_MERGE) {
				ulint	trial_count = 0;
				bool	all_exit = false;

				os_event_reset(fts_parallel_merge_event);
				row_fts_start_parallel_merge(merge_info);
wait_again:
				/* Wait (with 1 s timeout) until every
				parallel merge child reports completion. */
				os_event_wait_time_low(
					fts_parallel_merge_event, 1000000,
					sig_count);

				for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
					if (merge_info[j].child_status
					    != FTS_CHILD_COMPLETE
					    && merge_info[j].child_status
					    != FTS_CHILD_EXITING) {
						sig_count = os_event_reset(
							fts_parallel_merge_event);

						goto wait_again;
					}
				}

				/* Now all children should complete, wait
				a bit until they all finish using event */
				while (!all_exit && trial_count < 10000) {
					all_exit = true;

					for (j = 0; j < FTS_NUM_AUX_INDEX;
					     j++) {
						if (merge_info[j].child_status
						    != FTS_CHILD_EXITING) {
							all_exit = false;
							os_thread_sleep(1000);
							break;
						}
					}
					trial_count++;
				}

				if (!all_exit) {
					ib::error() << "Not all child merge"
						" threads exited when creating"
						" FTS index '"
						<< indexes[i]->name << "'";
				} else {
					for (j = 0; j < FTS_NUM_AUX_INDEX;
					     j++) {

						os_thread_join(merge_info[j]
							       .thread_hdl);
					}
				}
			} else {
				/* This cannot report duplicates; an
				assertion would fail in that case. */
				error = row_fts_merge_insert(
					sort_idx, new_table,
					psort_info, 0);
			}

#ifdef FTS_INTERNAL_DIAG_PRINT
			DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n");
#endif
		} else if (merge_files[i].fd != OS_FILE_CLOSED) {
			/* Regular B-tree index: merge-sort the temporary
			file, then bulk-load the sorted tuples. */
			char	buf[NAME_LEN + 1];
			row_merge_dup_t	dup = {
				sort_idx, table, col_map, 0};

			pct_cost = (COST_BUILD_INDEX_STATIC +
				    (total_dynamic_cost * merge_files[i].offset /
				     total_index_blocks)) /
				(total_static_cost + total_dynamic_cost)
				* PCT_COST_MERGESORT_INDEX * 100;
			char*	bufend = innobase_convert_name(
				buf, sizeof buf,
				indexes[i]->name,
				strlen(indexes[i]->name),
				trx->mysql_thd);
			buf[bufend - buf]='\0';

			if (global_system_variables.log_warnings > 2) {
				sql_print_information("InnoDB: Online DDL :"
						      " Start merge-sorting"
						      " index %s"
						      " (" ULINTPF
						      " / " ULINTPF "),"
						      " estimated cost :"
						      " %2.4f",
						      buf, i + 1, n_indexes,
						      pct_cost);
			}

			error = row_merge_sort(
					trx, &dup, &merge_files[i],
					block, &tmpfd, true,
					pct_progress, pct_cost,
					crypt_block, new_table->space->id,
					stage);

			pct_progress += pct_cost;

			if (global_system_variables.log_warnings > 2) {
				sql_print_information("InnoDB: Online DDL :"
						      " End of "
						      " merge-sorting index %s"
						      " (" ULINTPF
						      " / " ULINTPF ")",
						      buf, i + 1, n_indexes);
			}

			DBUG_EXECUTE_IF(
				"ib_merge_wait_after_sort",
				os_thread_sleep(20000000);); /* 20 sec */

			if (error == DB_SUCCESS) {
				BtrBulk	btr_bulk(sort_idx, trx->id,
						 flush_observer);
				btr_bulk.init();

				pct_cost = (COST_BUILD_INDEX_STATIC +
					    (total_dynamic_cost * merge_files[i].offset /
					     total_index_blocks)) /
					(total_static_cost + total_dynamic_cost) *
					PCT_COST_INSERT_INDEX * 100;

				if (global_system_variables.log_warnings > 2) {
					sql_print_information(
						"InnoDB: Online DDL : Start "
						"building index %s"
						" (" ULINTPF
						" / " ULINTPF "), estimated "
						"cost : %2.4f", buf, i + 1,
						n_indexes, pct_cost);
				}

				error = row_merge_insert_index_tuples(
					sort_idx, old_table,
					merge_files[i].fd, block, NULL,
					&btr_bulk,
					merge_files[i].n_rec, pct_progress, pct_cost,
					crypt_block, new_table->space->id,
					stage);

				/* finish() must run even on error so the
				bulk loader releases its resources. */
				error = btr_bulk.finish(error);

				pct_progress += pct_cost;

				if (global_system_variables.log_warnings > 2) {
					sql_print_information(
						"InnoDB: Online DDL : "
						"End of building index %s"
						" (" ULINTPF " / " ULINTPF ")",
						buf, i + 1, n_indexes);
				}
			}
		}

		/* Close the temporary file to free up space. */
		row_merge_file_destroy(&merge_files[i]);

		if (indexes[i]->type & DICT_FTS) {
			row_fts_psort_info_destroy(psort_info, merge_info);
			fts_psort_initiated = false;
		} else if (error != DB_SUCCESS || !online) {
			/* Do not apply any online log. */
		} else if (old_table != new_table) {
			ut_ad(!sort_idx->online_log);
			ut_ad(sort_idx->online_status
			      == ONLINE_INDEX_COMPLETE);
		} else {
			/* Online secondary index creation on the same
			table: flush the bulk-loaded (non-redo-logged)
			pages, mark the load in the redo log, then apply
			the concurrent-DML log to the new index. */
			ut_ad(need_flush_observer);
			if (global_system_variables.log_warnings > 2) {
				sql_print_information(
					"InnoDB: Online DDL : Applying"
					" log to index");
			}
			flush_observer->flush();
			row_merge_write_redo(indexes[i]);

			DEBUG_SYNC_C("row_log_apply_before");
			error = row_log_apply(trx, sort_idx, table, stage);
			DEBUG_SYNC_C("row_log_apply_after");
		}

		if (error != DB_SUCCESS) {
			/* Remember which key failed, for error
			reporting to the SQL layer. */
			trx->error_key_num = key_numbers[i];
			goto func_exit;
		}

		if (indexes[i]->type & DICT_FTS && fts_enable_diag_print) {
			ib::info() << "Finished building full-text index "
				<< indexes[i]->name;
		}
	}

func_exit:

	DBUG_EXECUTE_IF(
		"ib_build_indexes_too_many_concurrent_trxs",
		error = DB_TOO_MANY_CONCURRENT_TRXS;
		trx->error_state = error;);

	if (fts_psort_initiated) {
		/* Clean up FTS psort related resource */
		row_fts_psort_info_destroy(psort_info, merge_info);
		fts_psort_initiated = false;
	}

	row_merge_file_destroy_low(tmpfd);

	for (i = 0; i < n_indexes; i++) {
		row_merge_file_destroy(&merge_files[i]);
	}

	if (fts_sort_idx) {
		dict_mem_index_free(fts_sort_idx);
	}

	ut_free(merge_files);

	alloc.deallocate_large(block, &block_pfx, block_size);

	if (crypt_block) {
		alloc.deallocate_large(crypt_block, &crypt_pfx, block_size);
	}

	DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID);

	if (online && old_table == new_table && error != DB_SUCCESS) {
		/* On error, flag all online secondary index creation
		as aborted. */
		for (i = 0; i < n_indexes; i++) {
			ut_ad(!(indexes[i]->type & DICT_FTS));
			ut_ad(!indexes[i]->is_committed());
			ut_ad(!dict_index_is_clust(indexes[i]));

			/* Completed indexes should be dropped as
			well, and indexes whose creation was aborted
			should be dropped from the persistent
			storage. However, at this point we can only
			set some flags in the not-yet-published
			indexes. These indexes will be dropped later
			in row_merge_drop_indexes(), called by
			rollback_inplace_alter_table(). */

			switch (dict_index_get_online_status(indexes[i])) {
			case ONLINE_INDEX_COMPLETE:
				break;
			case ONLINE_INDEX_CREATION:
				rw_lock_x_lock(
					dict_index_get_lock(indexes[i]));
				row_log_abort_sec(indexes[i]);
				indexes[i]->type |= DICT_CORRUPT;
				rw_lock_x_unlock(
					dict_index_get_lock(indexes[i]));
				new_table->drop_aborted = TRUE;
				/* fall through */
			case ONLINE_INDEX_ABORTED_DROPPED:
			case ONLINE_INDEX_ABORTED:
				MONITOR_ATOMIC_INC(
					MONITOR_BACKGROUND_DROP_INDEX);
			}
		}
	}

	DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE(););

	if (flush_observer != NULL) {
		ut_ad(need_flush_observer);

		DBUG_EXECUTE_IF("ib_index_build_fail_before_flush",
			error = DB_INTERRUPTED;
		);

		if (error != DB_SUCCESS) {
			flush_observer->interrupted();
		}

		/* Flush all the bulk-loaded pages (redo logging was
		disabled for them) before declaring the build done. */
		flush_observer->flush();

		UT_DELETE(flush_observer);

		if (trx_is_interrupted(trx)) {
			error = DB_INTERRUPTED;
		}

		/* For a successful table rebuild, write MLOG_INDEX_LOAD
		markers for every index of the new table so that crash
		recovery knows these trees were loaded without redo. */
		if (error == DB_SUCCESS && old_table != new_table) {
			for (const dict_index_t* index
				     = dict_table_get_first_index(new_table);
			     index != NULL;
			     index = dict_table_get_next_index(index)) {
				row_merge_write_redo(index);
			}
		}
	}

	DBUG_RETURN(error);
}
5074