1/*****************************************************************************
2
3Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2016, 2018, MariaDB Corporation.
5
6This program is free software; you can redistribute it and/or modify it under
7the terms of the GNU General Public License as published by the Free Software
8Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful, but WITHOUT
11ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License along with
15this program; if not, write to the Free Software Foundation, Inc.,
1651 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
17
18*****************************************************************************/
19
20/**************************************************//**
21@file dict/dict0load.cc
22Loads to the memory cache database object definitions
23from dictionary tables
24
25Created 4/24/1996 Heikki Tuuri
26*******************************************************/
27
28#include "ha_prototypes.h"
29
30#include "dict0load.h"
31
32#include "mysql_version.h"
33#include "btr0pcur.h"
34#include "btr0btr.h"
35#include "dict0boot.h"
36#include "dict0crea.h"
37#include "dict0dict.h"
38#include "dict0mem.h"
39#include "dict0priv.h"
40#include "dict0stats.h"
41#include "fsp0file.h"
42#include "fsp0sysspace.h"
43#include "fts0priv.h"
44#include "mach0data.h"
45#include "page0page.h"
46#include "rem0cmp.h"
47#include "srv0start.h"
48#include "srv0srv.h"
49#include <stack>
50#include <set>
51
/** Names of the InnoDB system tables. The array is indexed by
dict_system_id_t values (see dict_startscan_system()), so the order of
the entries must stay in sync with that enum. */
static const char* SYSTEM_TABLE_NAME[] = {
	"SYS_TABLES",
	"SYS_INDEXES",
	"SYS_COLUMNS",
	"SYS_FIELDS",
	"SYS_FOREIGN",
	"SYS_FOREIGN_COLS",
	"SYS_TABLESPACES",
	"SYS_DATAFILES",
	"SYS_VIRTUAL"
};
65
/** Load a table definition together with all of its index definitions.

Foreign key constraints are loaded only when their referenced table is
already in the dictionary cache. When a foreign key constraint cannot be
loaded for that reason, the name of the referenced table is pushed to
fk_tables. The caller must subsequently load those tables so that all
the foreign key constraints end up in memory.

@param[in]	name		Table name in the db/tablename format
@param[in]	cached		true=add to cache, false=do not
@param[in]	ignore_err	Error to be ignored when loading the table
				and its index definitions
@param[out]	fk_tables	Related table names that must also be
				loaded to ensure that all foreign key
				constraints are loaded.
@return table, NULL if does not exist; if the table is stored in an
.ibd file, but the file does not exist, then we set the
file_unreadable flag in the table object we return */
static
dict_table_t*
dict_load_table_one(
	table_name_t&		name,
	bool			cached,
	dict_err_ignore_t	ignore_err,
	dict_names_t&		fk_tables);
91
/** Load a table definition from a SYS_TABLES record into a dict_table_t.
Columns and indexes are not loaded.
@param[in]	name	Table name
@param[in]	rec	SYS_TABLES record
@param[out,own]	table	table, or NULL
@return error message
@retval NULL on success */
static
const char*
dict_load_table_low(table_name_t& name, const rec_t* rec, dict_table_t** table)
	MY_ATTRIBUTE((nonnull, warn_unused_result));
103
/** Load an index definition from a SYS_INDEXES record to dict_index_t.
If allocate=TRUE, we will create a dict_index_t structure and fill it
accordingly. If allocate=FALSE, the dict_index_t will be supplied by
the caller and filled with information read from the record.
@return error message
@retval NULL on success */
static
const char*
dict_load_index_low(
	byte*		table_id,	/*!< in/out: table id (8 bytes),
					an "in" value if allocate=TRUE
					and "out" when allocate=FALSE */
	mem_heap_t*	heap,		/*!< in/out: temporary memory heap */
	const rec_t*	rec,		/*!< in: SYS_INDEXES record */
	ibool		allocate,	/*!< in: TRUE=allocate *index,
					FALSE=fill in a pre-allocated
					*index */
	dict_index_t**	index);		/*!< out,own: index, or NULL */
122
/** Load a table column definition from a SYS_COLUMNS record to dict_table_t.
@return error message
@retval NULL on success */
static
const char*
dict_load_column_low(
	dict_table_t*	table,		/*!< in/out: table, could be NULL
					if we just populate a dict_col_t
					struct with information from
					a SYS_COLUMNS record */
	mem_heap_t*	heap,		/*!< in/out: memory heap
					for temporary storage */
	dict_col_t*	column,		/*!< out: dict_col_t to fill,
					or NULL if table != NULL */
	table_id_t*	table_id,	/*!< out: table id */
	const char**	col_name,	/*!< out: column name */
	const rec_t*	rec,		/*!< in: SYS_COLUMNS record */
	ulint*		nth_v_col);	/*!< out: if not NULL, this
					records the "n" of "nth" virtual
					column */
143
/** Load the "mapping" of a virtual column to its base columns
from a SYS_VIRTUAL record.
@param[in,out]	table		table
@param[in,out]	column		mapped base column's dict_col_t
@param[in,out]	table_id	table id
@param[in,out]	pos		virtual column position
@param[in,out]	base_pos	base column position
@param[in]	rec		SYS_VIRTUAL record
@return error message
@retval NULL on success */
static
const char*
dict_load_virtual_low(
	dict_table_t*	table,
	dict_col_t**	column,
	table_id_t*	table_id,
	ulint*		pos,
	ulint*		base_pos,
	const rec_t*	rec);
163
/** Load an index field definition from a SYS_FIELDS record to dict_index_t.
@return error message
@retval NULL on success */
static
const char*
dict_load_field_low(
	byte*		index_id,	/*!< in/out: index id (8 bytes),
					an "in" value if index != NULL
					and "out" if index == NULL */
	dict_index_t*	index,		/*!< in/out: index, could be NULL
					if we just populate a dict_field_t
					struct with information from
					a SYS_FIELDS record */
	dict_field_t*	sys_field,	/*!< out: dict_field_t to be
					filled */
	ulint*		pos,		/*!< out: field position */
	byte*		last_index_id,	/*!< in: last index id (8 bytes) */
	mem_heap_t*	heap,		/*!< in/out: memory heap
					for temporary storage */
	const rec_t*	rec);		/*!< in: SYS_FIELDS record */
184
/** If this flag is TRUE, then we will load the metadata of a clustered
index (and of its table) even if it is marked as "corrupted". */
my_bool     srv_load_corrupted;
188
#ifdef UNIV_DEBUG
/** Check whether the column behind an index field has a given name.
Only compiled in debug builds (UNIV_DEBUG).
@param[in]	table	table
@param[in]	index	index
@param[in]	i	index field offset
@param[in]	name	name to compare to
@return TRUE if the i'th column of index is 'name' */
static
ibool
name_of_col_is(
	const dict_table_t*	table,
	const dict_index_t*	index,
	ulint			i,
	const char*		name)
{
	/* Resolve index field -> column -> column number -> column name. */
	const dict_field_t*	ifield = dict_index_get_nth_field(index, i);
	const dict_col_t*	col = dict_field_get_col(ifield);
	const ulint		col_no = dict_col_get_no(col);
	const char*		col_name = dict_table_get_col_name(table,
								   col_no);

	return(strcmp(name, col_name) == 0);
}
#endif /* UNIV_DEBUG */
209
210/********************************************************************//**
211Finds the first table name in the given database.
212@return own: table name, NULL if does not exist; the caller must free
213the memory in the string! */
214char*
215dict_get_first_table_name_in_db(
216/*============================*/
217 const char* name) /*!< in: database name which ends in '/' */
218{
219 dict_table_t* sys_tables;
220 btr_pcur_t pcur;
221 dict_index_t* sys_index;
222 dtuple_t* tuple;
223 mem_heap_t* heap;
224 dfield_t* dfield;
225 const rec_t* rec;
226 const byte* field;
227 ulint len;
228 mtr_t mtr;
229
230 ut_ad(mutex_own(&dict_sys->mutex));
231
232 heap = mem_heap_create(1000);
233
234 mtr_start(&mtr);
235
236 sys_tables = dict_table_get_low("SYS_TABLES");
237 sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
238 ut_ad(!dict_table_is_comp(sys_tables));
239
240 tuple = dtuple_create(heap, 1);
241 dfield = dtuple_get_nth_field(tuple, 0);
242
243 dfield_set_data(dfield, name, ut_strlen(name));
244 dict_index_copy_types(tuple, sys_index, 1);
245
246 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
247 BTR_SEARCH_LEAF, &pcur, &mtr);
248loop:
249 rec = btr_pcur_get_rec(&pcur);
250
251 if (!btr_pcur_is_on_user_rec(&pcur)) {
252 /* Not found */
253
254 btr_pcur_close(&pcur);
255 mtr_commit(&mtr);
256 mem_heap_free(heap);
257
258 return(NULL);
259 }
260
261 field = rec_get_nth_field_old(
262 rec, DICT_FLD__SYS_TABLES__NAME, &len);
263
264 if (len < strlen(name)
265 || ut_memcmp(name, field, strlen(name)) != 0) {
266 /* Not found */
267
268 btr_pcur_close(&pcur);
269 mtr_commit(&mtr);
270 mem_heap_free(heap);
271
272 return(NULL);
273 }
274
275 if (!rec_get_deleted_flag(rec, 0)) {
276
277 /* We found one */
278
279 char* table_name = mem_strdupl((char*) field, len);
280
281 btr_pcur_close(&pcur);
282 mtr_commit(&mtr);
283 mem_heap_free(heap);
284
285 return(table_name);
286 }
287
288 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
289
290 goto loop;
291}
292
293/********************************************************************//**
294This function gets the next system table record as it scans the table.
295@return the next record if found, NULL if end of scan */
296static
297const rec_t*
298dict_getnext_system_low(
299/*====================*/
300 btr_pcur_t* pcur, /*!< in/out: persistent cursor to the
301 record*/
302 mtr_t* mtr) /*!< in: the mini-transaction */
303{
304 rec_t* rec = NULL;
305
306 while (!rec || rec_get_deleted_flag(rec, 0)) {
307 btr_pcur_move_to_next_user_rec(pcur, mtr);
308
309 rec = btr_pcur_get_rec(pcur);
310
311 if (!btr_pcur_is_on_user_rec(pcur)) {
312 /* end of index */
313 btr_pcur_close(pcur);
314
315 return(NULL);
316 }
317 }
318
319 /* Get a record, let's save the position */
320 btr_pcur_store_position(pcur, mtr);
321
322 return(rec);
323}
324
325/********************************************************************//**
326This function opens a system table, and returns the first record.
327@return first record of the system table */
328const rec_t*
329dict_startscan_system(
330/*==================*/
331 btr_pcur_t* pcur, /*!< out: persistent cursor to
332 the record */
333 mtr_t* mtr, /*!< in: the mini-transaction */
334 dict_system_id_t system_id) /*!< in: which system table to open */
335{
336 dict_table_t* system_table;
337 dict_index_t* clust_index;
338 const rec_t* rec;
339
340 ut_a(system_id < SYS_NUM_SYSTEM_TABLES);
341
342 system_table = dict_table_get_low(SYSTEM_TABLE_NAME[system_id]);
343
344 clust_index = UT_LIST_GET_FIRST(system_table->indexes);
345
346 btr_pcur_open_at_index_side(true, clust_index, BTR_SEARCH_LEAF, pcur,
347 true, 0, mtr);
348
349 rec = dict_getnext_system_low(pcur, mtr);
350
351 return(rec);
352}
353
354/********************************************************************//**
355This function gets the next system table record as it scans the table.
356@return the next record if found, NULL if end of scan */
357const rec_t*
358dict_getnext_system(
359/*================*/
360 btr_pcur_t* pcur, /*!< in/out: persistent cursor
361 to the record */
362 mtr_t* mtr) /*!< in: the mini-transaction */
363{
364 const rec_t* rec;
365
366 /* Restore the position */
367 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
368
369 /* Get the next record */
370 rec = dict_getnext_system_low(pcur, mtr);
371
372 return(rec);
373}
374
375/********************************************************************//**
376This function processes one SYS_TABLES record and populate the dict_table_t
377struct for the table.
378@return error message, or NULL on success */
379const char*
380dict_process_sys_tables_rec_and_mtr_commit(
381/*=======================================*/
382 mem_heap_t* heap, /*!< in/out: temporary memory heap */
383 const rec_t* rec, /*!< in: SYS_TABLES record */
384 dict_table_t** table, /*!< out: dict_table_t to fill */
385 bool cached, /*!< in: whether to load from cache */
386 mtr_t* mtr) /*!< in/out: mini-transaction,
387 will be committed */
388{
389 ulint len;
390 const char* field;
391 table_name_t table_name;
392
393 field = (const char*) rec_get_nth_field_old(
394 rec, DICT_FLD__SYS_TABLES__NAME, &len);
395
396 ut_a(!rec_get_deleted_flag(rec, 0));
397
398 ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
399
400 /* Get the table name */
401 table_name.m_name = mem_heap_strdupl(heap, field, len);
402
403 if (cached) {
404 /* Commit before load the table again */
405 mtr_commit(mtr);
406
407 *table = dict_table_get_low(table_name.m_name);
408 return *table ? NULL : "Table not found in cache";
409 } else {
410 const char* err = dict_load_table_low(table_name, rec, table);
411 mtr_commit(mtr);
412 return err;
413 }
414}
415
416/********************************************************************//**
417This function parses a SYS_INDEXES record and populate a dict_index_t
418structure with the information from the record. For detail information
419about SYS_INDEXES fields, please refer to dict_boot() function.
420@return error message, or NULL on success */
421const char*
422dict_process_sys_indexes_rec(
423/*=========================*/
424 mem_heap_t* heap, /*!< in/out: heap memory */
425 const rec_t* rec, /*!< in: current SYS_INDEXES rec */
426 dict_index_t* index, /*!< out: index to be filled */
427 table_id_t* table_id) /*!< out: index table id */
428{
429 const char* err_msg;
430 byte* buf;
431
432 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
433
434 /* Parse the record, and get "dict_index_t" struct filled */
435 err_msg = dict_load_index_low(buf, heap, rec, FALSE, &index);
436
437 *table_id = mach_read_from_8(buf);
438
439 return(err_msg);
440}
441
442/********************************************************************//**
443This function parses a SYS_COLUMNS record and populate a dict_column_t
444structure with the information from the record.
445@return error message, or NULL on success */
446const char*
447dict_process_sys_columns_rec(
448/*=========================*/
449 mem_heap_t* heap, /*!< in/out: heap memory */
450 const rec_t* rec, /*!< in: current SYS_COLUMNS rec */
451 dict_col_t* column, /*!< out: dict_col_t to be filled */
452 table_id_t* table_id, /*!< out: table id */
453 const char** col_name, /*!< out: column name */
454 ulint* nth_v_col) /*!< out: if virtual col, this is
455 record's sequence number */
456{
457 const char* err_msg;
458
459 /* Parse the record, and get "dict_col_t" struct filled */
460 err_msg = dict_load_column_low(NULL, heap, column,
461 table_id, col_name, rec, nth_v_col);
462
463 return(err_msg);
464}
465
466/** This function parses a SYS_VIRTUAL record and extracts virtual column
467information
468@param[in] rec current SYS_COLUMNS rec
469@param[in,out] table_id table id
470@param[in,out] pos virtual column position
471@param[in,out] base_pos base column position
472@return error message, or NULL on success */
473const char*
474dict_process_sys_virtual_rec(
475 const rec_t* rec,
476 table_id_t* table_id,
477 ulint* pos,
478 ulint* base_pos)
479{
480 const char* err_msg;
481
482 /* Parse the record, and get "dict_col_t" struct filled */
483 err_msg = dict_load_virtual_low(NULL, NULL, table_id,
484 pos, base_pos, rec);
485
486 return(err_msg);
487}
488
489/********************************************************************//**
490This function parses a SYS_FIELDS record and populates a dict_field_t
491structure with the information from the record.
492@return error message, or NULL on success */
493const char*
494dict_process_sys_fields_rec(
495/*========================*/
496 mem_heap_t* heap, /*!< in/out: heap memory */
497 const rec_t* rec, /*!< in: current SYS_FIELDS rec */
498 dict_field_t* sys_field, /*!< out: dict_field_t to be
499 filled */
500 ulint* pos, /*!< out: Field position */
501 index_id_t* index_id, /*!< out: current index id */
502 index_id_t last_id) /*!< in: previous index id */
503{
504 byte* buf;
505 byte* last_index_id;
506 const char* err_msg;
507
508 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
509
510 last_index_id = static_cast<byte*>(mem_heap_alloc(heap, 8));
511 mach_write_to_8(last_index_id, last_id);
512
513 err_msg = dict_load_field_low(buf, NULL, sys_field,
514 pos, last_index_id, heap, rec);
515
516 *index_id = mach_read_from_8(buf);
517
518 return(err_msg);
519
520}
521
522/********************************************************************//**
523This function parses a SYS_FOREIGN record and populate a dict_foreign_t
524structure with the information from the record. For detail information
525about SYS_FOREIGN fields, please refer to dict_load_foreign() function.
526@return error message, or NULL on success */
527const char*
528dict_process_sys_foreign_rec(
529/*=========================*/
530 mem_heap_t* heap, /*!< in/out: heap memory */
531 const rec_t* rec, /*!< in: current SYS_FOREIGN rec */
532 dict_foreign_t* foreign) /*!< out: dict_foreign_t struct
533 to be filled */
534{
535 ulint len;
536 const byte* field;
537 ulint n_fields_and_type;
538
539 if (rec_get_deleted_flag(rec, 0)) {
540 return("delete-marked record in SYS_FOREIGN");
541 }
542
543 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN) {
544 return("wrong number of columns in SYS_FOREIGN record");
545 }
546
547 field = rec_get_nth_field_old(
548 rec, DICT_FLD__SYS_FOREIGN__ID, &len);
549 if (len == 0 || len == UNIV_SQL_NULL) {
550err_len:
551 return("incorrect column length in SYS_FOREIGN");
552 }
553
554 /* This receives a dict_foreign_t* that points to a stack variable.
555 So dict_foreign_free(foreign) is not used as elsewhere.
556 Since the heap used here is freed elsewhere, foreign->heap
557 is not assigned. */
558 foreign->id = mem_heap_strdupl(heap, (const char*) field, len);
559
560 rec_get_nth_field_offs_old(
561 rec, DICT_FLD__SYS_FOREIGN__DB_TRX_ID, &len);
562 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
563 goto err_len;
564 }
565 rec_get_nth_field_offs_old(
566 rec, DICT_FLD__SYS_FOREIGN__DB_ROLL_PTR, &len);
567 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
568 goto err_len;
569 }
570
571 /* The _lookup versions of the referenced and foreign table names
572 are not assigned since they are not used in this dict_foreign_t */
573
574 field = rec_get_nth_field_old(
575 rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
576 if (len == 0 || len == UNIV_SQL_NULL) {
577 goto err_len;
578 }
579 foreign->foreign_table_name = mem_heap_strdupl(
580 heap, (const char*) field, len);
581
582 field = rec_get_nth_field_old(
583 rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
584 if (len == 0 || len == UNIV_SQL_NULL) {
585 goto err_len;
586 }
587 foreign->referenced_table_name = mem_heap_strdupl(
588 heap, (const char*) field, len);
589
590 field = rec_get_nth_field_old(
591 rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len);
592 if (len != 4) {
593 goto err_len;
594 }
595 n_fields_and_type = mach_read_from_4(field);
596
597 foreign->type = (unsigned int) (n_fields_and_type >> 24);
598 foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL);
599
600 return(NULL);
601}
602
603/********************************************************************//**
604This function parses a SYS_FOREIGN_COLS record and extract necessary
605information from the record and return to caller.
606@return error message, or NULL on success */
607const char*
608dict_process_sys_foreign_col_rec(
609/*=============================*/
610 mem_heap_t* heap, /*!< in/out: heap memory */
611 const rec_t* rec, /*!< in: current SYS_FOREIGN_COLS rec */
612 const char** name, /*!< out: foreign key constraint name */
613 const char** for_col_name, /*!< out: referencing column name */
614 const char** ref_col_name, /*!< out: referenced column name
615 in referenced table */
616 ulint* pos) /*!< out: column position */
617{
618 ulint len;
619 const byte* field;
620
621 if (rec_get_deleted_flag(rec, 0)) {
622 return("delete-marked record in SYS_FOREIGN_COLS");
623 }
624
625 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FOREIGN_COLS) {
626 return("wrong number of columns in SYS_FOREIGN_COLS record");
627 }
628
629 field = rec_get_nth_field_old(
630 rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
631 if (len == 0 || len == UNIV_SQL_NULL) {
632err_len:
633 return("incorrect column length in SYS_FOREIGN_COLS");
634 }
635 *name = mem_heap_strdupl(heap, (char*) field, len);
636
637 field = rec_get_nth_field_old(
638 rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
639 if (len != 4) {
640 goto err_len;
641 }
642 *pos = mach_read_from_4(field);
643
644 rec_get_nth_field_offs_old(
645 rec, DICT_FLD__SYS_FOREIGN_COLS__DB_TRX_ID, &len);
646 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
647 goto err_len;
648 }
649 rec_get_nth_field_offs_old(
650 rec, DICT_FLD__SYS_FOREIGN_COLS__DB_ROLL_PTR, &len);
651 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
652 goto err_len;
653 }
654
655 field = rec_get_nth_field_old(
656 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
657 if (len == 0 || len == UNIV_SQL_NULL) {
658 goto err_len;
659 }
660 *for_col_name = mem_heap_strdupl(heap, (char*) field, len);
661
662 field = rec_get_nth_field_old(
663 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
664 if (len == 0 || len == UNIV_SQL_NULL) {
665 goto err_len;
666 }
667 *ref_col_name = mem_heap_strdupl(heap, (char*) field, len);
668
669 return(NULL);
670}
671
672/********************************************************************//**
673This function parses a SYS_TABLESPACES record, extracts necessary
674information from the record and returns to caller.
675@return error message, or NULL on success */
676const char*
677dict_process_sys_tablespaces(
678/*=========================*/
679 mem_heap_t* heap, /*!< in/out: heap memory */
680 const rec_t* rec, /*!< in: current SYS_TABLESPACES rec */
681 ulint* space, /*!< out: space id */
682 const char** name, /*!< out: tablespace name */
683 ulint* flags) /*!< out: tablespace flags */
684{
685 ulint len;
686 const byte* field;
687
688 /* Initialize the output values */
689 *space = ULINT_UNDEFINED;
690 *name = NULL;
691 *flags = ULINT_UNDEFINED;
692
693 if (rec_get_deleted_flag(rec, 0)) {
694 return("delete-marked record in SYS_TABLESPACES");
695 }
696
697 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLESPACES) {
698 return("wrong number of columns in SYS_TABLESPACES record");
699 }
700
701 field = rec_get_nth_field_old(
702 rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
703 if (len != DICT_FLD_LEN_SPACE) {
704err_len:
705 return("incorrect column length in SYS_TABLESPACES");
706 }
707 *space = mach_read_from_4(field);
708
709 rec_get_nth_field_offs_old(
710 rec, DICT_FLD__SYS_TABLESPACES__DB_TRX_ID, &len);
711 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
712 goto err_len;
713 }
714
715 rec_get_nth_field_offs_old(
716 rec, DICT_FLD__SYS_TABLESPACES__DB_ROLL_PTR, &len);
717 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
718 goto err_len;
719 }
720
721 field = rec_get_nth_field_old(
722 rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
723 if (len == 0 || len == UNIV_SQL_NULL) {
724 goto err_len;
725 }
726 *name = mem_heap_strdupl(heap, (char*) field, len);
727
728 field = rec_get_nth_field_old(
729 rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
730 if (len != DICT_FLD_LEN_FLAGS) {
731 goto err_len;
732 }
733 *flags = mach_read_from_4(field);
734
735 return(NULL);
736}
737
738/********************************************************************//**
739This function parses a SYS_DATAFILES record, extracts necessary
740information from the record and returns it to the caller.
741@return error message, or NULL on success */
742const char*
743dict_process_sys_datafiles(
744/*=======================*/
745 mem_heap_t* heap, /*!< in/out: heap memory */
746 const rec_t* rec, /*!< in: current SYS_DATAFILES rec */
747 ulint* space, /*!< out: space id */
748 const char** path) /*!< out: datafile paths */
749{
750 ulint len;
751 const byte* field;
752
753 if (rec_get_deleted_flag(rec, 0)) {
754 return("delete-marked record in SYS_DATAFILES");
755 }
756
757 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_DATAFILES) {
758 return("wrong number of columns in SYS_DATAFILES record");
759 }
760
761 field = rec_get_nth_field_old(
762 rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
763 if (len != DICT_FLD_LEN_SPACE) {
764err_len:
765 return("incorrect column length in SYS_DATAFILES");
766 }
767 *space = mach_read_from_4(field);
768
769 rec_get_nth_field_offs_old(
770 rec, DICT_FLD__SYS_DATAFILES__DB_TRX_ID, &len);
771 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
772 goto err_len;
773 }
774
775 rec_get_nth_field_offs_old(
776 rec, DICT_FLD__SYS_DATAFILES__DB_ROLL_PTR, &len);
777 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
778 goto err_len;
779 }
780
781 field = rec_get_nth_field_old(
782 rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
783 if (len == 0 || len == UNIV_SQL_NULL) {
784 goto err_len;
785 }
786 *path = mem_heap_strdupl(heap, (char*) field, len);
787
788 return(NULL);
789}
790
791/** Get the first filepath from SYS_DATAFILES for a given space_id.
792@param[in] space_id Tablespace ID
793@return First filepath (caller must invoke ut_free() on it)
794@retval NULL if no SYS_DATAFILES entry was found. */
795char*
796dict_get_first_path(
797 ulint space_id)
798{
799 mtr_t mtr;
800 dict_table_t* sys_datafiles;
801 dict_index_t* sys_index;
802 dtuple_t* tuple;
803 dfield_t* dfield;
804 byte* buf;
805 btr_pcur_t pcur;
806 const rec_t* rec;
807 const byte* field;
808 ulint len;
809 char* filepath = NULL;
810 mem_heap_t* heap = mem_heap_create(1024);
811
812 ut_ad(mutex_own(&dict_sys->mutex));
813
814 mtr_start(&mtr);
815
816 sys_datafiles = dict_table_get_low("SYS_DATAFILES");
817 sys_index = UT_LIST_GET_FIRST(sys_datafiles->indexes);
818
819 ut_ad(!dict_table_is_comp(sys_datafiles));
820 ut_ad(name_of_col_is(sys_datafiles, sys_index,
821 DICT_FLD__SYS_DATAFILES__SPACE, "SPACE"));
822 ut_ad(name_of_col_is(sys_datafiles, sys_index,
823 DICT_FLD__SYS_DATAFILES__PATH, "PATH"));
824
825 tuple = dtuple_create(heap, 1);
826 dfield = dtuple_get_nth_field(tuple, DICT_FLD__SYS_DATAFILES__SPACE);
827
828 buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
829 mach_write_to_4(buf, space_id);
830
831 dfield_set_data(dfield, buf, 4);
832 dict_index_copy_types(tuple, sys_index, 1);
833
834 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
835 BTR_SEARCH_LEAF, &pcur, &mtr);
836
837 rec = btr_pcur_get_rec(&pcur);
838
839 /* Get the filepath from this SYS_DATAFILES record. */
840 if (btr_pcur_is_on_user_rec(&pcur)) {
841 field = rec_get_nth_field_old(
842 rec, DICT_FLD__SYS_DATAFILES__SPACE, &len);
843 ut_a(len == 4);
844
845 if (space_id == mach_read_from_4(field)) {
846 /* A record for this space ID was found. */
847 field = rec_get_nth_field_old(
848 rec, DICT_FLD__SYS_DATAFILES__PATH, &len);
849
850 ut_ad(len > 0);
851 ut_ad(len < OS_FILE_MAX_PATH);
852
853 if (len > 0 && len != UNIV_SQL_NULL) {
854 filepath = mem_strdupl(
855 reinterpret_cast<const char*>(field),
856 len);
857 ut_ad(filepath != NULL);
858
859 /* The dictionary may have been written on
860 another OS. */
861 os_normalize_path(filepath);
862 }
863 }
864 }
865
866 btr_pcur_close(&pcur);
867 mtr_commit(&mtr);
868 mem_heap_free(heap);
869
870 return(filepath);
871}
872
873/** Update the record for space_id in SYS_TABLESPACES to this filepath.
874@param[in] space_id Tablespace ID
875@param[in] filepath Tablespace filepath
876@return DB_SUCCESS if OK, dberr_t if the insert failed */
877dberr_t
878dict_update_filepath(
879 ulint space_id,
880 const char* filepath)
881{
882 if (!srv_sys_tablespaces_open) {
883 /* Startup procedure is not yet ready for updates. */
884 return(DB_SUCCESS);
885 }
886
887 dberr_t err = DB_SUCCESS;
888 trx_t* trx;
889
890 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
891 ut_ad(mutex_own(&dict_sys->mutex));
892
893 trx = trx_create();
894 trx->op_info = "update filepath";
895 trx->dict_operation_lock_mode = RW_X_LATCH;
896 trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
897
898 pars_info_t* info = pars_info_create();
899
900 pars_info_add_int4_literal(info, "space", space_id);
901 pars_info_add_str_literal(info, "path", filepath);
902
903 err = que_eval_sql(info,
904 "PROCEDURE UPDATE_FILEPATH () IS\n"
905 "BEGIN\n"
906 "UPDATE SYS_DATAFILES"
907 " SET PATH = :path\n"
908 " WHERE SPACE = :space;\n"
909 "END;\n", FALSE, trx);
910
911 trx_commit_for_mysql(trx);
912 trx->dict_operation_lock_mode = 0;
913 trx_free(trx);
914
915 if (err == DB_SUCCESS) {
916 /* We just updated SYS_DATAFILES due to the contents in
917 a link file. Make a note that we did this. */
918 ib::info() << "The InnoDB data dictionary table SYS_DATAFILES"
919 " for tablespace ID " << space_id
920 << " was updated to use file " << filepath << ".";
921 } else {
922 ib::warn() << "Error occurred while updating InnoDB data"
923 " dictionary table SYS_DATAFILES for tablespace ID "
924 << space_id << " to file " << filepath << ": "
925 << ut_strerr(err) << ".";
926 }
927
928 return(err);
929}
930
931/** Replace records in SYS_TABLESPACES and SYS_DATAFILES associated with
932the given space_id using an independent transaction.
933@param[in] space_id Tablespace ID
934@param[in] name Tablespace name
935@param[in] filepath First filepath
936@param[in] fsp_flags Tablespace flags
937@return DB_SUCCESS if OK, dberr_t if the insert failed */
938dberr_t
939dict_replace_tablespace_and_filepath(
940 ulint space_id,
941 const char* name,
942 const char* filepath,
943 ulint fsp_flags)
944{
945 if (!srv_sys_tablespaces_open) {
946 /* Startup procedure is not yet ready for updates.
947 Return success since this will likely get updated
948 later. */
949 return(DB_SUCCESS);
950 }
951
952 dberr_t err = DB_SUCCESS;
953 trx_t* trx;
954
955 DBUG_EXECUTE_IF("innodb_fail_to_update_tablespace_dict",
956 return(DB_INTERRUPTED););
957
958 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
959 ut_ad(mutex_own(&dict_sys->mutex));
960 ut_ad(filepath);
961
962 trx = trx_create();
963 trx->op_info = "insert tablespace and filepath";
964 trx->dict_operation_lock_mode = RW_X_LATCH;
965 trx_start_for_ddl(trx, TRX_DICT_OP_INDEX);
966
967 /* A record for this space ID was not found in
968 SYS_DATAFILES. Assume the record is also missing in
969 SYS_TABLESPACES. Insert records into them both. */
970 err = dict_replace_tablespace_in_dictionary(
971 space_id, name, fsp_flags, filepath, trx);
972
973 trx_commit_for_mysql(trx);
974 trx->dict_operation_lock_mode = 0;
975 trx_free(trx);
976
977 return(err);
978}
979
980/** Check the validity of a SYS_TABLES record
981Make sure the fields are the right length and that they
982do not contain invalid contents.
983@param[in] rec SYS_TABLES record
984@return error message, or NULL on success */
985static
986const char*
987dict_sys_tables_rec_check(
988 const rec_t* rec)
989{
990 const byte* field;
991 ulint len;
992
993 ut_ad(mutex_own(&dict_sys->mutex));
994
995 if (rec_get_deleted_flag(rec, 0)) {
996 return("delete-marked record in SYS_TABLES");
997 }
998
999 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLES) {
1000 return("wrong number of columns in SYS_TABLES record");
1001 }
1002
1003 rec_get_nth_field_offs_old(
1004 rec, DICT_FLD__SYS_TABLES__NAME, &len);
1005 if (len == 0 || len == UNIV_SQL_NULL) {
1006err_len:
1007 return("incorrect column length in SYS_TABLES");
1008 }
1009 rec_get_nth_field_offs_old(
1010 rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &len);
1011 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1012 goto err_len;
1013 }
1014 rec_get_nth_field_offs_old(
1015 rec, DICT_FLD__SYS_TABLES__DB_ROLL_PTR, &len);
1016 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1017 goto err_len;
1018 }
1019
1020 rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__ID, &len);
1021 if (len != 8) {
1022 goto err_len;
1023 }
1024
1025 field = rec_get_nth_field_old(
1026 rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
1027 if (field == NULL || len != 4) {
1028 goto err_len;
1029 }
1030
1031 rec_get_nth_field_offs_old(rec, DICT_FLD__SYS_TABLES__TYPE, &len);
1032 if (len != 4) {
1033 goto err_len;
1034 }
1035
1036 rec_get_nth_field_offs_old(
1037 rec, DICT_FLD__SYS_TABLES__MIX_ID, &len);
1038 if (len != 8) {
1039 goto err_len;
1040 }
1041
1042 field = rec_get_nth_field_old(
1043 rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
1044 if (field == NULL || len != 4) {
1045 goto err_len;
1046 }
1047
1048 rec_get_nth_field_offs_old(
1049 rec, DICT_FLD__SYS_TABLES__CLUSTER_ID, &len);
1050 if (len != UNIV_SQL_NULL) {
1051 goto err_len;
1052 }
1053
1054 field = rec_get_nth_field_old(
1055 rec, DICT_FLD__SYS_TABLES__SPACE, &len);
1056 if (field == NULL || len != 4) {
1057 goto err_len;
1058 }
1059
1060 return(NULL);
1061}
1062
1063/** Read and return the contents of a SYS_TABLESPACES record.
1064@param[in] rec A record of SYS_TABLESPACES
1065@param[out] id Pointer to the space_id for this table
1066@param[in,out] name Buffer for Tablespace Name of length NAME_LEN
1067@param[out] flags Pointer to tablespace flags
1068@return true if the record was read correctly, false if not. */
1069bool
1070dict_sys_tablespaces_rec_read(
1071 const rec_t* rec,
1072 ulint* id,
1073 char* name,
1074 ulint* flags)
1075{
1076 const byte* field;
1077 ulint len;
1078
1079 field = rec_get_nth_field_old(
1080 rec, DICT_FLD__SYS_TABLESPACES__SPACE, &len);
1081 if (len != DICT_FLD_LEN_SPACE) {
1082 ib::error() << "Wrong field length in SYS_TABLESPACES.SPACE: "
1083 << len;
1084 return(false);
1085 }
1086 *id = mach_read_from_4(field);
1087
1088 field = rec_get_nth_field_old(
1089 rec, DICT_FLD__SYS_TABLESPACES__NAME, &len);
1090 if (len == 0 || len == UNIV_SQL_NULL) {
1091 ib::error() << "Wrong field length in SYS_TABLESPACES.NAME: "
1092 << len;
1093 return(false);
1094 }
1095 strncpy(name, reinterpret_cast<const char*>(field), NAME_LEN);
1096
1097 /* read the 4 byte flags from the TYPE field */
1098 field = rec_get_nth_field_old(
1099 rec, DICT_FLD__SYS_TABLESPACES__FLAGS, &len);
1100 if (len != 4) {
1101 ib::error() << "Wrong field length in SYS_TABLESPACES.FLAGS: "
1102 << len;
1103 return(false);
1104 }
1105 *flags = mach_read_from_4(field);
1106
1107 return(true);
1108}
1109
1110/** Check if SYS_TABLES.TYPE is valid
1111@param[in] type SYS_TABLES.TYPE
1112@param[in] not_redundant whether ROW_FORMAT=REDUNDANT is not used
1113@return whether the SYS_TABLES.TYPE value is valid */
1114static
1115bool
1116dict_sys_tables_type_valid(ulint type, bool not_redundant)
1117{
1118 /* The DATA_DIRECTORY flag can be assigned fully independently
1119 of all other persistent table flags. */
1120 type &= ~DICT_TF_MASK_DATA_DIR;
1121
1122 if (type == 1) {
1123 return(true); /* ROW_FORMAT=REDUNDANT or ROW_FORMAT=COMPACT */
1124 }
1125
1126 if (!(type & 1)) {
1127 /* For ROW_FORMAT=REDUNDANT and ROW_FORMAT=COMPACT,
1128 SYS_TABLES.TYPE=1. Else, it is the same as
1129 dict_table_t::flags, and the least significant bit
1130 would be set. So, the bit never can be 0. */
1131 return(false);
1132 }
1133
1134 if (!not_redundant) {
1135 /* SYS_TABLES.TYPE must be 1 or 1|DICT_TF_MASK_NO_ROLLBACK
1136 for ROW_FORMAT=REDUNDANT. */
1137 return !(type & ~(1U | DICT_TF_MASK_NO_ROLLBACK));
1138 }
1139
1140 if (type >= 1U << DICT_TF_POS_UNUSED) {
1141 /* Some unknown bits are set. */
1142 return(false);
1143 }
1144
1145 return(dict_tf_is_valid_not_redundant(type));
1146}
1147
1148/** Convert SYS_TABLES.TYPE to dict_table_t::flags.
1149@param[in] type SYS_TABLES.TYPE
1150@param[in] not_redundant whether ROW_FORMAT=REDUNDANT is not used
1151@return table flags */
1152static
1153ulint
1154dict_sys_tables_type_to_tf(ulint type, bool not_redundant)
1155{
1156 ut_ad(dict_sys_tables_type_valid(type, not_redundant));
1157 ulint flags = not_redundant ? 1 : 0;
1158
1159 /* ZIP_SSIZE, ATOMIC_BLOBS, DATA_DIR, PAGE_COMPRESSION,
1160 PAGE_COMPRESSION_LEVEL are the same. */
1161 flags |= type & (DICT_TF_MASK_ZIP_SSIZE
1162 | DICT_TF_MASK_ATOMIC_BLOBS
1163 | DICT_TF_MASK_DATA_DIR
1164 | DICT_TF_MASK_PAGE_COMPRESSION
1165 | DICT_TF_MASK_PAGE_COMPRESSION_LEVEL
1166 | DICT_TF_MASK_NO_ROLLBACK);
1167
1168 ut_ad(dict_tf_is_valid(flags));
1169 return(flags);
1170}
1171
/** Read and return 5 integer fields from a SYS_TABLES record.
The record is expected to have passed dict_sys_tables_rec_check()
style length validation already: the field lengths are only asserted
here, not re-validated.
@param[in]	rec		A record of SYS_TABLES
@param[in]	table_name	Table Name, the same as SYS_TABLES.NAME;
				only used in error messages
@param[out]	table_id	Pointer to the table_id for this table
@param[out]	space_id	Pointer to the space_id for this table
@param[out]	n_cols		Pointer to number of columns for this table.
				The DICT_N_COLS_COMPACT bit is cleared
				when flags2 is read from the record.
@param[out]	flags		Pointer to table flags
@param[out]	flags2		Pointer to table flags2
@return true if the record was read correctly, false if not. */
MY_ATTRIBUTE((warn_unused_result))
static
bool
dict_sys_tables_rec_read(
	const rec_t*		rec,
	const table_name_t&	table_name,
	table_id_t*		table_id,
	ulint*			space_id,
	ulint*			n_cols,
	ulint*			flags,
	ulint*			flags2)
{
	const byte*	field;
	ulint		len;
	ulint		type;

	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_TABLES__ID, &len);
	ut_ad(len == 8);
	*table_id = static_cast<table_id_t>(mach_read_from_8(field));

	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_TABLES__SPACE, &len);
	ut_ad(len == 4);
	*space_id = mach_read_from_4(field);

	/* Read the 4 byte flags from the TYPE field */
	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_TABLES__TYPE, &len);
	ut_a(len == 4);
	type = mach_read_from_4(field);

	/* Handle MDEV-12873 InnoDB SYS_TABLES.TYPE incompatibility
	for PAGE_COMPRESSED=YES in MariaDB 10.2.2 to 10.2.6.

	MariaDB 10.2.2 introduced the SHARED_SPACE flag from MySQL 5.7,
	shifting the flags PAGE_COMPRESSION, PAGE_COMPRESSION_LEVEL,
	ATOMIC_WRITES (repurposed to NO_ROLLBACK in 10.3.1) by one bit.
	The SHARED_SPACE flag would always
	be written as 0 by MariaDB, because MariaDB does not support
	CREATE TABLESPACE or CREATE TABLE...TABLESPACE for InnoDB.

	So, instead of the bits AALLLLCxxxxxxx we would have
	AALLLLC0xxxxxxx if the table was created with MariaDB 10.2.2
	to 10.2.6. (AA=ATOMIC_WRITES, LLLL=PAGE_COMPRESSION_LEVEL,
	C=PAGE_COMPRESSED, xxxxxxx=7 bits that were not moved.)

	The case LLLLC=00000 is not a problem. The problem is the case
	AALLLL10DB00001 where D is the (mostly ignored) DATA_DIRECTORY
	flag and B is the ATOMIC_BLOBS flag (1 for ROW_FORMAT=DYNAMIC
	and 0 for ROW_FORMAT=COMPACT in this case). Other low-order
	bits must be so, because PAGE_COMPRESSED=YES is only allowed
	for ROW_FORMAT=DYNAMIC and ROW_FORMAT=COMPACT, not for
	ROW_FORMAT=REDUNDANT or ROW_FORMAT=COMPRESSED.

	Starting with MariaDB 10.2.4, the flags would be
	00LLLL10DB00001, because ATOMIC_WRITES is always written as 0.

	We will concentrate on the PAGE_COMPRESSION_LEVEL and
	PAGE_COMPRESSED=YES. PAGE_COMPRESSED=NO implies
	PAGE_COMPRESSION_LEVEL=0, and in that case all the affected
	bits will be 0. For PAGE_COMPRESSED=YES, the values 1..9 are
	allowed for PAGE_COMPRESSION_LEVEL. That is, we must interpret
	the bits AALLLL10DB00001 as AALLLL1DB00001.

	If someone created a table in MariaDB 10.2.2 or 10.2.3 with
	the attribute ATOMIC_WRITES=OFF (value 2) and without
	PAGE_COMPRESSED=YES or PAGE_COMPRESSION_LEVEL, that should be
	rejected. The value ATOMIC_WRITES=ON (1) would look like
	ATOMIC_WRITES=OFF, but it would be ignored starting with
	MariaDB 10.2.4. */
	compile_time_assert(DICT_TF_POS_PAGE_COMPRESSION == 7);
	compile_time_assert(DICT_TF_POS_UNUSED == 14);

	/* 0x19f selects bits 0..4, 7 and 8; the pattern 0x101 means
	bit 8 and bit 0 are set while bits 1..4 and bit 7 are clear. */
	if ((type & 0x19f) != 0x101) {
		/* The table cannot have been created with MariaDB
		10.2.2 to 10.2.6, because they would write the
		low-order bits of SYS_TABLES.TYPE as 0b10xx00001 for
		PAGE_COMPRESSED=YES. No adjustment is applicable. */
	} else if (type >= 3 << 13) {
		/* 10.2.2 and 10.2.3 write ATOMIC_WRITES less than 3,
		and no other flags above that can be set for the
		SYS_TABLES.TYPE to be in the 10.2.2..10.2.6 format.
		This would in any case be invalid format for 10.2 and
		earlier releases. */
		ut_ad(!dict_sys_tables_type_valid(type, true));
	} else {
		/* SYS_TABLES.TYPE is of the form AALLLL10DB00001. We
		must still validate that the LLLL bits are between 0
		and 9 before we can discard the extraneous 0 bit. */
		ut_ad(!DICT_TF_GET_PAGE_COMPRESSION(type));

		/* Bits 9..12 hold the shifted LLLL value. Subtracting 1
		from an unsigned 0 wraps around to a huge value, so this
		single comparison accepts exactly the levels 1..9. */
		if ((((type >> 9) & 0xf) - 1) < 9) {
			ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) & 1);

			/* Keep the 7 low-order bits as they are and
			shift everything above them right by one
			position, which drops the extraneous bit 7. */
			type = (type & 0x7fU) | (type >> 1 & ~0x7fU);

			ut_ad(DICT_TF_GET_PAGE_COMPRESSION(type));
			ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) >= 1);
			ut_ad(DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type) <= 9);
		} else {
			ut_ad(!dict_sys_tables_type_valid(type, true));
		}
	}

	/* The low order bit of SYS_TABLES.TYPE is always set to 1. But in
	dict_table_t::flags the low order bit is used to determine if the
	ROW_FORMAT=REDUNDANT (0) or anything else (1).
	Read the 4 byte N_COLS field and look at the high order bit. It
	should be set for COMPACT and later. It should not be set for
	REDUNDANT. */
	field = rec_get_nth_field_old(
		rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
	ut_a(len == 4);
	*n_cols = mach_read_from_4(field);

	const bool not_redundant = 0 != (*n_cols & DICT_N_COLS_COMPACT);

	if (!dict_sys_tables_type_valid(type, not_redundant)) {
		ib::error() << "Table " << table_name << " in InnoDB"
			" data dictionary contains invalid flags."
			" SYS_TABLES.TYPE=" << type <<
			" SYS_TABLES.N_COLS=" << *n_cols;
		return(false);
	}

	*flags = dict_sys_tables_type_to_tf(type, not_redundant);

	/* For tables created before MySQL 4.1, there may be
	garbage in SYS_TABLES.MIX_LEN where flags2 are found. Such tables
	would always be in ROW_FORMAT=REDUNDANT which do not have the
	high bit set in n_cols, and flags would be zero.
	MySQL 4.1 was the first version to support innodb_file_per_table,
	that is, *space_id != 0.

	NOTE(review): the third operand below tests the same bit of
	*n_cols that not_redundant was computed from just above, and
	*n_cols has not been modified in between, so it cannot change
	the outcome of the condition. */
	if (not_redundant || *space_id != 0 || *n_cols & DICT_N_COLS_COMPACT) {

		/* Get flags2 from SYS_TABLES.MIX_LEN */
		field = rec_get_nth_field_old(
			rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
		*flags2 = mach_read_from_4(field);

		if (!dict_tf2_is_valid(*flags, *flags2)) {
			ib::error() << "Table " << table_name << " in InnoDB"
				" data dictionary contains invalid flags."
				" SYS_TABLES.TYPE=" << type
				<< " SYS_TABLES.MIX_LEN=" << *flags2;
			return(false);
		}

		/* DICT_TF2_FTS will be set when indexes are being loaded */
		*flags2 &= ~DICT_TF2_FTS;

		/* Now that we have used this bit, unset it. */
		*n_cols &= ~DICT_N_COLS_COMPACT;
	} else {
		/* MIX_LEN is not trusted for such tables (see above). */
		*flags2 = 0;
	}

	return(true);
}
1341
/** Load and check each non-predefined tablespace mentioned in SYS_TABLES.
Search SYS_TABLES and check each tablespace mentioned that has not
already been added to the fil_system. If it is valid, add it to the
fil_system list. Perform extra validation on the table if recovery from
the REDO log occurred.

Records that fail dict_sys_tables_rec_check() or
dict_sys_tables_rec_read(), tables in the system tablespace
(TRX_SYS_SPACE), tables with DICT_TF2_DISCARDED set, and tablespaces
that are already present in memory are skipped; none of these contribute
to the returned maximum.
@param[in]	validate	Whether to do validation on the table.
@return the highest space ID among the tablespaces that this function
attempted to open (0 if there were none). */
UNIV_INLINE
ulint
dict_check_sys_tables(
	bool	validate)
{
	ulint		max_space_id = 0;
	btr_pcur_t	pcur;
	const rec_t*	rec;
	mtr_t		mtr;

	DBUG_ENTER("dict_check_sys_tables");

	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
	ut_ad(mutex_own(&dict_sys->mutex));

	mtr_start(&mtr);

	/* Before traversing SYS_TABLES, let's make sure we have
	SYS_TABLESPACES and SYS_DATAFILES loaded. */
	dict_table_t*	sys_tablespaces;
	dict_table_t*	sys_datafiles;
	sys_tablespaces = dict_table_get_low("SYS_TABLESPACES");
	ut_a(sys_tablespaces != NULL);
	sys_datafiles = dict_table_get_low("SYS_DATAFILES");
	ut_a(sys_datafiles != NULL);

	for (rec = dict_startscan_system(&pcur, &mtr, SYS_TABLES);
	     rec != NULL;
	     rec = dict_getnext_system(&pcur, &mtr)) {
		const byte*	field;
		ulint		len;
		table_name_t	table_name;
		table_id_t	table_id;
		ulint		space_id;
		ulint		n_cols;
		ulint		flags;
		ulint		flags2;

		/* If a table record is not useable, ignore it and continue
		on to the next record. The message returned by
		dict_sys_tables_rec_check() is not reported here. */
		if (dict_sys_tables_rec_check(rec) != NULL) {
			continue;
		}

		/* Copy the table name from rec. The copy is owned by this
		loop iteration and is released with ut_free() on every
		path that leaves the iteration. */
		field = rec_get_nth_field_old(
			rec, DICT_FLD__SYS_TABLES__NAME, &len);
		table_name.m_name = mem_strdupl((char*) field, len);
		DBUG_PRINT("dict_check_sys_tables",
			   ("name: %p, '%s'", table_name.m_name,
			    table_name.m_name));

		/* Skip records that cannot be interpreted, as well as
		tables that reside in the system tablespace. */
		if (!dict_sys_tables_rec_read(rec, table_name,
					      &table_id, &space_id,
					      &n_cols, &flags, &flags2)
		    || space_id == TRX_SYS_SPACE) {
			ut_free(table_name.m_name);
			continue;
		}

		if (flags2 & DICT_TF2_DISCARDED) {
			ib::info() << "Ignoring tablespace for " << table_name
				<< " because the DISCARD flag is set .";
			ut_free(table_name.m_name);
			continue;
		}

		/* For tables or partitions using .ibd files, the flag
		DICT_TF2_USE_FILE_PER_TABLE was not set in MIX_LEN
		before MySQL 5.6.5. The flag should not have been
		introduced in persistent storage. MariaDB will keep
		setting the flag when writing SYS_TABLES entries for
		newly created or rebuilt tables or partitions, but
		will otherwise ignore the flag. */

		/* Now that we have the proper name for this tablespace,
		look to see if it is already in the tablespace cache. */
		if (const fil_space_t* space
		    = fil_space_for_table_exists_in_mem(
			    space_id, table_name.m_name, false, flags)) {
			/* Recovery can open a datafile that does not
			match SYS_DATAFILES. If they don't match, update
			SYS_DATAFILES. */
			char *dict_path = dict_get_first_path(space_id);
			const char *fil_path = space->chain.start->name;
			if (dict_path
			    && strcmp(dict_path, fil_path)) {
				dict_update_filepath(space_id, fil_path);
			}
			ut_free(dict_path);
			ut_free(table_name.m_name);
			continue;
		}

		/* Set the expected filepath from the data dictionary.
		If the file is found elsewhere (from an ISL or the default
		location) or this path is the same file but looks different,
		fil_ibd_open() will update the dictionary with what is
		opened. */
		char*	filepath = dict_get_first_path(space_id);

		/* Check that the .ibd file exists. A failure to open it
		is only reported as a warning; the scan continues. */
		if (!fil_ibd_open(
			    validate,
			    !srv_read_only_mode && srv_log_file_size != 0,
			    FIL_TYPE_TABLESPACE,
			    space_id, dict_tf_to_fsp_flags(flags),
			    table_name, filepath)) {
			ib::warn() << "Ignoring tablespace for "
				<< table_name
				<< " because it could not be opened.";
		}

		/* The space ID is taken into account whether or not the
		file could be opened. */
		max_space_id = ut_max(max_space_id, space_id);

		ut_free(table_name.m_name);
		ut_free(filepath);
	}

	mtr_commit(&mtr);

	DBUG_RETURN(max_space_id);
}
1472
1473/** Check each tablespace found in the data dictionary.
1474Then look at each table defined in SYS_TABLES that has a space_id > 0
1475to find all the file-per-table tablespaces.
1476
1477In a crash recovery we already have some tablespace objects created from
1478processing the REDO log. Any other tablespace in SYS_TABLESPACES not
1479previously used in recovery will be opened here. We will compare the
1480space_id information in the data dictionary to what we find in the
1481tablespace file. In addition, more validation will be done if recovery
1482was needed and force_recovery is not set.
1483
1484We also scan the biggest space id, and store it to fil_system.
1485@param[in] validate true if recovery was needed */
1486void
1487dict_check_tablespaces_and_store_max_id(
1488 bool validate)
1489{
1490 mtr_t mtr;
1491
1492 DBUG_ENTER("dict_check_tablespaces_and_store_max_id");
1493
1494 rw_lock_x_lock(dict_operation_lock);
1495 mutex_enter(&dict_sys->mutex);
1496
1497 /* Initialize the max space_id from sys header */
1498 mtr_start(&mtr);
1499 ulint max_space_id = mtr_read_ulint(
1500 dict_hdr_get(&mtr) + DICT_HDR_MAX_SPACE_ID,
1501 MLOG_4BYTES, &mtr);
1502 mtr_commit(&mtr);
1503
1504 fil_set_max_space_id_if_bigger(max_space_id);
1505
1506 /* Open all tablespaces referenced in SYS_TABLES.
1507 This will update SYS_TABLESPACES and SYS_DATAFILES if it
1508 finds any file-per-table tablespaces not already there. */
1509 max_space_id = dict_check_sys_tables(validate);
1510 fil_set_max_space_id_if_bigger(max_space_id);
1511
1512 mutex_exit(&dict_sys->mutex);
1513 rw_lock_x_unlock(dict_operation_lock);
1514
1515 DBUG_VOID_RETURN;
1516}
1517
/** Error message for a delete-marked record in dict_load_column_low().
dict_load_columns() recognizes this condition by comparing the returned
pointer with this variable, so the address identifies the condition. */
static const char* dict_load_column_del = "delete-marked record in SYS_COLUMNS";
1520
1521/** Load a table column definition from a SYS_COLUMNS record to dict_table_t.
1522@return error message
1523@retval NULL on success */
1524static
1525const char*
1526dict_load_column_low(
1527 dict_table_t* table, /*!< in/out: table, could be NULL
1528 if we just populate a dict_column_t
1529 struct with information from
1530 a SYS_COLUMNS record */
1531 mem_heap_t* heap, /*!< in/out: memory heap
1532 for temporary storage */
1533 dict_col_t* column, /*!< out: dict_column_t to fill,
1534 or NULL if table != NULL */
1535 table_id_t* table_id, /*!< out: table id */
1536 const char** col_name, /*!< out: column name */
1537 const rec_t* rec, /*!< in: SYS_COLUMNS record */
1538 ulint* nth_v_col) /*!< out: if not NULL, this
1539 records the "n" of "nth" virtual
1540 column */
1541{
1542 char* name;
1543 const byte* field;
1544 ulint len;
1545 ulint mtype;
1546 ulint prtype;
1547 ulint col_len;
1548 ulint pos;
1549 ulint num_base;
1550
1551 ut_ad(table || column);
1552
1553 if (rec_get_deleted_flag(rec, 0)) {
1554 return(dict_load_column_del);
1555 }
1556
1557 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_COLUMNS) {
1558 return("wrong number of columns in SYS_COLUMNS record");
1559 }
1560
1561 field = rec_get_nth_field_old(
1562 rec, DICT_FLD__SYS_COLUMNS__TABLE_ID, &len);
1563 if (len != 8) {
1564err_len:
1565 return("incorrect column length in SYS_COLUMNS");
1566 }
1567
1568 if (table_id) {
1569 *table_id = mach_read_from_8(field);
1570 } else if (table->id != mach_read_from_8(field)) {
1571 return("SYS_COLUMNS.TABLE_ID mismatch");
1572 }
1573
1574 field = rec_get_nth_field_old(
1575 rec, DICT_FLD__SYS_COLUMNS__POS, &len);
1576 if (len != 4) {
1577 goto err_len;
1578 }
1579
1580 pos = mach_read_from_4(field);
1581
1582 rec_get_nth_field_offs_old(
1583 rec, DICT_FLD__SYS_COLUMNS__DB_TRX_ID, &len);
1584 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1585 goto err_len;
1586 }
1587 rec_get_nth_field_offs_old(
1588 rec, DICT_FLD__SYS_COLUMNS__DB_ROLL_PTR, &len);
1589 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1590 goto err_len;
1591 }
1592
1593 field = rec_get_nth_field_old(
1594 rec, DICT_FLD__SYS_COLUMNS__NAME, &len);
1595 if (len == 0 || len == UNIV_SQL_NULL) {
1596 goto err_len;
1597 }
1598
1599 name = mem_heap_strdupl(heap, (const char*) field, len);
1600
1601 if (col_name) {
1602 *col_name = name;
1603 }
1604
1605 field = rec_get_nth_field_old(
1606 rec, DICT_FLD__SYS_COLUMNS__MTYPE, &len);
1607 if (len != 4) {
1608 goto err_len;
1609 }
1610
1611 mtype = mach_read_from_4(field);
1612
1613 field = rec_get_nth_field_old(
1614 rec, DICT_FLD__SYS_COLUMNS__PRTYPE, &len);
1615 if (len != 4) {
1616 goto err_len;
1617 }
1618 prtype = mach_read_from_4(field);
1619
1620 if (dtype_get_charset_coll(prtype) == 0
1621 && dtype_is_string_type(mtype)) {
1622 /* The table was created with < 4.1.2. */
1623
1624 if (dtype_is_binary_string_type(mtype, prtype)) {
1625 /* Use the binary collation for
1626 string columns of binary type. */
1627
1628 prtype = dtype_form_prtype(
1629 prtype,
1630 DATA_MYSQL_BINARY_CHARSET_COLL);
1631 } else {
1632 /* Use the default charset for
1633 other than binary columns. */
1634
1635 prtype = dtype_form_prtype(
1636 prtype,
1637 data_mysql_default_charset_coll);
1638 }
1639 }
1640
1641 if (table && table->n_def != pos && !(prtype & DATA_VIRTUAL)) {
1642 return("SYS_COLUMNS.POS mismatch");
1643 }
1644
1645 field = rec_get_nth_field_old(
1646 rec, DICT_FLD__SYS_COLUMNS__LEN, &len);
1647 if (len != 4) {
1648 goto err_len;
1649 }
1650 col_len = mach_read_from_4(field);
1651 field = rec_get_nth_field_old(
1652 rec, DICT_FLD__SYS_COLUMNS__PREC, &len);
1653 if (len != 4) {
1654 goto err_len;
1655 }
1656 num_base = mach_read_from_4(field);
1657
1658 if (column == NULL) {
1659 if (prtype & DATA_VIRTUAL) {
1660#ifdef UNIV_DEBUG
1661 dict_v_col_t* vcol =
1662#endif
1663 dict_mem_table_add_v_col(
1664 table, heap, name, mtype,
1665 prtype, col_len,
1666 dict_get_v_col_mysql_pos(pos), num_base);
1667 ut_ad(vcol->v_pos == dict_get_v_col_pos(pos));
1668 } else {
1669 ut_ad(num_base == 0);
1670 dict_mem_table_add_col(table, heap, name, mtype,
1671 prtype, col_len);
1672 }
1673 } else {
1674 dict_mem_fill_column_struct(column, pos, mtype,
1675 prtype, col_len);
1676 }
1677
1678 /* Report the virtual column number */
1679 if ((prtype & DATA_VIRTUAL) && nth_v_col != NULL) {
1680 *nth_v_col = dict_get_v_col_pos(pos);
1681 }
1682
1683 return(NULL);
1684}
1685
1686/** Error message for a delete-marked record in dict_load_virtual_low() */
1687static const char* dict_load_virtual_del = "delete-marked record in SYS_VIRTUAL";
1688
1689/** Load a virtual column "mapping" (to base columns) information
1690from a SYS_VIRTUAL record
1691@param[in,out] table table
1692@param[in,out] column mapped base column's dict_column_t
1693@param[in,out] table_id table id
1694@param[in,out] pos virtual column position
1695@param[in,out] base_pos base column position
1696@param[in] rec SYS_VIRTUAL record
1697@return error message
1698@retval NULL on success */
1699static
1700const char*
1701dict_load_virtual_low(
1702 dict_table_t* table,
1703 dict_col_t** column,
1704 table_id_t* table_id,
1705 ulint* pos,
1706 ulint* base_pos,
1707 const rec_t* rec)
1708{
1709 const byte* field;
1710 ulint len;
1711 ulint base;
1712
1713 if (rec_get_deleted_flag(rec, 0)) {
1714 return(dict_load_virtual_del);
1715 }
1716
1717 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_VIRTUAL) {
1718 return("wrong number of columns in SYS_VIRTUAL record");
1719 }
1720
1721 field = rec_get_nth_field_old(
1722 rec, DICT_FLD__SYS_VIRTUAL__TABLE_ID, &len);
1723 if (len != 8) {
1724err_len:
1725 return("incorrect column length in SYS_VIRTUAL");
1726 }
1727
1728 if (table_id != NULL) {
1729 *table_id = mach_read_from_8(field);
1730 } else if (table->id != mach_read_from_8(field)) {
1731 return("SYS_VIRTUAL.TABLE_ID mismatch");
1732 }
1733
1734 field = rec_get_nth_field_old(
1735 rec, DICT_FLD__SYS_VIRTUAL__POS, &len);
1736 if (len != 4) {
1737 goto err_len;
1738 }
1739
1740 if (pos != NULL) {
1741 *pos = mach_read_from_4(field);
1742 }
1743
1744 field = rec_get_nth_field_old(
1745 rec, DICT_FLD__SYS_VIRTUAL__BASE_POS, &len);
1746 if (len != 4) {
1747 goto err_len;
1748 }
1749
1750 base = mach_read_from_4(field);
1751
1752 if (base_pos != NULL) {
1753 *base_pos = base;
1754 }
1755
1756 rec_get_nth_field_offs_old(
1757 rec, DICT_FLD__SYS_VIRTUAL__DB_TRX_ID, &len);
1758 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
1759 goto err_len;
1760 }
1761
1762 rec_get_nth_field_offs_old(
1763 rec, DICT_FLD__SYS_VIRTUAL__DB_ROLL_PTR, &len);
1764 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
1765 goto err_len;
1766 }
1767
1768 if (column != NULL) {
1769 *column = dict_table_get_nth_col(table, base);
1770 }
1771
1772 return(NULL);
1773}
1774
1775/********************************************************************//**
1776Loads definitions for table columns. */
1777static
1778void
1779dict_load_columns(
1780/*==============*/
1781 dict_table_t* table, /*!< in/out: table */
1782 mem_heap_t* heap) /*!< in/out: memory heap
1783 for temporary storage */
1784{
1785 dict_table_t* sys_columns;
1786 dict_index_t* sys_index;
1787 btr_pcur_t pcur;
1788 dtuple_t* tuple;
1789 dfield_t* dfield;
1790 const rec_t* rec;
1791 byte* buf;
1792 ulint i;
1793 mtr_t mtr;
1794 ulint n_skipped = 0;
1795
1796 ut_ad(mutex_own(&dict_sys->mutex));
1797
1798 mtr_start(&mtr);
1799
1800 sys_columns = dict_table_get_low("SYS_COLUMNS");
1801 sys_index = UT_LIST_GET_FIRST(sys_columns->indexes);
1802 ut_ad(!dict_table_is_comp(sys_columns));
1803
1804 ut_ad(name_of_col_is(sys_columns, sys_index,
1805 DICT_FLD__SYS_COLUMNS__NAME, "NAME"));
1806 ut_ad(name_of_col_is(sys_columns, sys_index,
1807 DICT_FLD__SYS_COLUMNS__PREC, "PREC"));
1808
1809 tuple = dtuple_create(heap, 1);
1810 dfield = dtuple_get_nth_field(tuple, 0);
1811
1812 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
1813 mach_write_to_8(buf, table->id);
1814
1815 dfield_set_data(dfield, buf, 8);
1816 dict_index_copy_types(tuple, sys_index, 1);
1817
1818 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
1819 BTR_SEARCH_LEAF, &pcur, &mtr);
1820
1821 ut_ad(table->n_t_cols == static_cast<ulint>(
1822 table->n_cols) + static_cast<ulint>(table->n_v_cols));
1823
1824 for (i = 0;
1825 i + DATA_N_SYS_COLS < table->n_t_cols + n_skipped;
1826 i++) {
1827 const char* err_msg;
1828 const char* name = NULL;
1829 ulint nth_v_col = ULINT_UNDEFINED;
1830
1831 rec = btr_pcur_get_rec(&pcur);
1832
1833 ut_a(btr_pcur_is_on_user_rec(&pcur));
1834
1835 err_msg = dict_load_column_low(table, heap, NULL, NULL,
1836 &name, rec, &nth_v_col);
1837
1838 if (err_msg == dict_load_column_del) {
1839 n_skipped++;
1840 goto next_rec;
1841 } else if (err_msg) {
1842 ib::fatal() << err_msg;
1843 }
1844
1845 /* Note: Currently we have one DOC_ID column that is
1846 shared by all FTS indexes on a table. And only non-virtual
1847 column can be used for FULLTEXT index */
1848 if (innobase_strcasecmp(name,
1849 FTS_DOC_ID_COL_NAME) == 0
1850 && nth_v_col == ULINT_UNDEFINED) {
1851 dict_col_t* col;
1852 /* As part of normal loading of tables the
1853 table->flag is not set for tables with FTS
1854 till after the FTS indexes are loaded. So we
1855 create the fts_t instance here if there isn't
1856 one already created.
1857
1858 This case does not arise for table create as
1859 the flag is set before the table is created. */
1860 if (table->fts == NULL) {
1861 table->fts = fts_create(table);
1862 fts_optimize_add_table(table);
1863 }
1864
1865 ut_a(table->fts->doc_col == ULINT_UNDEFINED);
1866
1867 col = dict_table_get_nth_col(table, i - n_skipped);
1868
1869 ut_ad(col->len == sizeof(doc_id_t));
1870
1871 if (col->prtype & DATA_FTS_DOC_ID) {
1872 DICT_TF2_FLAG_SET(
1873 table, DICT_TF2_FTS_HAS_DOC_ID);
1874 DICT_TF2_FLAG_UNSET(
1875 table, DICT_TF2_FTS_ADD_DOC_ID);
1876 }
1877
1878 table->fts->doc_col = i - n_skipped;
1879 }
1880next_rec:
1881 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1882 }
1883
1884 btr_pcur_close(&pcur);
1885 mtr_commit(&mtr);
1886}
1887
1888/** Loads SYS_VIRTUAL info for one virtual column
1889@param[in,out] table table
1890@param[in] nth_v_col virtual column sequence num
1891@param[in,out] v_col virtual column
1892@param[in,out] heap memory heap
1893*/
1894static
1895void
1896dict_load_virtual_one_col(
1897 dict_table_t* table,
1898 ulint nth_v_col,
1899 dict_v_col_t* v_col,
1900 mem_heap_t* heap)
1901{
1902 dict_table_t* sys_virtual;
1903 dict_index_t* sys_virtual_index;
1904 btr_pcur_t pcur;
1905 dtuple_t* tuple;
1906 dfield_t* dfield;
1907 const rec_t* rec;
1908 byte* buf;
1909 ulint i = 0;
1910 mtr_t mtr;
1911 ulint skipped = 0;
1912
1913 ut_ad(mutex_own(&dict_sys->mutex));
1914
1915 if (v_col->num_base == 0) {
1916 return;
1917 }
1918
1919 mtr_start(&mtr);
1920
1921 sys_virtual = dict_table_get_low("SYS_VIRTUAL");
1922 sys_virtual_index = UT_LIST_GET_FIRST(sys_virtual->indexes);
1923 ut_ad(!dict_table_is_comp(sys_virtual));
1924
1925 ut_ad(name_of_col_is(sys_virtual, sys_virtual_index,
1926 DICT_FLD__SYS_VIRTUAL__POS, "POS"));
1927
1928 tuple = dtuple_create(heap, 2);
1929
1930 /* table ID field */
1931 dfield = dtuple_get_nth_field(tuple, 0);
1932
1933 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
1934 mach_write_to_8(buf, table->id);
1935
1936 dfield_set_data(dfield, buf, 8);
1937
1938 /* virtual column pos field */
1939 dfield = dtuple_get_nth_field(tuple, 1);
1940
1941 buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1942 ulint vcol_pos = dict_create_v_col_pos(nth_v_col, v_col->m_col.ind);
1943 mach_write_to_4(buf, vcol_pos);
1944
1945 dfield_set_data(dfield, buf, 4);
1946
1947 dict_index_copy_types(tuple, sys_virtual_index, 2);
1948
1949 btr_pcur_open_on_user_rec(sys_virtual_index, tuple, PAGE_CUR_GE,
1950 BTR_SEARCH_LEAF, &pcur, &mtr);
1951
1952 for (i = 0; i < v_col->num_base + skipped; i++) {
1953 const char* err_msg;
1954 ulint pos;
1955
1956 ut_ad(btr_pcur_is_on_user_rec(&pcur));
1957
1958 rec = btr_pcur_get_rec(&pcur);
1959
1960 ut_a(btr_pcur_is_on_user_rec(&pcur));
1961
1962 err_msg = dict_load_virtual_low(table,
1963 &v_col->base_col[i - skipped],
1964 NULL,
1965 &pos, NULL, rec);
1966
1967 if (err_msg) {
1968 if (err_msg != dict_load_virtual_del) {
1969 ib::fatal() << err_msg;
1970 } else {
1971 skipped++;
1972 }
1973 } else {
1974 ut_ad(pos == vcol_pos);
1975 }
1976
1977 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
1978 }
1979
1980 btr_pcur_close(&pcur);
1981 mtr_commit(&mtr);
1982}
1983
1984/** Loads info from SYS_VIRTUAL for virtual columns.
1985@param[in,out] table table
1986@param[in] heap memory heap
1987*/
1988static
1989void
1990dict_load_virtual(
1991 dict_table_t* table,
1992 mem_heap_t* heap)
1993{
1994 for (ulint i = 0; i < table->n_v_cols; i++) {
1995 dict_v_col_t* v_col = dict_table_get_nth_v_col(table, i);
1996
1997 dict_load_virtual_one_col(table, i, v_col, heap);
1998 }
1999}
2000
/** Error message that dict_load_field_low() returns when the
SYS_FIELDS record passed to it is delete-marked */
static const char* dict_load_field_del = "delete-marked record in SYS_FIELDS";
2003
2004/** Load an index field definition from a SYS_FIELDS record to dict_index_t.
2005@return error message
2006@retval NULL on success */
2007static
2008const char*
2009dict_load_field_low(
2010 byte* index_id, /*!< in/out: index id (8 bytes)
2011 an "in" value if index != NULL
2012 and "out" if index == NULL */
2013 dict_index_t* index, /*!< in/out: index, could be NULL
2014 if we just populate a dict_field_t
2015 struct with information from
2016 a SYS_FIELDS record */
2017 dict_field_t* sys_field, /*!< out: dict_field_t to be
2018 filled */
2019 ulint* pos, /*!< out: Field position */
2020 byte* last_index_id, /*!< in: last index id */
2021 mem_heap_t* heap, /*!< in/out: memory heap
2022 for temporary storage */
2023 const rec_t* rec) /*!< in: SYS_FIELDS record */
2024{
2025 const byte* field;
2026 ulint len;
2027 unsigned pos_and_prefix_len;
2028 unsigned prefix_len;
2029 bool first_field;
2030 ulint position;
2031
2032 /* Either index or sys_field is supplied, not both */
2033 ut_a((!index) || (!sys_field));
2034
2035 if (rec_get_deleted_flag(rec, 0)) {
2036 return(dict_load_field_del);
2037 }
2038
2039 if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_FIELDS) {
2040 return("wrong number of columns in SYS_FIELDS record");
2041 }
2042
2043 field = rec_get_nth_field_old(
2044 rec, DICT_FLD__SYS_FIELDS__INDEX_ID, &len);
2045 if (len != 8) {
2046err_len:
2047 return("incorrect column length in SYS_FIELDS");
2048 }
2049
2050 if (!index) {
2051 ut_a(last_index_id);
2052 memcpy(index_id, (const char*) field, 8);
2053 first_field = memcmp(index_id, last_index_id, 8);
2054 } else {
2055 first_field = (index->n_def == 0);
2056 if (memcmp(field, index_id, 8)) {
2057 return("SYS_FIELDS.INDEX_ID mismatch");
2058 }
2059 }
2060
2061 /* The next field stores the field position in the index and a
2062 possible column prefix length if the index field does not
2063 contain the whole column. The storage format is like this: if
2064 there is at least one prefix field in the index, then the HIGH
2065 2 bytes contain the field number (index->n_def) and the low 2
2066 bytes the prefix length for the field. Otherwise the field
2067 number (index->n_def) is contained in the 2 LOW bytes. */
2068
2069 field = rec_get_nth_field_old(
2070 rec, DICT_FLD__SYS_FIELDS__POS, &len);
2071 if (len != 4) {
2072 goto err_len;
2073 }
2074
2075 pos_and_prefix_len = mach_read_from_4(field);
2076
2077 if (index && UNIV_UNLIKELY
2078 ((pos_and_prefix_len & 0xFFFFUL) != index->n_def
2079 && (pos_and_prefix_len >> 16 & 0xFFFF) != index->n_def)) {
2080 return("SYS_FIELDS.POS mismatch");
2081 }
2082
2083 if (first_field || pos_and_prefix_len > 0xFFFFUL) {
2084 prefix_len = pos_and_prefix_len & 0xFFFFUL;
2085 position = (pos_and_prefix_len & 0xFFFF0000UL) >> 16;
2086 } else {
2087 prefix_len = 0;
2088 position = pos_and_prefix_len & 0xFFFFUL;
2089 }
2090
2091 rec_get_nth_field_offs_old(
2092 rec, DICT_FLD__SYS_FIELDS__DB_TRX_ID, &len);
2093 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2094 goto err_len;
2095 }
2096 rec_get_nth_field_offs_old(
2097 rec, DICT_FLD__SYS_FIELDS__DB_ROLL_PTR, &len);
2098 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2099 goto err_len;
2100 }
2101
2102 field = rec_get_nth_field_old(
2103 rec, DICT_FLD__SYS_FIELDS__COL_NAME, &len);
2104 if (len == 0 || len == UNIV_SQL_NULL) {
2105 goto err_len;
2106 }
2107
2108 if (index) {
2109 dict_mem_index_add_field(
2110 index, mem_heap_strdupl(heap, (const char*) field, len),
2111 prefix_len);
2112 } else {
2113 ut_a(sys_field);
2114 ut_a(pos);
2115
2116 sys_field->name = mem_heap_strdupl(
2117 heap, (const char*) field, len);
2118 sys_field->prefix_len = prefix_len;
2119 *pos = position;
2120 }
2121
2122 return(NULL);
2123}
2124
2125/********************************************************************//**
2126Loads definitions for index fields.
2127@return DB_SUCCESS if ok, DB_CORRUPTION if corruption */
2128static
2129ulint
2130dict_load_fields(
2131/*=============*/
2132 dict_index_t* index, /*!< in/out: index whose fields to load */
2133 mem_heap_t* heap) /*!< in: memory heap for temporary storage */
2134{
2135 dict_table_t* sys_fields;
2136 dict_index_t* sys_index;
2137 btr_pcur_t pcur;
2138 dtuple_t* tuple;
2139 dfield_t* dfield;
2140 const rec_t* rec;
2141 byte* buf;
2142 ulint i;
2143 mtr_t mtr;
2144 dberr_t error;
2145
2146 ut_ad(mutex_own(&dict_sys->mutex));
2147
2148 mtr_start(&mtr);
2149
2150 sys_fields = dict_table_get_low("SYS_FIELDS");
2151 sys_index = UT_LIST_GET_FIRST(sys_fields->indexes);
2152 ut_ad(!dict_table_is_comp(sys_fields));
2153 ut_ad(name_of_col_is(sys_fields, sys_index,
2154 DICT_FLD__SYS_FIELDS__COL_NAME, "COL_NAME"));
2155
2156 tuple = dtuple_create(heap, 1);
2157 dfield = dtuple_get_nth_field(tuple, 0);
2158
2159 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2160 mach_write_to_8(buf, index->id);
2161
2162 dfield_set_data(dfield, buf, 8);
2163 dict_index_copy_types(tuple, sys_index, 1);
2164
2165 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2166 BTR_SEARCH_LEAF, &pcur, &mtr);
2167 for (i = 0; i < index->n_fields; i++) {
2168 const char* err_msg;
2169
2170 rec = btr_pcur_get_rec(&pcur);
2171
2172 ut_a(btr_pcur_is_on_user_rec(&pcur));
2173
2174 err_msg = dict_load_field_low(buf, index, NULL, NULL, NULL,
2175 heap, rec);
2176
2177 if (err_msg == dict_load_field_del) {
2178 /* There could be delete marked records in
2179 SYS_FIELDS because SYS_FIELDS.INDEX_ID can be
2180 updated by ALTER TABLE ADD INDEX. */
2181
2182 goto next_rec;
2183 } else if (err_msg) {
2184 ib::error() << err_msg;
2185 error = DB_CORRUPTION;
2186 goto func_exit;
2187 }
2188next_rec:
2189 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2190 }
2191
2192 error = DB_SUCCESS;
2193func_exit:
2194 btr_pcur_close(&pcur);
2195 mtr_commit(&mtr);
2196 return(error);
2197}
2198
2199/** Error message for a delete-marked record in dict_load_index_low() */
2200static const char* dict_load_index_del = "delete-marked record in SYS_INDEXES";
2201/** Error message for table->id mismatch in dict_load_index_low() */
2202static const char* dict_load_index_id_err = "SYS_INDEXES.TABLE_ID mismatch";
2203/** Error message for SYS_TABLES flags mismatch in dict_load_table_low() */
2204static const char* dict_load_table_flags = "incorrect flags in SYS_TABLES";
2205
2206/** Load an index definition from a SYS_INDEXES record to dict_index_t.
2207If allocate=TRUE, we will create a dict_index_t structure and fill it
2208accordingly. If allocated=FALSE, the dict_index_t will be supplied by
2209the caller and filled with information read from the record.
2210@return error message
2211@retval NULL on success */
2212static
2213const char*
2214dict_load_index_low(
2215 byte* table_id, /*!< in/out: table id (8 bytes),
2216 an "in" value if allocate=TRUE
2217 and "out" when allocate=FALSE */
2218 mem_heap_t* heap, /*!< in/out: temporary memory heap */
2219 const rec_t* rec, /*!< in: SYS_INDEXES record */
2220 ibool allocate, /*!< in: TRUE=allocate *index,
2221 FALSE=fill in a pre-allocated
2222 *index */
2223 dict_index_t** index) /*!< out,own: index, or NULL */
2224{
2225 const byte* field;
2226 ulint len;
2227 ulint name_len;
2228 char* name_buf;
2229 index_id_t id;
2230 ulint n_fields;
2231 ulint type;
2232 unsigned merge_threshold;
2233
2234 if (allocate) {
2235 /* If allocate=TRUE, no dict_index_t will
2236 be supplied. Initialize "*index" to NULL */
2237 *index = NULL;
2238 }
2239
2240 if (rec_get_deleted_flag(rec, 0)) {
2241 return(dict_load_index_del);
2242 }
2243
2244 if (rec_get_n_fields_old(rec) == DICT_NUM_FIELDS__SYS_INDEXES) {
2245 /* MERGE_THRESHOLD exists */
2246 field = rec_get_nth_field_old(
2247 rec, DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD, &len);
2248 switch (len) {
2249 case 4:
2250 merge_threshold = mach_read_from_4(field);
2251 break;
2252 case UNIV_SQL_NULL:
2253 merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2254 break;
2255 default:
2256 return("incorrect MERGE_THRESHOLD length"
2257 " in SYS_INDEXES");
2258 }
2259 } else if (rec_get_n_fields_old(rec)
2260 == DICT_NUM_FIELDS__SYS_INDEXES - 1) {
2261 /* MERGE_THRESHOLD doesn't exist */
2262
2263 merge_threshold = DICT_INDEX_MERGE_THRESHOLD_DEFAULT;
2264 } else {
2265 return("wrong number of columns in SYS_INDEXES record");
2266 }
2267
2268 field = rec_get_nth_field_old(
2269 rec, DICT_FLD__SYS_INDEXES__TABLE_ID, &len);
2270 if (len != 8) {
2271err_len:
2272 return("incorrect column length in SYS_INDEXES");
2273 }
2274
2275 if (!allocate) {
2276 /* We are reading a SYS_INDEXES record. Copy the table_id */
2277 memcpy(table_id, (const char*) field, 8);
2278 } else if (memcmp(field, table_id, 8)) {
2279 /* Caller supplied table_id, verify it is the same
2280 id as on the index record */
2281 return(dict_load_index_id_err);
2282 }
2283
2284 field = rec_get_nth_field_old(
2285 rec, DICT_FLD__SYS_INDEXES__ID, &len);
2286 if (len != 8) {
2287 goto err_len;
2288 }
2289
2290 id = mach_read_from_8(field);
2291
2292 rec_get_nth_field_offs_old(
2293 rec, DICT_FLD__SYS_INDEXES__DB_TRX_ID, &len);
2294 if (len != DATA_TRX_ID_LEN && len != UNIV_SQL_NULL) {
2295 goto err_len;
2296 }
2297 rec_get_nth_field_offs_old(
2298 rec, DICT_FLD__SYS_INDEXES__DB_ROLL_PTR, &len);
2299 if (len != DATA_ROLL_PTR_LEN && len != UNIV_SQL_NULL) {
2300 goto err_len;
2301 }
2302
2303 field = rec_get_nth_field_old(
2304 rec, DICT_FLD__SYS_INDEXES__NAME, &name_len);
2305 if (name_len == UNIV_SQL_NULL) {
2306 goto err_len;
2307 }
2308
2309 name_buf = mem_heap_strdupl(heap, (const char*) field,
2310 name_len);
2311
2312 field = rec_get_nth_field_old(
2313 rec, DICT_FLD__SYS_INDEXES__N_FIELDS, &len);
2314 if (len != 4) {
2315 goto err_len;
2316 }
2317 n_fields = mach_read_from_4(field);
2318
2319 field = rec_get_nth_field_old(
2320 rec, DICT_FLD__SYS_INDEXES__TYPE, &len);
2321 if (len != 4) {
2322 goto err_len;
2323 }
2324 type = mach_read_from_4(field);
2325 if (type & (~0U << DICT_IT_BITS)) {
2326 return("unknown SYS_INDEXES.TYPE bits");
2327 }
2328
2329 field = rec_get_nth_field_old(
2330 rec, DICT_FLD__SYS_INDEXES__PAGE_NO, &len);
2331 if (len != 4) {
2332 goto err_len;
2333 }
2334
2335 if (allocate) {
2336 *index = dict_mem_index_create(NULL, name_buf, type, n_fields);
2337 } else {
2338 ut_a(*index);
2339
2340 dict_mem_fill_index_struct(*index, NULL, name_buf,
2341 type, n_fields);
2342 }
2343
2344 (*index)->id = id;
2345 (*index)->page = mach_read_from_4(field);
2346 ut_ad((*index)->page);
2347 (*index)->merge_threshold = merge_threshold;
2348
2349 return(NULL);
2350}
2351
2352/********************************************************************//**
2353Loads definitions for table indexes. Adds them to the data dictionary
2354cache.
2355@return DB_SUCCESS if ok, DB_CORRUPTION if corruption of dictionary
2356table or DB_UNSUPPORTED if table has unknown index type */
2357static MY_ATTRIBUTE((nonnull))
2358dberr_t
2359dict_load_indexes(
2360/*==============*/
2361 dict_table_t* table, /*!< in/out: table */
2362 mem_heap_t* heap, /*!< in: memory heap for temporary storage */
2363 dict_err_ignore_t ignore_err)
2364 /*!< in: error to be ignored when
2365 loading the index definition */
2366{
2367 dict_table_t* sys_indexes;
2368 dict_index_t* sys_index;
2369 btr_pcur_t pcur;
2370 dtuple_t* tuple;
2371 dfield_t* dfield;
2372 const rec_t* rec;
2373 byte* buf;
2374 mtr_t mtr;
2375 dberr_t error = DB_SUCCESS;
2376
2377 ut_ad(mutex_own(&dict_sys->mutex));
2378
2379 mtr_start(&mtr);
2380
2381 sys_indexes = dict_table_get_low("SYS_INDEXES");
2382 sys_index = UT_LIST_GET_FIRST(sys_indexes->indexes);
2383 ut_ad(!dict_table_is_comp(sys_indexes));
2384 ut_ad(name_of_col_is(sys_indexes, sys_index,
2385 DICT_FLD__SYS_INDEXES__NAME, "NAME"));
2386 ut_ad(name_of_col_is(sys_indexes, sys_index,
2387 DICT_FLD__SYS_INDEXES__PAGE_NO, "PAGE_NO"));
2388
2389 tuple = dtuple_create(heap, 1);
2390 dfield = dtuple_get_nth_field(tuple, 0);
2391
2392 buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
2393 mach_write_to_8(buf, table->id);
2394
2395 dfield_set_data(dfield, buf, 8);
2396 dict_index_copy_types(tuple, sys_index, 1);
2397
2398 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2399 BTR_SEARCH_LEAF, &pcur, &mtr);
2400 for (;;) {
2401 dict_index_t* index = NULL;
2402 const char* err_msg;
2403
2404 if (!btr_pcur_is_on_user_rec(&pcur)) {
2405
2406 /* We should allow the table to open even
2407 without index when DICT_ERR_IGNORE_CORRUPT is set.
2408 DICT_ERR_IGNORE_CORRUPT is currently only set
2409 for drop table */
2410 if (dict_table_get_first_index(table) == NULL
2411 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2412 ib::warn() << "Cannot load table "
2413 << table->name
2414 << " because it has no indexes in"
2415 " InnoDB internal data dictionary.";
2416 error = DB_CORRUPTION;
2417 goto func_exit;
2418 }
2419
2420 break;
2421 }
2422
2423 rec = btr_pcur_get_rec(&pcur);
2424
2425 if ((ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2426 && (rec_get_n_fields_old(rec)
2427 == DICT_NUM_FIELDS__SYS_INDEXES
2428 /* a record for older SYS_INDEXES table
2429 (missing merge_threshold column) is acceptable. */
2430 || rec_get_n_fields_old(rec)
2431 == DICT_NUM_FIELDS__SYS_INDEXES - 1)) {
2432 const byte* field;
2433 ulint len;
2434 field = rec_get_nth_field_old(
2435 rec, DICT_FLD__SYS_INDEXES__NAME, &len);
2436
2437 if (len != UNIV_SQL_NULL
2438 && static_cast<char>(*field)
2439 == static_cast<char>(*TEMP_INDEX_PREFIX_STR)) {
2440 /* Skip indexes whose name starts with
2441 TEMP_INDEX_PREFIX, because they will
2442 be dropped during crash recovery. */
2443 goto next_rec;
2444 }
2445 }
2446
2447 err_msg = dict_load_index_low(buf, heap, rec, TRUE, &index);
2448 ut_ad((index == NULL && err_msg != NULL)
2449 || (index != NULL && err_msg == NULL));
2450
2451 if (err_msg == dict_load_index_id_err) {
2452 /* TABLE_ID mismatch means that we have
2453 run out of index definitions for the table. */
2454
2455 if (dict_table_get_first_index(table) == NULL
2456 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)) {
2457
2458 ib::warn() << "Failed to load the"
2459 " clustered index for table "
2460 << table->name
2461 << " because of the following error: "
2462 << err_msg << "."
2463 " Refusing to load the rest of the"
2464 " indexes (if any) and the whole table"
2465 " altogether.";
2466 error = DB_CORRUPTION;
2467 goto func_exit;
2468 }
2469
2470 break;
2471 } else if (err_msg == dict_load_index_del) {
2472 /* Skip delete-marked records. */
2473 goto next_rec;
2474 } else if (err_msg) {
2475 ib::error() << err_msg;
2476 if (ignore_err & DICT_ERR_IGNORE_CORRUPT) {
2477 goto next_rec;
2478 }
2479 error = DB_CORRUPTION;
2480 goto func_exit;
2481 }
2482
2483 ut_ad(index);
2484 ut_ad(!dict_index_is_online_ddl(index));
2485
2486 /* Check whether the index is corrupted */
2487 if (index->is_corrupted()) {
2488 ib::error() << "Index " << index->name
2489 << " of table " << table->name
2490 << " is corrupted";
2491
2492 if (!srv_load_corrupted
2493 && !(ignore_err & DICT_ERR_IGNORE_CORRUPT)
2494 && dict_index_is_clust(index)) {
2495 dict_mem_index_free(index);
2496
2497 error = DB_INDEX_CORRUPT;
2498 goto func_exit;
2499 } else {
2500 /* We will load the index if
2501 1) srv_load_corrupted is TRUE
2502 2) ignore_err is set with
2503 DICT_ERR_IGNORE_CORRUPT
2504 3) if the index corrupted is a secondary
2505 index */
2506 ib::info() << "Load corrupted index "
2507 << index->name
2508 << " of table " << table->name;
2509 }
2510 }
2511
2512 if (index->type & DICT_FTS
2513 && !dict_table_has_fts_index(table)) {
2514 /* This should have been created by now. */
2515 ut_a(table->fts != NULL);
2516 DICT_TF2_FLAG_SET(table, DICT_TF2_FTS);
2517 }
2518
2519 /* We check for unsupported types first, so that the
2520 subsequent checks are relevant for the supported types. */
2521 if (index->type & ~(DICT_CLUSTERED | DICT_UNIQUE
2522 | DICT_CORRUPT | DICT_FTS
2523 | DICT_SPATIAL | DICT_VIRTUAL)) {
2524
2525 ib::error() << "Unknown type " << index->type
2526 << " of index " << index->name
2527 << " of table " << table->name;
2528
2529 error = DB_UNSUPPORTED;
2530 dict_mem_index_free(index);
2531 goto func_exit;
2532 } else if (index->page == FIL_NULL
2533 && table->is_readable()
2534 && (!(index->type & DICT_FTS))) {
2535
2536 ib::error() << "Trying to load index " << index->name
2537 << " for table " << table->name
2538 << ", but the index tree has been freed!";
2539
2540 if (ignore_err & DICT_ERR_IGNORE_INDEX_ROOT) {
2541 /* If caller can tolerate this error,
2542 we will continue to load the index and
2543 let caller deal with this error. However
2544 mark the index and table corrupted. We
2545 only need to mark such in the index
2546 dictionary cache for such metadata corruption,
2547 since we would always be able to set it
2548 when loading the dictionary cache */
2549 index->table = table;
2550 dict_set_corrupted_index_cache_only(index);
2551
2552 ib::info() << "Index is corrupt but forcing"
2553 " load into data dictionary";
2554 } else {
2555corrupted:
2556 dict_mem_index_free(index);
2557 error = DB_CORRUPTION;
2558 goto func_exit;
2559 }
2560 } else if (!dict_index_is_clust(index)
2561 && NULL == dict_table_get_first_index(table)) {
2562
2563 ib::error() << "Trying to load index " << index->name
2564 << " for table " << table->name
2565 << ", but the first index is not clustered!";
2566
2567 goto corrupted;
2568 } else if (dict_is_sys_table(table->id)
2569 && (dict_index_is_clust(index)
2570 || ((table == dict_sys->sys_tables)
2571 && !strcmp("ID_IND", index->name)))) {
2572
2573 /* The index was created in memory already at booting
2574 of the database server */
2575 dict_mem_index_free(index);
2576 } else {
2577 dict_load_fields(index, heap);
2578 index->table = table;
2579
2580 /* The data dictionary tables should never contain
2581 invalid index definitions. If we ignored this error
2582 and simply did not load this index definition, the
2583 .frm file would disagree with the index definitions
2584 inside InnoDB. */
2585 if (!dict_index_add_to_cache(
2586 index, index->page, false, &error)) {
2587 goto func_exit;
2588 }
2589 }
2590next_rec:
2591 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2592 }
2593
2594 ut_ad(table->fts_doc_id_index == NULL);
2595
2596 if (table->fts != NULL) {
2597 table->fts_doc_id_index = dict_table_get_index_on_name(
2598 table, FTS_DOC_ID_INDEX_NAME);
2599 }
2600
2601 /* If the table contains FTS indexes, populate table->fts->indexes */
2602 if (dict_table_has_fts_index(table)) {
2603 ut_ad(table->fts_doc_id_index != NULL);
2604 /* table->fts->indexes should have been created. */
2605 ut_a(table->fts->indexes != NULL);
2606 dict_table_get_all_fts_indexes(table, table->fts->indexes);
2607 }
2608
2609func_exit:
2610 btr_pcur_close(&pcur);
2611 mtr_commit(&mtr);
2612
2613 return(error);
2614}
2615
2616/** Load a table definition from a SYS_TABLES record to dict_table_t.
2617Do not load any columns or indexes.
2618@param[in] name Table name
2619@param[in] rec SYS_TABLES record
2620@param[out,own] table table, or NULL
2621@return error message
2622@retval NULL on success */
2623static
2624const char*
2625dict_load_table_low(table_name_t& name, const rec_t* rec, dict_table_t** table)
2626{
2627 table_id_t table_id;
2628 ulint space_id;
2629 ulint n_cols;
2630 ulint t_num;
2631 ulint flags;
2632 ulint flags2;
2633 ulint n_v_col;
2634
2635 if (const char* error_text = dict_sys_tables_rec_check(rec)) {
2636 *table = NULL;
2637 return(error_text);
2638 }
2639
2640 if (!dict_sys_tables_rec_read(rec, name, &table_id, &space_id,
2641 &t_num, &flags, &flags2)) {
2642 *table = NULL;
2643 return(dict_load_table_flags);
2644 }
2645
2646 dict_table_decode_n_col(t_num, &n_cols, &n_v_col);
2647
2648 *table = dict_mem_table_create(
2649 name.m_name, NULL, n_cols + n_v_col, n_v_col, flags, flags2);
2650 (*table)->space_id = space_id;
2651 (*table)->id = table_id;
2652 (*table)->file_unreadable = false;
2653
2654 return(NULL);
2655}
2656
2657/********************************************************************//**
2658Using the table->heap, copy the null-terminated filepath into
2659table->data_dir_path and replace the 'databasename/tablename.ibd'
2660portion with 'tablename'.
2661This allows SHOW CREATE TABLE to return the correct DATA DIRECTORY path.
2662Make this data directory path only if it has not yet been saved. */
2663static
2664void
2665dict_save_data_dir_path(
2666/*====================*/
2667 dict_table_t* table, /*!< in/out: table */
2668 const char* filepath) /*!< in: filepath of tablespace */
2669{
2670 ut_ad(mutex_own(&dict_sys->mutex));
2671 ut_a(DICT_TF_HAS_DATA_DIR(table->flags));
2672
2673 ut_a(!table->data_dir_path);
2674 ut_a(filepath);
2675
2676 /* Be sure this filepath is not the default filepath. */
2677 char* default_filepath = fil_make_filepath(
2678 NULL, table->name.m_name, IBD, false);
2679 if (default_filepath) {
2680 if (0 != strcmp(filepath, default_filepath)) {
2681 ulint pathlen = strlen(filepath);
2682 ut_a(pathlen < OS_FILE_MAX_PATH);
2683 ut_a(0 == strcmp(filepath + pathlen - 4, DOT_IBD));
2684
2685 table->data_dir_path = mem_heap_strdup(
2686 table->heap, filepath);
2687 os_file_make_data_dir_path(table->data_dir_path);
2688 }
2689
2690 ut_free(default_filepath);
2691 }
2692}
2693
2694/** Make sure the data_dir_path is saved in dict_table_t if DATA DIRECTORY
2695was used. Try to read it from the fil_system first, then from SYS_DATAFILES.
2696@param[in] table Table object
2697@param[in] dict_mutex_own true if dict_sys->mutex is owned already */
2698void
2699dict_get_and_save_data_dir_path(
2700 dict_table_t* table,
2701 bool dict_mutex_own)
2702{
2703 ut_ad(!table->is_temporary());
2704 ut_ad(!table->space || table->space->id == table->space_id);
2705
2706 if (!table->data_dir_path && table->space_id) {
2707 if (!dict_mutex_own) {
2708 dict_mutex_enter_for_mysql();
2709 }
2710
2711 if (const char* p = table->space
2712 ? table->space->chain.start->name : NULL) {
2713 table->flags |= (1 << DICT_TF_POS_DATA_DIR);
2714 dict_save_data_dir_path(table, p);
2715 } else if (char* path = dict_get_first_path(table->space_id)) {
2716 table->flags |= (1 << DICT_TF_POS_DATA_DIR);
2717 dict_save_data_dir_path(table, path);
2718 ut_free(path);
2719 }
2720
2721 if (table->data_dir_path == NULL) {
2722 /* Since we did not set the table data_dir_path,
2723 unset the flag. This does not change SYS_DATAFILES
2724 or SYS_TABLES or FSP_FLAGS on the header page of the
2725 tablespace, but it makes dict_table_t consistent. */
2726 table->flags &= ~DICT_TF_MASK_DATA_DIR;
2727 }
2728
2729 if (!dict_mutex_own) {
2730 dict_mutex_exit_for_mysql();
2731 }
2732 }
2733}
2734
2735/** Loads a table definition and also all its index definitions, and also
2736the cluster definition if the table is a member in a cluster. Also loads
2737all foreign key constraints where the foreign key is in the table or where
2738a foreign key references columns in this table.
2739@param[in] name Table name in the dbname/tablename format
2740@param[in] cached true=add to cache, false=do not
2741@param[in] ignore_err Error to be ignored when loading
2742 table and its index definition
2743@return table, NULL if does not exist; if the table is stored in an
2744.ibd file, but the file does not exist, then we set the file_unreadable
2745flag in the table object we return. */
2746dict_table_t*
2747dict_load_table(
2748 const char* name,
2749 bool cached,
2750 dict_err_ignore_t ignore_err)
2751{
2752 dict_names_t fk_list;
2753 dict_table_t* result;
2754 dict_names_t::iterator i;
2755 table_name_t table_name;
2756
2757 DBUG_ENTER("dict_load_table");
2758 DBUG_PRINT("dict_load_table", ("loading table: '%s'", name));
2759
2760 ut_ad(mutex_own(&dict_sys->mutex));
2761
2762 table_name.m_name = const_cast<char*>(name);
2763
2764 result = dict_table_check_if_in_cache_low(name);
2765
2766 if (!result) {
2767 result = dict_load_table_one(table_name, cached, ignore_err,
2768 fk_list);
2769 while (!fk_list.empty()) {
2770 table_name_t fk_table_name;
2771 dict_table_t* fk_table;
2772
2773 fk_table_name.m_name =
2774 const_cast<char*>(fk_list.front());
2775 fk_table = dict_table_check_if_in_cache_low(
2776 fk_table_name.m_name);
2777 if (!fk_table) {
2778 dict_load_table_one(fk_table_name, cached,
2779 ignore_err, fk_list);
2780 }
2781 fk_list.pop_front();
2782 }
2783 }
2784
2785 DBUG_RETURN(result);
2786}
2787
2788/** Opens a tablespace for dict_load_table_one()
2789@param[in,out] table A table that refers to the tablespace to open
2790@param[in] ignore_err Whether to ignore an error. */
2791UNIV_INLINE
2792void
2793dict_load_tablespace(
2794 dict_table_t* table,
2795 dict_err_ignore_t ignore_err)
2796{
2797 ut_ad(!table->is_temporary());
2798 ut_ad(!table->space);
2799 ut_ad(table->space_id < SRV_LOG_SPACE_FIRST_ID);
2800 ut_ad(fil_system.sys_space);
2801
2802 if (table->space_id == TRX_SYS_SPACE) {
2803 table->space = fil_system.sys_space;
2804 return;
2805 }
2806
2807 if (table->flags2 & DICT_TF2_DISCARDED) {
2808 ib::warn() << "Tablespace for table " << table->name
2809 << " is set as discarded.";
2810 table->file_unreadable = true;
2811 return;
2812 }
2813
2814 /* The tablespace may already be open. */
2815 table->space = fil_space_for_table_exists_in_mem(
2816 table->space_id, table->name.m_name, false, table->flags);
2817 if (table->space) {
2818 return;
2819 }
2820
2821 if (!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)) {
2822 ib::error() << "Failed to find tablespace for table "
2823 << table->name << " in the cache. Attempting"
2824 " to load the tablespace with space id "
2825 << table->space_id;
2826 }
2827
2828 /* Use the remote filepath if needed. This parameter is optional
2829 in the call to fil_ibd_open(). If not supplied, it will be built
2830 from the table->name. */
2831 char* filepath = NULL;
2832 if (DICT_TF_HAS_DATA_DIR(table->flags)) {
2833 /* This will set table->data_dir_path from either
2834 fil_system or SYS_DATAFILES */
2835 dict_get_and_save_data_dir_path(table, true);
2836
2837 if (table->data_dir_path) {
2838 filepath = fil_make_filepath(
2839 table->data_dir_path,
2840 table->name.m_name, IBD, true);
2841 }
2842 }
2843
2844 /* Try to open the tablespace. We set the 2nd param (fix_dict) to
2845 false because we do not have an x-lock on dict_operation_lock */
2846 table->space = fil_ibd_open(
2847 true, false, FIL_TYPE_TABLESPACE, table->space_id,
2848 dict_tf_to_fsp_flags(table->flags),
2849 table->name, filepath);
2850
2851 if (!table->space) {
2852 /* We failed to find a sensible tablespace file */
2853 table->file_unreadable = true;
2854 }
2855
2856 ut_free(filepath);
2857}
2858
2859/** Loads a table definition and also all its index definitions.
2860
2861Loads those foreign key constraints whose referenced table is already in
2862dictionary cache. If a foreign key constraint is not loaded, then the
2863referenced table is pushed into the output stack (fk_tables), if it is not
2864NULL. These tables must be subsequently loaded so that all the foreign
2865key constraints are loaded into memory.
2866
2867@param[in] name Table name in the db/tablename format
2868@param[in] cached true=add to cache, false=do not
2869@param[in] ignore_err Error to be ignored when loading table
2870 and its index definition
2871@param[out] fk_tables Related table names that must also be
2872 loaded to ensure that all foreign key
2873 constraints are loaded.
2874@return table, NULL if does not exist; if the table is stored in an
2875.ibd file, but the file does not exist, then we set the
2876file_unreadable flag in the table object we return */
2877static
2878dict_table_t*
2879dict_load_table_one(
2880 table_name_t& name,
2881 bool cached,
2882 dict_err_ignore_t ignore_err,
2883 dict_names_t& fk_tables)
2884{
2885 dberr_t err;
2886 dict_table_t* sys_tables;
2887 btr_pcur_t pcur;
2888 dict_index_t* sys_index;
2889 dtuple_t* tuple;
2890 mem_heap_t* heap;
2891 dfield_t* dfield;
2892 const rec_t* rec;
2893 const byte* field;
2894 ulint len;
2895 mtr_t mtr;
2896
2897 DBUG_ENTER("dict_load_table_one");
2898 DBUG_PRINT("dict_load_table_one", ("table: %s", name.m_name));
2899
2900 ut_ad(mutex_own(&dict_sys->mutex));
2901
2902 heap = mem_heap_create(32000);
2903
2904 mtr_start(&mtr);
2905
2906 sys_tables = dict_table_get_low("SYS_TABLES");
2907 sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
2908 ut_ad(!dict_table_is_comp(sys_tables));
2909 ut_ad(name_of_col_is(sys_tables, sys_index,
2910 DICT_FLD__SYS_TABLES__ID, "ID"));
2911 ut_ad(name_of_col_is(sys_tables, sys_index,
2912 DICT_FLD__SYS_TABLES__N_COLS, "N_COLS"));
2913 ut_ad(name_of_col_is(sys_tables, sys_index,
2914 DICT_FLD__SYS_TABLES__TYPE, "TYPE"));
2915 ut_ad(name_of_col_is(sys_tables, sys_index,
2916 DICT_FLD__SYS_TABLES__MIX_LEN, "MIX_LEN"));
2917 ut_ad(name_of_col_is(sys_tables, sys_index,
2918 DICT_FLD__SYS_TABLES__SPACE, "SPACE"));
2919
2920 tuple = dtuple_create(heap, 1);
2921 dfield = dtuple_get_nth_field(tuple, 0);
2922
2923 dfield_set_data(dfield, name.m_name, ut_strlen(name.m_name));
2924 dict_index_copy_types(tuple, sys_index, 1);
2925
2926 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
2927 BTR_SEARCH_LEAF, &pcur, &mtr);
2928 rec = btr_pcur_get_rec(&pcur);
2929
2930 if (!btr_pcur_is_on_user_rec(&pcur)
2931 || rec_get_deleted_flag(rec, 0)) {
2932 /* Not found */
2933err_exit:
2934 btr_pcur_close(&pcur);
2935 mtr_commit(&mtr);
2936 mem_heap_free(heap);
2937
2938 DBUG_RETURN(NULL);
2939 }
2940
2941 field = rec_get_nth_field_old(
2942 rec, DICT_FLD__SYS_TABLES__NAME, &len);
2943
2944 /* Check if the table name in record is the searched one */
2945 if (len != ut_strlen(name.m_name)
2946 || 0 != ut_memcmp(name.m_name, field, len)) {
2947
2948 goto err_exit;
2949 }
2950
2951 dict_table_t* table;
2952 if (const char* err_msg = dict_load_table_low(name, rec, &table)) {
2953 if (err_msg != dict_load_table_flags) {
2954 ib::error() << err_msg;
2955 }
2956 goto err_exit;
2957 }
2958
2959 btr_pcur_close(&pcur);
2960 mtr_commit(&mtr);
2961
2962 dict_load_tablespace(table, ignore_err);
2963
2964 dict_load_columns(table, heap);
2965
2966 dict_load_virtual(table, heap);
2967
2968 dict_table_add_system_columns(table, heap);
2969
2970 if (cached) {
2971 table->can_be_evicted = true;
2972 table->add_to_cache();
2973 }
2974
2975 mem_heap_empty(heap);
2976
2977 ut_ad(dict_tf2_is_valid(table->flags, table->flags2));
2978
2979 /* If there is no tablespace for the table then we only need to
2980 load the index definitions. So that we can IMPORT the tablespace
2981 later. When recovering table locks for resurrected incomplete
2982 transactions, the tablespace should exist, because DDL operations
2983 were not allowed while the table is being locked by a transaction. */
2984 dict_err_ignore_t index_load_err =
2985 !(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
2986 && !table->is_readable()
2987 ? DICT_ERR_IGNORE_ALL
2988 : ignore_err;
2989
2990 err = dict_load_indexes(table, heap, index_load_err);
2991
2992 if (err == DB_INDEX_CORRUPT) {
2993 /* Refuse to load the table if the table has a corrupted
2994 cluster index */
2995 if (!srv_load_corrupted) {
2996
2997 ib::error() << "Load table " << table->name
2998 << " failed, the table has"
2999 " corrupted clustered indexes. Turn on"
3000 " 'innodb_force_load_corrupted' to drop it";
3001 dict_table_remove_from_cache(table);
3002 table = NULL;
3003 goto func_exit;
3004 } else {
3005 if (table->indexes.start->is_corrupted()) {
3006 table->corrupted = true;
3007 }
3008 }
3009 }
3010
3011 if (err == DB_SUCCESS && cached && table->is_readable()) {
3012 if (table->space && !fil_space_get_size(table->space->id)) {
3013 table->corrupted = true;
3014 table->file_unreadable = true;
3015 } else if (table->supports_instant()) {
3016 err = btr_cur_instant_init(table);
3017 }
3018 }
3019
3020 /* Initialize table foreign_child value. Its value could be
3021 changed when dict_load_foreigns() is called below */
3022 table->fk_max_recusive_level = 0;
3023
3024 /* If the force recovery flag is set, we open the table irrespective
3025 of the error condition, since the user may want to dump data from the
3026 clustered index. However we load the foreign key information only if
3027 all indexes were loaded. */
3028 if (!cached || !table->is_readable()) {
3029 /* Don't attempt to load the indexes from disk. */
3030 } else if (err == DB_SUCCESS) {
3031 err = dict_load_foreigns(table->name.m_name, NULL,
3032 true, true,
3033 ignore_err, fk_tables);
3034
3035 if (err != DB_SUCCESS) {
3036 ib::warn() << "Load table " << table->name
3037 << " failed, the table has missing"
3038 " foreign key indexes. Turn off"
3039 " 'foreign_key_checks' and try again.";
3040
3041 dict_table_remove_from_cache(table);
3042 table = NULL;
3043 } else {
3044 dict_mem_table_fill_foreign_vcol_set(table);
3045 table->fk_max_recusive_level = 0;
3046 }
3047 } else {
3048 dict_index_t* index;
3049
3050 /* Make sure that at least the clustered index was loaded.
3051 Otherwise refuse to load the table */
3052 index = dict_table_get_first_index(table);
3053
3054 if (!srv_force_recovery
3055 || !index
3056 || !index->is_primary()) {
3057 dict_table_remove_from_cache(table);
3058 table = NULL;
3059 } else if (index->is_corrupted()
3060 && table->is_readable()) {
3061 /* It is possible we force to load a corrupted
3062 clustered index if srv_load_corrupted is set.
3063 Mark the table as corrupted in this case */
3064 table->corrupted = true;
3065 }
3066 }
3067
3068func_exit:
3069 mem_heap_free(heap);
3070
3071 ut_ad(!table
3072 || ignore_err != DICT_ERR_IGNORE_NONE
3073 || !table->is_readable()
3074 || !table->corrupted);
3075
3076 if (table && table->fts) {
3077 if (!(dict_table_has_fts_index(table)
3078 || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)
3079 || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID))) {
3080 /* the table->fts could be created in dict_load_column
3081 when a user defined FTS_DOC_ID is present, but no
3082 FTS */
3083 fts_optimize_remove_table(table);
3084 fts_free(table);
3085 } else {
3086 fts_optimize_add_table(table);
3087 }
3088 }
3089
3090 ut_ad(err != DB_SUCCESS || dict_foreign_set_validate(*table));
3091
3092 DBUG_RETURN(table);
3093}
3094
3095/***********************************************************************//**
3096Loads a table object based on the table id.
3097@return table; NULL if table does not exist */
3098dict_table_t*
3099dict_load_table_on_id(
3100/*==================*/
3101 table_id_t table_id, /*!< in: table id */
3102 dict_err_ignore_t ignore_err) /*!< in: errors to ignore
3103 when loading the table */
3104{
3105 byte id_buf[8];
3106 btr_pcur_t pcur;
3107 mem_heap_t* heap;
3108 dtuple_t* tuple;
3109 dfield_t* dfield;
3110 dict_index_t* sys_table_ids;
3111 dict_table_t* sys_tables;
3112 const rec_t* rec;
3113 const byte* field;
3114 ulint len;
3115 dict_table_t* table;
3116 mtr_t mtr;
3117
3118 ut_ad(mutex_own(&dict_sys->mutex));
3119
3120 table = NULL;
3121
3122 /* NOTE that the operation of this function is protected by
3123 the dictionary mutex, and therefore no deadlocks can occur
3124 with other dictionary operations. */
3125
3126 mtr_start(&mtr);
3127 /*---------------------------------------------------*/
3128 /* Get the secondary index based on ID for table SYS_TABLES */
3129 sys_tables = dict_sys->sys_tables;
3130 sys_table_ids = dict_table_get_next_index(
3131 dict_table_get_first_index(sys_tables));
3132 ut_ad(!dict_table_is_comp(sys_tables));
3133 ut_ad(!dict_index_is_clust(sys_table_ids));
3134 heap = mem_heap_create(256);
3135
3136 tuple = dtuple_create(heap, 1);
3137 dfield = dtuple_get_nth_field(tuple, 0);
3138
3139 /* Write the table id in byte format to id_buf */
3140 mach_write_to_8(id_buf, table_id);
3141
3142 dfield_set_data(dfield, id_buf, 8);
3143 dict_index_copy_types(tuple, sys_table_ids, 1);
3144
3145 btr_pcur_open_on_user_rec(sys_table_ids, tuple, PAGE_CUR_GE,
3146 BTR_SEARCH_LEAF, &pcur, &mtr);
3147
3148 rec = btr_pcur_get_rec(&pcur);
3149
3150 if (page_rec_is_user_rec(rec)) {
3151 /*---------------------------------------------------*/
3152 /* Now we have the record in the secondary index
3153 containing the table ID and NAME */
3154check_rec:
3155 field = rec_get_nth_field_old(
3156 rec, DICT_FLD__SYS_TABLE_IDS__ID, &len);
3157 ut_ad(len == 8);
3158
3159 /* Check if the table id in record is the one searched for */
3160 if (table_id == mach_read_from_8(field)) {
3161 if (rec_get_deleted_flag(rec, 0)) {
3162 /* Until purge has completed, there
3163 may be delete-marked duplicate records
3164 for the same SYS_TABLES.ID, but different
3165 SYS_TABLES.NAME. */
3166 while (btr_pcur_move_to_next(&pcur, &mtr)) {
3167 rec = btr_pcur_get_rec(&pcur);
3168
3169 if (page_rec_is_user_rec(rec)) {
3170 goto check_rec;
3171 }
3172 }
3173 } else {
3174 /* Now we get the table name from the record */
3175 field = rec_get_nth_field_old(rec,
3176 DICT_FLD__SYS_TABLE_IDS__NAME, &len);
3177 /* Load the table definition to memory */
3178 char* table_name = mem_heap_strdupl(
3179 heap, (char*) field, len);
3180 table = dict_load_table(table_name, true, ignore_err);
3181 }
3182 }
3183 }
3184
3185 btr_pcur_close(&pcur);
3186 mtr_commit(&mtr);
3187 mem_heap_free(heap);
3188
3189 return(table);
3190}
3191
3192/********************************************************************//**
3193This function is called when the database is booted. Loads system table
3194index definitions except for the clustered index which is added to the
3195dictionary cache at booting before calling this function. */
3196void
3197dict_load_sys_table(
3198/*================*/
3199 dict_table_t* table) /*!< in: system table */
3200{
3201 mem_heap_t* heap;
3202
3203 ut_ad(mutex_own(&dict_sys->mutex));
3204
3205 heap = mem_heap_create(1000);
3206
3207 dict_load_indexes(table, heap, DICT_ERR_IGNORE_NONE);
3208
3209 mem_heap_free(heap);
3210}
3211
3212/********************************************************************//**
3213Loads foreign key constraint col names (also for the referenced table).
3214Members that must be set (and valid) in foreign:
3215foreign->heap
3216foreign->n_fields
3217foreign->id ('\0'-terminated)
3218Members that will be created and set by this function:
3219foreign->foreign_col_names[i]
3220foreign->referenced_col_names[i]
3221(for i=0..foreign->n_fields-1) */
3222static
3223void
3224dict_load_foreign_cols(
3225/*===================*/
3226 dict_foreign_t* foreign)/*!< in/out: foreign constraint object */
3227{
3228 dict_table_t* sys_foreign_cols;
3229 dict_index_t* sys_index;
3230 btr_pcur_t pcur;
3231 dtuple_t* tuple;
3232 dfield_t* dfield;
3233 const rec_t* rec;
3234 const byte* field;
3235 ulint len;
3236 ulint i;
3237 mtr_t mtr;
3238 size_t id_len;
3239
3240 ut_ad(mutex_own(&dict_sys->mutex));
3241
3242 id_len = strlen(foreign->id);
3243
3244 foreign->foreign_col_names = static_cast<const char**>(
3245 mem_heap_alloc(foreign->heap,
3246 foreign->n_fields * sizeof(void*)));
3247
3248 foreign->referenced_col_names = static_cast<const char**>(
3249 mem_heap_alloc(foreign->heap,
3250 foreign->n_fields * sizeof(void*)));
3251
3252 mtr_start(&mtr);
3253
3254 sys_foreign_cols = dict_table_get_low("SYS_FOREIGN_COLS");
3255
3256 sys_index = UT_LIST_GET_FIRST(sys_foreign_cols->indexes);
3257 ut_ad(!dict_table_is_comp(sys_foreign_cols));
3258
3259 tuple = dtuple_create(foreign->heap, 1);
3260 dfield = dtuple_get_nth_field(tuple, 0);
3261
3262 dfield_set_data(dfield, foreign->id, id_len);
3263 dict_index_copy_types(tuple, sys_index, 1);
3264
3265 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3266 BTR_SEARCH_LEAF, &pcur, &mtr);
3267 for (i = 0; i < foreign->n_fields; i++) {
3268
3269 rec = btr_pcur_get_rec(&pcur);
3270
3271 ut_a(btr_pcur_is_on_user_rec(&pcur));
3272 ut_a(!rec_get_deleted_flag(rec, 0));
3273
3274 field = rec_get_nth_field_old(
3275 rec, DICT_FLD__SYS_FOREIGN_COLS__ID, &len);
3276
3277 if (len != id_len || ut_memcmp(foreign->id, field, len) != 0) {
3278 const rec_t* pos;
3279 ulint pos_len;
3280 const rec_t* for_col_name;
3281 ulint for_col_name_len;
3282 const rec_t* ref_col_name;
3283 ulint ref_col_name_len;
3284
3285 pos = rec_get_nth_field_old(
3286 rec, DICT_FLD__SYS_FOREIGN_COLS__POS,
3287 &pos_len);
3288
3289 for_col_name = rec_get_nth_field_old(
3290 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME,
3291 &for_col_name_len);
3292
3293 ref_col_name = rec_get_nth_field_old(
3294 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME,
3295 &ref_col_name_len);
3296
3297 ib::fatal sout;
3298
3299 sout << "Unable to load column names for foreign"
3300 " key '" << foreign->id
3301 << "' because it was not found in"
3302 " InnoDB internal table SYS_FOREIGN_COLS. The"
3303 " closest entry we found is:"
3304 " (ID='";
3305 sout.write(field, len);
3306 sout << "', POS=" << mach_read_from_4(pos)
3307 << ", FOR_COL_NAME='";
3308 sout.write(for_col_name, for_col_name_len);
3309 sout << "', REF_COL_NAME='";
3310 sout.write(ref_col_name, ref_col_name_len);
3311 sout << "')";
3312 }
3313
3314 field = rec_get_nth_field_old(
3315 rec, DICT_FLD__SYS_FOREIGN_COLS__POS, &len);
3316 ut_a(len == 4);
3317 ut_a(i == mach_read_from_4(field));
3318
3319 field = rec_get_nth_field_old(
3320 rec, DICT_FLD__SYS_FOREIGN_COLS__FOR_COL_NAME, &len);
3321 foreign->foreign_col_names[i] = mem_heap_strdupl(
3322 foreign->heap, (char*) field, len);
3323
3324 field = rec_get_nth_field_old(
3325 rec, DICT_FLD__SYS_FOREIGN_COLS__REF_COL_NAME, &len);
3326 foreign->referenced_col_names[i] = mem_heap_strdupl(
3327 foreign->heap, (char*) field, len);
3328
3329 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3330 }
3331
3332 btr_pcur_close(&pcur);
3333 mtr_commit(&mtr);
3334}
3335
3336/***********************************************************************//**
3337Loads a foreign key constraint to the dictionary cache. If the referenced
3338table is not yet loaded, it is added in the output parameter (fk_tables).
3339@return DB_SUCCESS or error code */
3340static MY_ATTRIBUTE((nonnull(1), warn_unused_result))
3341dberr_t
3342dict_load_foreign(
3343/*==============*/
3344 const char* id,
3345 /*!< in: foreign constraint id, must be
3346 '\0'-terminated */
3347 const char** col_names,
3348 /*!< in: column names, or NULL
3349 to use foreign->foreign_table->col_names */
3350 bool check_recursive,
3351 /*!< in: whether to record the foreign table
3352 parent count to avoid unlimited recursive
3353 load of chained foreign tables */
3354 bool check_charsets,
3355 /*!< in: whether to check charset
3356 compatibility */
3357 dict_err_ignore_t ignore_err,
3358 /*!< in: error to be ignored */
3359 dict_names_t& fk_tables)
3360 /*!< out: the foreign key constraint is added
3361 to the dictionary cache only if the referenced
3362 table is already in cache. Otherwise, the
3363 foreign key constraint is not added to cache,
3364 and the referenced table is added to this
3365 stack. */
3366{
3367 dict_foreign_t* foreign;
3368 dict_table_t* sys_foreign;
3369 btr_pcur_t pcur;
3370 dict_index_t* sys_index;
3371 dtuple_t* tuple;
3372 mem_heap_t* heap2;
3373 dfield_t* dfield;
3374 const rec_t* rec;
3375 const byte* field;
3376 ulint len;
3377 ulint n_fields_and_type;
3378 mtr_t mtr;
3379 dict_table_t* for_table;
3380 dict_table_t* ref_table;
3381 size_t id_len;
3382
3383 DBUG_ENTER("dict_load_foreign");
3384 DBUG_PRINT("dict_load_foreign",
3385 ("id: '%s', check_recursive: %d", id, check_recursive));
3386
3387 ut_ad(mutex_own(&dict_sys->mutex));
3388
3389 id_len = strlen(id);
3390
3391 heap2 = mem_heap_create(1000);
3392
3393 mtr_start(&mtr);
3394
3395 sys_foreign = dict_table_get_low("SYS_FOREIGN");
3396
3397 sys_index = UT_LIST_GET_FIRST(sys_foreign->indexes);
3398 ut_ad(!dict_table_is_comp(sys_foreign));
3399
3400 tuple = dtuple_create(heap2, 1);
3401 dfield = dtuple_get_nth_field(tuple, 0);
3402
3403 dfield_set_data(dfield, id, id_len);
3404 dict_index_copy_types(tuple, sys_index, 1);
3405
3406 btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
3407 BTR_SEARCH_LEAF, &pcur, &mtr);
3408 rec = btr_pcur_get_rec(&pcur);
3409
3410 if (!btr_pcur_is_on_user_rec(&pcur)
3411 || rec_get_deleted_flag(rec, 0)) {
3412 /* Not found */
3413
3414 ib::error() << "Cannot load foreign constraint " << id
3415 << ": could not find the relevant record in "
3416 << "SYS_FOREIGN";
3417
3418 btr_pcur_close(&pcur);
3419 mtr_commit(&mtr);
3420 mem_heap_free(heap2);
3421
3422 DBUG_RETURN(DB_ERROR);
3423 }
3424
3425 field = rec_get_nth_field_old(rec, DICT_FLD__SYS_FOREIGN__ID, &len);
3426
3427 /* Check if the id in record is the searched one */
3428 if (len != id_len || ut_memcmp(id, field, len) != 0) {
3429
3430 {
3431 ib::error err;
3432 err << "Cannot load foreign constraint " << id
3433 << ": found ";
3434 err.write(field, len);
3435 err << " instead in SYS_FOREIGN";
3436 }
3437
3438 btr_pcur_close(&pcur);
3439 mtr_commit(&mtr);
3440 mem_heap_free(heap2);
3441
3442 DBUG_RETURN(DB_ERROR);
3443 }
3444
3445 /* Read the table names and the number of columns associated
3446 with the constraint */
3447
3448 mem_heap_free(heap2);
3449
3450 foreign = dict_mem_foreign_create();
3451
3452 n_fields_and_type = mach_read_from_4(
3453 rec_get_nth_field_old(
3454 rec, DICT_FLD__SYS_FOREIGN__N_COLS, &len));
3455
3456 ut_a(len == 4);
3457
3458 /* We store the type in the bits 24..29 of n_fields_and_type. */
3459
3460 foreign->type = (unsigned int) (n_fields_and_type >> 24);
3461 foreign->n_fields = (unsigned int) (n_fields_and_type & 0x3FFUL);
3462
3463 foreign->id = mem_heap_strdupl(foreign->heap, id, id_len);
3464
3465 field = rec_get_nth_field_old(
3466 rec, DICT_FLD__SYS_FOREIGN__FOR_NAME, &len);
3467
3468 foreign->foreign_table_name = mem_heap_strdupl(
3469 foreign->heap, (char*) field, len);
3470 dict_mem_foreign_table_name_lookup_set(foreign, TRUE);
3471
3472 const ulint foreign_table_name_len = len;
3473
3474 field = rec_get_nth_field_old(
3475 rec, DICT_FLD__SYS_FOREIGN__REF_NAME, &len);
3476 foreign->referenced_table_name = mem_heap_strdupl(
3477 foreign->heap, (char*) field, len);
3478 dict_mem_referenced_table_name_lookup_set(foreign, TRUE);
3479
3480 btr_pcur_close(&pcur);
3481 mtr_commit(&mtr);
3482
3483 dict_load_foreign_cols(foreign);
3484
3485 ref_table = dict_table_check_if_in_cache_low(
3486 foreign->referenced_table_name_lookup);
3487 for_table = dict_table_check_if_in_cache_low(
3488 foreign->foreign_table_name_lookup);
3489
3490 if (!for_table) {
3491 /* To avoid recursively loading the tables related through
3492 the foreign key constraints, the child table name is saved
3493 here. The child table will be loaded later, along with its
3494 foreign key constraint. */
3495
3496 ut_a(ref_table != NULL);
3497 fk_tables.push_back(
3498 mem_heap_strdupl(ref_table->heap,
3499 foreign->foreign_table_name_lookup,
3500 foreign_table_name_len));
3501
3502 dict_foreign_remove_from_cache(foreign);
3503 DBUG_RETURN(DB_SUCCESS);
3504 }
3505
3506 ut_a(for_table || ref_table);
3507
3508 /* Note that there may already be a foreign constraint object in
3509 the dictionary cache for this constraint: then the following
3510 call only sets the pointers in it to point to the appropriate table
3511 and index objects and frees the newly created object foreign.
3512 Adding to the cache should always succeed since we are not creating
3513 a new foreign key constraint but loading one from the data
3514 dictionary. */
3515
3516 DBUG_RETURN(dict_foreign_add_to_cache(foreign, col_names,
3517 check_charsets,
3518 ignore_err));
3519}
3520
3521/***********************************************************************//**
3522Loads foreign key constraints where the table is either the foreign key
3523holder or where the table is referenced by a foreign key. Adds these
3524constraints to the data dictionary.
3525
3526The foreign key constraint is loaded only if the referenced table is also
3527in the dictionary cache. If the referenced table is not in dictionary
3528cache, then it is added to the output parameter (fk_tables).
3529
3530@return DB_SUCCESS or error code */
3531dberr_t
3532dict_load_foreigns(
3533 const char* table_name, /*!< in: table name */
3534 const char** col_names, /*!< in: column names, or NULL
3535 to use table->col_names */
3536 bool check_recursive,/*!< in: Whether to check
3537 recursive load of tables
3538 chained by FK */
3539 bool check_charsets, /*!< in: whether to check
3540 charset compatibility */
3541 dict_err_ignore_t ignore_err, /*!< in: error to be ignored */
3542 dict_names_t& fk_tables)
3543 /*!< out: stack of table
3544 names which must be loaded
3545 subsequently to load all the
3546 foreign key constraints. */
3547{
3548 ulint tuple_buf[(DTUPLE_EST_ALLOC(1) + sizeof(ulint) - 1)
3549 / sizeof(ulint)];
3550 btr_pcur_t pcur;
3551 dtuple_t* tuple;
3552 dfield_t* dfield;
3553 dict_index_t* sec_index;
3554 dict_table_t* sys_foreign;
3555 const rec_t* rec;
3556 const byte* field;
3557 ulint len;
3558 dberr_t err;
3559 mtr_t mtr;
3560
3561 DBUG_ENTER("dict_load_foreigns");
3562
3563 ut_ad(mutex_own(&dict_sys->mutex));
3564
3565 sys_foreign = dict_table_get_low("SYS_FOREIGN");
3566
3567 if (sys_foreign == NULL) {
3568 /* No foreign keys defined yet in this database */
3569
3570 ib::info() << "No foreign key system tables in the database";
3571 DBUG_RETURN(DB_ERROR);
3572 }
3573
3574 ut_ad(!dict_table_is_comp(sys_foreign));
3575 mtr_start(&mtr);
3576
3577 /* Get the secondary index based on FOR_NAME from table
3578 SYS_FOREIGN */
3579
3580 sec_index = dict_table_get_next_index(
3581 dict_table_get_first_index(sys_foreign));
3582 ut_ad(!dict_index_is_clust(sec_index));
3583start_load:
3584
3585 tuple = dtuple_create_from_mem(tuple_buf, sizeof(tuple_buf), 1, 0);
3586 dfield = dtuple_get_nth_field(tuple, 0);
3587
3588 dfield_set_data(dfield, table_name, ut_strlen(table_name));
3589 dict_index_copy_types(tuple, sec_index, 1);
3590
3591 btr_pcur_open_on_user_rec(sec_index, tuple, PAGE_CUR_GE,
3592 BTR_SEARCH_LEAF, &pcur, &mtr);
3593loop:
3594 rec = btr_pcur_get_rec(&pcur);
3595
3596 if (!btr_pcur_is_on_user_rec(&pcur)) {
3597 /* End of index */
3598
3599 goto load_next_index;
3600 }
3601
3602 /* Now we have the record in the secondary index containing a table
3603 name and a foreign constraint ID */
3604
3605 field = rec_get_nth_field_old(
3606 rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__NAME, &len);
3607
3608 /* Check if the table name in the record is the one searched for; the
3609 following call does the comparison in the latin1_swedish_ci
3610 charset-collation, in a case-insensitive way. */
3611
3612 if (0 != cmp_data_data(dfield_get_type(dfield)->mtype,
3613 dfield_get_type(dfield)->prtype,
3614 static_cast<const byte*>(
3615 dfield_get_data(dfield)),
3616 dfield_get_len(dfield),
3617 field, len)) {
3618
3619 goto load_next_index;
3620 }
3621
3622 /* Since table names in SYS_FOREIGN are stored in a case-insensitive
3623 order, we have to check that the table name matches also in a binary
3624 string comparison. On Unix, MySQL allows table names that only differ
3625 in character case. If lower_case_table_names=2 then what is stored
3626 may not be the same case, but the previous comparison showed that they
3627 match with no-case. */
3628
3629 if (rec_get_deleted_flag(rec, 0)) {
3630 goto next_rec;
3631 }
3632
3633 if ((innobase_get_lower_case_table_names() != 2)
3634 && (0 != ut_memcmp(field, table_name, len))) {
3635 goto next_rec;
3636 }
3637
3638 /* Now we get a foreign key constraint id */
3639 field = rec_get_nth_field_old(
3640 rec, DICT_FLD__SYS_FOREIGN_FOR_NAME__ID, &len);
3641
3642 /* Copy the string because the page may be modified or evicted
3643 after mtr_commit() below. */
3644 char fk_id[MAX_TABLE_NAME_LEN + 1];
3645
3646 ut_a(len <= MAX_TABLE_NAME_LEN);
3647 memcpy(fk_id, field, len);
3648 fk_id[len] = '\0';
3649
3650 btr_pcur_store_position(&pcur, &mtr);
3651
3652 mtr_commit(&mtr);
3653
3654 /* Load the foreign constraint definition to the dictionary cache */
3655
3656 err = dict_load_foreign(fk_id, col_names,
3657 check_recursive, check_charsets, ignore_err,
3658 fk_tables);
3659
3660 if (err != DB_SUCCESS) {
3661 btr_pcur_close(&pcur);
3662
3663 DBUG_RETURN(err);
3664 }
3665
3666 mtr_start(&mtr);
3667
3668 btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr);
3669next_rec:
3670 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
3671
3672 goto loop;
3673
3674load_next_index:
3675 btr_pcur_close(&pcur);
3676 mtr_commit(&mtr);
3677
3678 sec_index = dict_table_get_next_index(sec_index);
3679
3680 if (sec_index != NULL) {
3681
3682 mtr_start(&mtr);
3683
3684 /* Switch to scan index on REF_NAME, fk_max_recusive_level
3685 already been updated when scanning FOR_NAME index, no need to
3686 update again */
3687 check_recursive = FALSE;
3688
3689 goto start_load;
3690 }
3691
3692 DBUG_RETURN(DB_SUCCESS);
3693}
3694
3695/***********************************************************************//**
3696Loads a table id based on the index id.
3697@return true if found */
3698static
3699bool
3700dict_load_table_id_on_index_id(
3701/*===========================*/
3702 index_id_t index_id, /*!< in: index id */
3703 table_id_t* table_id) /*!< out: table id */
3704{
3705 /* check hard coded indexes */
3706 switch(index_id) {
3707 case DICT_TABLES_ID:
3708 case DICT_COLUMNS_ID:
3709 case DICT_INDEXES_ID:
3710 case DICT_FIELDS_ID:
3711 *table_id = index_id;
3712 return true;
3713 case DICT_TABLE_IDS_ID:
3714 /* The following is a secondary index on SYS_TABLES */
3715 *table_id = DICT_TABLES_ID;
3716 return true;
3717 }
3718
3719 bool found = false;
3720 mtr_t mtr;
3721
3722 ut_ad(mutex_own(&(dict_sys->mutex)));
3723
3724 /* NOTE that the operation of this function is protected by
3725 the dictionary mutex, and therefore no deadlocks can occur
3726 with other dictionary operations. */
3727
3728 mtr_start(&mtr);
3729
3730 btr_pcur_t pcur;
3731 const rec_t* rec = dict_startscan_system(&pcur, &mtr, SYS_INDEXES);
3732
3733 while (rec) {
3734 ulint len;
3735 const byte* field = rec_get_nth_field_old(
3736 rec, DICT_FLD__SYS_INDEXES__ID, &len);
3737 ut_ad(len == 8);
3738
3739 /* Check if the index id is the one searched for */
3740 if (index_id == mach_read_from_8(field)) {
3741 found = true;
3742 /* Now we get the table id */
3743 const byte* field = rec_get_nth_field_old(
3744 rec,
3745 DICT_FLD__SYS_INDEXES__TABLE_ID,
3746 &len);
3747 *table_id = mach_read_from_8(field);
3748 break;
3749 }
3750 mtr_commit(&mtr);
3751 mtr_start(&mtr);
3752 rec = dict_getnext_system(&pcur, &mtr);
3753 }
3754
3755 btr_pcur_close(&pcur);
3756 mtr_commit(&mtr);
3757
3758 return(found);
3759}
3760
3761UNIV_INTERN
3762dict_table_t*
3763dict_table_open_on_index_id(
3764/*========================*/
3765 index_id_t index_id, /*!< in: index id */
3766 bool dict_locked) /*!< in: dict locked */
3767{
3768 if (!dict_locked) {
3769 mutex_enter(&dict_sys->mutex);
3770 }
3771
3772 ut_ad(mutex_own(&dict_sys->mutex));
3773 table_id_t table_id;
3774 dict_table_t * table = NULL;
3775 if (dict_load_table_id_on_index_id(index_id, &table_id)) {
3776 bool local_dict_locked = true;
3777 table = dict_table_open_on_id(table_id,
3778 local_dict_locked,
3779 DICT_TABLE_OP_LOAD_TABLESPACE);
3780 }
3781
3782 if (!dict_locked) {
3783 mutex_exit(&dict_sys->mutex);
3784 }
3785 return table;
3786}
3787