1/*****************************************************************************
2
3Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2016, 2018, MariaDB Corporation.
5
6This program is free software; you can redistribute it and/or modify it under
7the terms of the GNU General Public License as published by the Free Software
8Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful, but WITHOUT
11ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License along with
15this program; if not, write to the Free Software Foundation, Inc.,
1651 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
17
18*****************************************************************************/
19
20/**************************************************//**
21@file row/row0ins.cc
22Insert into a table
23
24Created 4/20/1996 Heikki Tuuri
25*******************************************************/
26
27#include "ha_prototypes.h"
28
29#include "row0ins.h"
30#include "dict0dict.h"
31#include "dict0boot.h"
32#include "trx0rec.h"
33#include "trx0undo.h"
34#include "btr0btr.h"
35#include "btr0cur.h"
36#include "mach0data.h"
37#include "ibuf0ibuf.h"
38#include "que0que.h"
39#include "row0upd.h"
40#include "row0sel.h"
41#include "row0row.h"
42#include "row0log.h"
43#include "rem0cmp.h"
44#include "lock0lock.h"
45#include "log0log.h"
46#include "eval0eval.h"
47#include "data0data.h"
48#include "buf0lru.h"
49#include "fts0fts.h"
50#include "fts0types.h"
51#include "m_string.h"
52#include "gis0geo.h"
53
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log before starting that operation. This is
done by calling log_free_check(). The reason for checking the
availability of the redo log space before the start of the operation is
that we MUST NOT hold any synchronization objects when performing the
check.
If you make a change in this module, make sure that no code path is
introduced where a call to log_free_check() is bypassed. */
63
64/*********************************************************************//**
65Creates an insert node struct.
66@return own: insert node struct */
67ins_node_t*
68ins_node_create(
69/*============*/
70 ulint ins_type, /*!< in: INS_VALUES, ... */
71 dict_table_t* table, /*!< in: table where to insert */
72 mem_heap_t* heap) /*!< in: mem heap where created */
73{
74 ins_node_t* node;
75
76 node = static_cast<ins_node_t*>(
77 mem_heap_alloc(heap, sizeof(ins_node_t)));
78
79 node->common.type = QUE_NODE_INSERT;
80
81 node->ins_type = ins_type;
82
83 node->state = INS_NODE_SET_IX_LOCK;
84 node->table = table;
85 node->index = NULL;
86 node->entry = NULL;
87
88 node->select = NULL;
89
90 node->trx_id = 0;
91 node->duplicate = NULL;
92
93 node->entry_sys_heap = mem_heap_create(128);
94
95 node->magic_n = INS_NODE_MAGIC_N;
96
97 return(node);
98}
99
100/***********************************************************//**
101Creates an entry template for each index of a table. */
102static
103void
104ins_node_create_entry_list(
105/*=======================*/
106 ins_node_t* node) /*!< in: row insert node */
107{
108 dict_index_t* index;
109 dtuple_t* entry;
110
111 ut_ad(node->entry_sys_heap);
112
113 UT_LIST_INIT(node->entry_list, &dtuple_t::tuple_list);
114
115 /* We will include all indexes (include those corrupted
116 secondary indexes) in the entry list. Filteration of
117 these corrupted index will be done in row_ins() */
118
119 for (index = dict_table_get_first_index(node->table);
120 index != 0;
121 index = dict_table_get_next_index(index)) {
122
123 entry = row_build_index_entry_low(
124 node->row, NULL, index, node->entry_sys_heap,
125 ROW_BUILD_FOR_INSERT);
126
127 UT_LIST_ADD_LAST(node->entry_list, entry);
128 }
129}
130
131/*****************************************************************//**
132Adds system field buffers to a row. */
133static
134void
135row_ins_alloc_sys_fields(
136/*=====================*/
137 ins_node_t* node) /*!< in: insert node */
138{
139 dtuple_t* row;
140 dict_table_t* table;
141 const dict_col_t* col;
142 dfield_t* dfield;
143
144 row = node->row;
145 table = node->table;
146
147 ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
148
149 /* allocate buffer to hold the needed system created hidden columns. */
150 compile_time_assert(DATA_ROW_ID_LEN
151 + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN
152 == sizeof node->sys_buf);
153 memset(node->sys_buf, 0, sizeof node->sys_buf);
154 /* Assign DB_ROLL_PTR to 1 << ROLL_PTR_INSERT_FLAG_POS */
155 node->sys_buf[DATA_ROW_ID_LEN + DATA_TRX_ID_LEN] = 0x80;
156 ut_ad(!memcmp(node->sys_buf + DATA_ROW_ID_LEN, reset_trx_id,
157 sizeof reset_trx_id));
158
159 /* 1. Populate row-id */
160 col = dict_table_get_sys_col(table, DATA_ROW_ID);
161
162 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
163
164 dfield_set_data(dfield, node->sys_buf, DATA_ROW_ID_LEN);
165
166 /* 2. Populate trx id */
167 col = dict_table_get_sys_col(table, DATA_TRX_ID);
168
169 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
170
171 dfield_set_data(dfield, &node->sys_buf[DATA_ROW_ID_LEN],
172 DATA_TRX_ID_LEN);
173
174 col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
175
176 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
177
178 dfield_set_data(dfield, &node->sys_buf[DATA_ROW_ID_LEN
179 + DATA_TRX_ID_LEN],
180 DATA_ROLL_PTR_LEN);
181}
182
183/*********************************************************************//**
184Sets a new row to insert for an INS_DIRECT node. This function is only used
185if we have constructed the row separately, which is a rare case; this
186function is quite slow. */
187void
188ins_node_set_new_row(
189/*=================*/
190 ins_node_t* node, /*!< in: insert node */
191 dtuple_t* row) /*!< in: new row (or first row) for the node */
192{
193 node->state = INS_NODE_SET_IX_LOCK;
194 node->index = NULL;
195 node->entry = NULL;
196 node->duplicate = NULL;
197
198 node->row = row;
199
200 mem_heap_empty(node->entry_sys_heap);
201
202 /* Create templates for index entries */
203
204 ins_node_create_entry_list(node);
205
206 /* Allocate from entry_sys_heap buffers for sys fields */
207
208 row_ins_alloc_sys_fields(node);
209
210 /* As we allocated a new trx id buf, the trx id should be written
211 there again: */
212
213 node->trx_id = 0;
214}
215
216/*******************************************************************//**
217Does an insert operation by updating a delete-marked existing record
218in the index. This situation can occur if the delete-marked record is
219kept in the index for consistent reads.
220@return DB_SUCCESS or error code */
221static MY_ATTRIBUTE((nonnull, warn_unused_result))
222dberr_t
223row_ins_sec_index_entry_by_modify(
224/*==============================*/
225 ulint flags, /*!< in: undo logging and locking flags */
226 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
227 depending on whether mtr holds just a leaf
228 latch or also a tree latch */
229 btr_cur_t* cursor, /*!< in: B-tree cursor */
230 ulint** offsets,/*!< in/out: offsets on cursor->page_cur.rec */
231 mem_heap_t* offsets_heap,
232 /*!< in/out: memory heap that can be emptied */
233 mem_heap_t* heap, /*!< in/out: memory heap */
234 const dtuple_t* entry, /*!< in: index entry to insert */
235 que_thr_t* thr, /*!< in: query thread */
236 mtr_t* mtr) /*!< in: mtr; must be committed before
237 latching any further pages */
238{
239 big_rec_t* dummy_big_rec;
240 upd_t* update;
241 rec_t* rec;
242 dberr_t err;
243
244 rec = btr_cur_get_rec(cursor);
245
246 ut_ad(!dict_index_is_clust(cursor->index));
247 ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
248 ut_ad(!entry->info_bits);
249
250 /* We know that in the alphabetical ordering, entry and rec are
251 identified. But in their binary form there may be differences if
252 there are char fields in them. Therefore we have to calculate the
253 difference. */
254
255 update = row_upd_build_sec_rec_difference_binary(
256 rec, cursor->index, *offsets, entry, heap);
257
258 if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
259 /* We should never insert in place of a record that
260 has not been delete-marked. The only exception is when
261 online CREATE INDEX copied the changes that we already
262 made to the clustered index, and completed the
263 secondary index creation before we got here. In this
264 case, the change would already be there. The CREATE
265 INDEX should be waiting for a MySQL meta-data lock
266 upgrade at least until this INSERT or UPDATE
267 returns. After that point, set_committed(true)
268 would be invoked in commit_inplace_alter_table(). */
269 ut_a(update->n_fields == 0);
270 ut_a(!cursor->index->is_committed());
271 ut_ad(!dict_index_is_online_ddl(cursor->index));
272 return(DB_SUCCESS);
273 }
274
275 if (mode == BTR_MODIFY_LEAF) {
276 /* Try an optimistic updating of the record, keeping changes
277 within the page */
278
279 /* TODO: pass only *offsets */
280 err = btr_cur_optimistic_update(
281 flags | BTR_KEEP_SYS_FLAG, cursor,
282 offsets, &offsets_heap, update, 0, thr,
283 thr_get_trx(thr)->id, mtr);
284 switch (err) {
285 case DB_OVERFLOW:
286 case DB_UNDERFLOW:
287 case DB_ZIP_OVERFLOW:
288 err = DB_FAIL;
289 default:
290 break;
291 }
292 } else {
293 ut_a(mode == BTR_MODIFY_TREE);
294 if (buf_LRU_buf_pool_running_out()) {
295
296 return(DB_LOCK_TABLE_FULL);
297 }
298
299 err = btr_cur_pessimistic_update(
300 flags | BTR_KEEP_SYS_FLAG, cursor,
301 offsets, &offsets_heap,
302 heap, &dummy_big_rec, update, 0,
303 thr, thr_get_trx(thr)->id, mtr);
304 ut_ad(!dummy_big_rec);
305 }
306
307 return(err);
308}
309
310/*******************************************************************//**
311Does an insert operation by delete unmarking and updating a delete marked
312existing record in the index. This situation can occur if the delete marked
313record is kept in the index for consistent reads.
314@return DB_SUCCESS, DB_FAIL, or error code */
315static MY_ATTRIBUTE((nonnull, warn_unused_result))
316dberr_t
317row_ins_clust_index_entry_by_modify(
318/*================================*/
319 btr_pcur_t* pcur, /*!< in/out: a persistent cursor pointing
320 to the clust_rec that is being modified. */
321 ulint flags, /*!< in: undo logging and locking flags */
322 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
323 depending on whether mtr holds just a leaf
324 latch or also a tree latch */
325 ulint** offsets,/*!< out: offsets on cursor->page_cur.rec */
326 mem_heap_t** offsets_heap,
327 /*!< in/out: pointer to memory heap that can
328 be emptied, or NULL */
329 mem_heap_t* heap, /*!< in/out: memory heap */
330 const dtuple_t* entry, /*!< in: index entry to insert */
331 que_thr_t* thr, /*!< in: query thread */
332 mtr_t* mtr) /*!< in: mtr; must be committed before
333 latching any further pages */
334{
335 const rec_t* rec;
336 upd_t* update;
337 dberr_t err;
338 btr_cur_t* cursor = btr_pcur_get_btr_cur(pcur);
339 TABLE* mysql_table = NULL;
340 ut_ad(dict_index_is_clust(cursor->index));
341
342 rec = btr_cur_get_rec(cursor);
343
344 ut_ad(rec_get_deleted_flag(rec,
345 dict_table_is_comp(cursor->index->table)));
346 /* In delete-marked records, DB_TRX_ID must
347 always refer to an existing undo log record. */
348 ut_ad(rec_get_trx_id(rec, cursor->index));
349
350 /* Build an update vector containing all the fields to be modified;
351 NOTE that this vector may NOT contain system columns trx_id or
352 roll_ptr */
353 if (thr->prebuilt != NULL) {
354 mysql_table = thr->prebuilt->m_mysql_table;
355 ut_ad(thr->prebuilt->trx == thr_get_trx(thr));
356 }
357
358 update = row_upd_build_difference_binary(
359 cursor->index, entry, rec, NULL, true,
360 thr_get_trx(thr), heap, mysql_table);
361 if (mode != BTR_MODIFY_TREE) {
362 ut_ad((mode & ulint(~BTR_ALREADY_S_LATCHED))
363 == BTR_MODIFY_LEAF);
364
365 /* Try optimistic updating of the record, keeping changes
366 within the page */
367
368 err = btr_cur_optimistic_update(
369 flags, cursor, offsets, offsets_heap, update, 0, thr,
370 thr_get_trx(thr)->id, mtr);
371 switch (err) {
372 case DB_OVERFLOW:
373 case DB_UNDERFLOW:
374 case DB_ZIP_OVERFLOW:
375 err = DB_FAIL;
376 default:
377 break;
378 }
379 } else {
380 if (buf_LRU_buf_pool_running_out()) {
381
382 return(DB_LOCK_TABLE_FULL);
383
384 }
385
386 big_rec_t* big_rec = NULL;
387
388 err = btr_cur_pessimistic_update(
389 flags | BTR_KEEP_POS_FLAG,
390 cursor, offsets, offsets_heap, heap,
391 &big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
392
393 if (big_rec) {
394 ut_a(err == DB_SUCCESS);
395
396 DEBUG_SYNC_C("before_row_ins_upd_extern");
397 err = btr_store_big_rec_extern_fields(
398 pcur, *offsets, big_rec, mtr,
399 BTR_STORE_INSERT_UPDATE);
400 DEBUG_SYNC_C("after_row_ins_upd_extern");
401 dtuple_big_rec_free(big_rec);
402 }
403 }
404
405 return(err);
406}
407
408/*********************************************************************//**
409Returns TRUE if in a cascaded update/delete an ancestor node of node
410updates (not DELETE, but UPDATE) table.
411@return TRUE if an ancestor updates table */
412static
413ibool
414row_ins_cascade_ancestor_updates_table(
415/*===================================*/
416 que_node_t* node, /*!< in: node in a query graph */
417 dict_table_t* table) /*!< in: table */
418{
419 que_node_t* parent;
420
421 for (parent = que_node_get_parent(node);
422 que_node_get_type(parent) == QUE_NODE_UPDATE;
423 parent = que_node_get_parent(parent)) {
424
425 upd_node_t* upd_node;
426
427 upd_node = static_cast<upd_node_t*>(parent);
428
429 if (upd_node->table == table && !upd_node->is_delete) {
430
431 return(TRUE);
432 }
433 }
434
435 return(FALSE);
436}
437
438/*********************************************************************//**
439Returns the number of ancestor UPDATE or DELETE nodes of a
440cascaded update/delete node.
441@return number of ancestors */
442static MY_ATTRIBUTE((nonnull, warn_unused_result))
443ulint
444row_ins_cascade_n_ancestors(
445/*========================*/
446 que_node_t* node) /*!< in: node in a query graph */
447{
448 que_node_t* parent;
449 ulint n_ancestors = 0;
450
451 for (parent = que_node_get_parent(node);
452 que_node_get_type(parent) == QUE_NODE_UPDATE;
453 parent = que_node_get_parent(parent)) {
454
455 n_ancestors++;
456 }
457
458 return(n_ancestors);
459}
460
461/******************************************************************//**
462Calculates the update vector node->cascade->update for a child table in
463a cascaded update.
464@return whether any FULLTEXT INDEX is affected */
465static MY_ATTRIBUTE((nonnull, warn_unused_result))
466bool
467row_ins_cascade_calc_update_vec(
468/*============================*/
469 upd_node_t* node, /*!< in: update node of the parent
470 table */
471 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
472 type is != 0 */
473 mem_heap_t* heap, /*!< in: memory heap to use as
474 temporary storage */
475 trx_t* trx) /*!< in: update transaction */
476{
477 upd_node_t* cascade = node->cascade_node;
478 dict_table_t* table = foreign->foreign_table;
479 dict_index_t* index = foreign->foreign_index;
480 upd_t* update;
481 dict_table_t* parent_table;
482 dict_index_t* parent_index;
483 upd_t* parent_update;
484 ulint n_fields_updated;
485 ulint parent_field_no;
486 ulint i;
487 ulint j;
488 bool doc_id_updated = false;
489 ulint doc_id_pos = 0;
490 doc_id_t new_doc_id = FTS_NULL_DOC_ID;
491 ulint prefix_col;
492
493 ut_a(node);
494 ut_a(foreign);
495 ut_a(cascade);
496 ut_a(table);
497 ut_a(index);
498
499 /* Calculate the appropriate update vector which will set the fields
500 in the child index record to the same value (possibly padded with
501 spaces if the column is a fixed length CHAR or FIXBINARY column) as
502 the referenced index record will get in the update. */
503
504 parent_table = node->table;
505 ut_a(parent_table == foreign->referenced_table);
506 parent_index = foreign->referenced_index;
507 parent_update = node->update;
508
509 update = cascade->update;
510
511 update->info_bits = 0;
512
513 n_fields_updated = 0;
514
515 bool affects_fulltext = false;
516
517 if (table->fts) {
518 doc_id_pos = dict_table_get_nth_col_pos(
519 table, table->fts->doc_col, &prefix_col);
520 }
521
522 for (i = 0; i < foreign->n_fields; i++) {
523
524 parent_field_no = dict_table_get_nth_col_pos(
525 parent_table,
526 dict_index_get_nth_col_no(parent_index, i),
527 &prefix_col);
528
529 for (j = 0; j < parent_update->n_fields; j++) {
530 const upd_field_t* parent_ufield
531 = &parent_update->fields[j];
532
533 if (parent_ufield->field_no == parent_field_no) {
534
535 ulint min_size;
536 const dict_col_t* col;
537 ulint ufield_len;
538 upd_field_t* ufield;
539
540 col = dict_index_get_nth_col(index, i);
541
542 /* A field in the parent index record is
543 updated. Let us make the update vector
544 field for the child table. */
545
546 ufield = update->fields + n_fields_updated;
547
548 ufield->field_no
549 = dict_table_get_nth_col_pos(
550 table, dict_col_get_no(col),
551 &prefix_col);
552
553 ufield->orig_len = 0;
554 ufield->exp = NULL;
555
556 ufield->new_val = parent_ufield->new_val;
557 dfield_get_type(&ufield->new_val)->prtype |=
558 col->prtype & DATA_VERSIONED;
559 ufield_len = dfield_get_len(&ufield->new_val);
560
561 /* Clear the "external storage" flag */
562 dfield_set_len(&ufield->new_val, ufield_len);
563
564 /* Do not allow a NOT NULL column to be
565 updated as NULL */
566
567 if (dfield_is_null(&ufield->new_val)
568 && (col->prtype & DATA_NOT_NULL)) {
569 goto err_exit;
570 }
571
572 /* If the new value would not fit in the
573 column, do not allow the update */
574
575 if (!dfield_is_null(&ufield->new_val)
576 && dtype_get_at_most_n_mbchars(
577 col->prtype,
578 col->mbminlen, col->mbmaxlen,
579 col->len,
580 ufield_len,
581 static_cast<char*>(
582 dfield_get_data(
583 &ufield->new_val)))
584 < ufield_len) {
585 goto err_exit;
586 }
587
588 /* If the parent column type has a different
589 length than the child column type, we may
590 need to pad with spaces the new value of the
591 child column */
592
593 min_size = dict_col_get_min_size(col);
594
595 /* Because UNIV_SQL_NULL (the marker
596 of SQL NULL values) exceeds all possible
597 values of min_size, the test below will
598 not hold for SQL NULL columns. */
599
600 if (min_size > ufield_len) {
601
602 byte* pad;
603 ulint pad_len;
604 byte* padded_data;
605 ulint mbminlen;
606
607 padded_data = static_cast<byte*>(
608 mem_heap_alloc(
609 heap, min_size));
610
611 pad = padded_data + ufield_len;
612 pad_len = min_size - ufield_len;
613
614 memcpy(padded_data,
615 dfield_get_data(&ufield
616 ->new_val),
617 ufield_len);
618
619 mbminlen = dict_col_get_mbminlen(col);
620
621 ut_ad(!(ufield_len % mbminlen));
622 ut_ad(!(min_size % mbminlen));
623
624 if (mbminlen == 1
625 && dtype_get_charset_coll(
626 col->prtype)
627 == DATA_MYSQL_BINARY_CHARSET_COLL) {
628 /* Do not pad BINARY columns */
629 goto err_exit;
630 }
631
632 row_mysql_pad_col(mbminlen,
633 pad, pad_len);
634 dfield_set_data(&ufield->new_val,
635 padded_data, min_size);
636 }
637
638 /* Check whether the current column has
639 FTS index on it */
640 if (table->fts
641 && dict_table_is_fts_column(
642 table->fts->indexes,
643 dict_col_get_no(col),
644 col->is_virtual())
645 != ULINT_UNDEFINED) {
646 affects_fulltext = true;
647 }
648
649 /* If Doc ID is updated, check whether the
650 Doc ID is valid */
651 if (table->fts
652 && ufield->field_no == doc_id_pos) {
653 doc_id_t n_doc_id;
654
655 n_doc_id =
656 table->fts->cache->next_doc_id;
657
658 new_doc_id = fts_read_doc_id(
659 static_cast<const byte*>(
660 dfield_get_data(
661 &ufield->new_val)));
662
663 affects_fulltext = true;
664 doc_id_updated = true;
665
666 if (new_doc_id <= 0) {
667 ib::error() << "FTS Doc ID"
668 " must be larger than"
669 " 0";
670 goto err_exit;
671 }
672
673 if (new_doc_id < n_doc_id) {
674 ib::error() << "FTS Doc ID"
675 " must be larger than "
676 << n_doc_id - 1
677 << " for table "
678 << table->name;
679 goto err_exit;
680 }
681 }
682
683 n_fields_updated++;
684 }
685 }
686 }
687
688 if (affects_fulltext) {
689 ut_ad(table->fts);
690
691 if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
692 doc_id_t doc_id;
693 doc_id_t* next_doc_id;
694 upd_field_t* ufield;
695
696 next_doc_id = static_cast<doc_id_t*>(mem_heap_alloc(
697 heap, sizeof(doc_id_t)));
698
699 ut_ad(!doc_id_updated);
700 ufield = update->fields + n_fields_updated;
701 fts_get_next_doc_id(table, next_doc_id);
702 doc_id = fts_update_doc_id(table, ufield, next_doc_id);
703 n_fields_updated++;
704 fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
705 } else {
706 if (doc_id_updated) {
707 ut_ad(new_doc_id);
708 fts_trx_add_op(trx, table, new_doc_id,
709 FTS_INSERT, NULL);
710 } else {
711 ib::error() << "FTS Doc ID must be updated"
712 " along with FTS indexed column for"
713 " table " << table->name;
714err_exit:
715 n_fields_updated = ULINT_UNDEFINED;
716 }
717 }
718 }
719
720 update->n_fields = n_fields_updated;
721
722 return affects_fulltext;
723}
724
725/*********************************************************************//**
726Set detailed error message associated with foreign key errors for
727the given transaction. */
728static
729void
730row_ins_set_detailed(
731/*=================*/
732 trx_t* trx, /*!< in: transaction */
733 dict_foreign_t* foreign) /*!< in: foreign key constraint */
734{
735 ut_ad(!srv_read_only_mode);
736
737 mutex_enter(&srv_misc_tmpfile_mutex);
738 rewind(srv_misc_tmpfile);
739
740 if (os_file_set_eof(srv_misc_tmpfile)) {
741 ut_print_name(srv_misc_tmpfile, trx,
742 foreign->foreign_table_name);
743 std::string fk_str = dict_print_info_on_foreign_key_in_create_format(
744 trx, foreign, FALSE);
745 fputs(fk_str.c_str(), srv_misc_tmpfile);
746 trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
747 } else {
748 trx_set_detailed_error(trx, "temp file operation failed");
749 }
750
751 mutex_exit(&srv_misc_tmpfile_mutex);
752}
753
754/*********************************************************************//**
755Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
756and displays information about the given transaction.
757The caller must release dict_foreign_err_mutex. */
758static
759void
760row_ins_foreign_trx_print(
761/*======================*/
762 trx_t* trx) /*!< in: transaction */
763{
764 ulint n_rec_locks;
765 ulint n_trx_locks;
766 ulint heap_size;
767
768 ut_ad(!srv_read_only_mode);
769
770 lock_mutex_enter();
771 n_rec_locks = lock_number_of_rows_locked(&trx->lock);
772 n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
773 heap_size = mem_heap_get_size(trx->lock.lock_heap);
774 lock_mutex_exit();
775
776 mutex_enter(&dict_foreign_err_mutex);
777 rewind(dict_foreign_err_file);
778 ut_print_timestamp(dict_foreign_err_file);
779 fputs(" Transaction:\n", dict_foreign_err_file);
780
781 trx_print_low(dict_foreign_err_file, trx, 600,
782 n_rec_locks, n_trx_locks, heap_size);
783
784 ut_ad(mutex_own(&dict_foreign_err_mutex));
785}
786
787/*********************************************************************//**
788Reports a foreign key error associated with an update or a delete of a
789parent table index entry. */
790static
791void
792row_ins_foreign_report_err(
793/*=======================*/
794 const char* errstr, /*!< in: error string from the viewpoint
795 of the parent table */
796 que_thr_t* thr, /*!< in: query thread whose run_node
797 is an update node */
798 dict_foreign_t* foreign, /*!< in: foreign key constraint */
799 const rec_t* rec, /*!< in: a matching index record in the
800 child table */
801 const dtuple_t* entry) /*!< in: index entry in the parent
802 table */
803{
804 std::string fk_str;
805
806 if (srv_read_only_mode) {
807 return;
808 }
809
810 FILE* ef = dict_foreign_err_file;
811 trx_t* trx = thr_get_trx(thr);
812
813 row_ins_set_detailed(trx, foreign);
814
815 row_ins_foreign_trx_print(trx);
816
817 fputs("Foreign key constraint fails for table ", ef);
818 ut_print_name(ef, trx, foreign->foreign_table_name);
819 fputs(":\n", ef);
820 fk_str = dict_print_info_on_foreign_key_in_create_format(trx, foreign,
821 TRUE);
822 fputs(fk_str.c_str(), ef);
823 putc('\n', ef);
824 fputs(errstr, ef);
825 fprintf(ef, " in parent table, in index %s",
826 foreign->referenced_index->name());
827 if (entry) {
828 fputs(" tuple:\n", ef);
829 dtuple_print(ef, entry);
830 }
831 fputs("\nBut in child table ", ef);
832 ut_print_name(ef, trx, foreign->foreign_table_name);
833 fprintf(ef, ", in index %s", foreign->foreign_index->name());
834 if (rec) {
835 fputs(", there is a record:\n", ef);
836 rec_print(ef, rec, foreign->foreign_index);
837 } else {
838 fputs(", the record is not available\n", ef);
839 }
840 putc('\n', ef);
841
842 mutex_exit(&dict_foreign_err_mutex);
843}
844
845/*********************************************************************//**
846Reports a foreign key error to dict_foreign_err_file when we are trying
847to add an index entry to a child table. Note that the adding may be the result
848of an update, too. */
849static
850void
851row_ins_foreign_report_add_err(
852/*===========================*/
853 trx_t* trx, /*!< in: transaction */
854 dict_foreign_t* foreign, /*!< in: foreign key constraint */
855 const rec_t* rec, /*!< in: a record in the parent table:
856 it does not match entry because we
857 have an error! */
858 const dtuple_t* entry) /*!< in: index entry to insert in the
859 child table */
860{
861 std::string fk_str;
862
863 if (srv_read_only_mode) {
864 return;
865 }
866
867 FILE* ef = dict_foreign_err_file;
868
869 row_ins_set_detailed(trx, foreign);
870
871 row_ins_foreign_trx_print(trx);
872
873 fputs("Foreign key constraint fails for table ", ef);
874 ut_print_name(ef, trx, foreign->foreign_table_name);
875 fputs(":\n", ef);
876 fk_str = dict_print_info_on_foreign_key_in_create_format(trx, foreign,
877 TRUE);
878 fputs(fk_str.c_str(), ef);
879 fprintf(ef, " in parent table, in index %s",
880 foreign->foreign_index->name());
881 if (entry) {
882 fputs(" tuple:\n", ef);
883 /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
884 It would be better to only display the user columns. */
885 dtuple_print(ef, entry);
886 }
887 fputs("\nBut in parent table ", ef);
888 ut_print_name(ef, trx, foreign->referenced_table_name);
889 fprintf(ef, ", in index %s,\n"
890 "the closest match we can find is record:\n",
891 foreign->referenced_index->name());
892 if (rec && page_rec_is_supremum(rec)) {
893 /* If the cursor ended on a supremum record, it is better
894 to report the previous record in the error message, so that
895 the user gets a more descriptive error message. */
896 rec = page_rec_get_prev_const(rec);
897 }
898
899 if (rec) {
900 rec_print(ef, rec, foreign->referenced_index);
901 }
902 putc('\n', ef);
903
904 mutex_exit(&dict_foreign_err_mutex);
905}
906
907/*********************************************************************//**
908Invalidate the query cache for the given table. */
909static
910void
911row_ins_invalidate_query_cache(
912/*===========================*/
913 que_thr_t* thr, /*!< in: query thread whose run_node
914 is an update node */
915 const char* name) /*!< in: table name prefixed with
916 database name and a '/' character */
917{
918 innobase_invalidate_query_cache(thr_get_trx(thr), name);
919}
920
921
922/** Fill virtual column information in cascade node for the child table.
923@param[out] cascade child update node
924@param[in] rec clustered rec of child table
925@param[in] index clustered index of child table
926@param[in] node parent update node
927@param[in] foreign foreign key information
928@param[out] err error code. */
929static
930void
931row_ins_foreign_fill_virtual(
932 upd_node_t* cascade,
933 const rec_t* rec,
934 dict_index_t* index,
935 upd_node_t* node,
936 dict_foreign_t* foreign,
937 dberr_t* err)
938{
939 THD* thd = current_thd;
940 row_ext_t* ext;
941 ulint offsets_[REC_OFFS_NORMAL_SIZE];
942 rec_offs_init(offsets_);
943 const ulint* offsets =
944 rec_get_offsets(rec, index, offsets_, true,
945 ULINT_UNDEFINED, &cascade->heap);
946 mem_heap_t* v_heap = NULL;
947 upd_t* update = cascade->update;
948 ulint n_v_fld = index->table->n_v_def;
949 ulint n_diff;
950 upd_field_t* upd_field;
951 dict_vcol_set* v_cols = foreign->v_cols;
952 update->old_vrow = row_build(
953 ROW_COPY_POINTERS, index, rec,
954 offsets, index->table, NULL, NULL,
955 &ext, cascade->heap);
956 n_diff = update->n_fields;
957
958 update->n_fields += n_v_fld;
959
960 if (index->table->vc_templ == NULL) {
961 /** This can occur when there is a cascading
962 delete or update after restart. */
963 innobase_init_vc_templ(index->table);
964 }
965
966 for (ulint i = 0; i < n_v_fld; i++) {
967
968 dict_v_col_t* col = dict_table_get_nth_v_col(
969 index->table, i);
970
971 dict_vcol_set::iterator it = v_cols->find(col);
972
973 if (it == v_cols->end()) {
974 continue;
975 }
976
977 dfield_t* vfield = innobase_get_computed_value(
978 update->old_vrow, col, index,
979 &v_heap, update->heap, NULL, thd, NULL,
980 NULL, NULL, NULL);
981
982 if (vfield == NULL) {
983 *err = DB_COMPUTE_VALUE_FAILED;
984 goto func_exit;
985 }
986
987 upd_field = upd_get_nth_field(update, n_diff);
988
989 upd_field->old_v_val = static_cast<dfield_t*>(
990 mem_heap_alloc(cascade->heap,
991 sizeof *upd_field->old_v_val));
992
993 dfield_copy(upd_field->old_v_val, vfield);
994
995 upd_field_set_v_field_no(upd_field, i, index);
996
997 if (node->is_delete
998 ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
999 : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1000
1001 dfield_set_null(&upd_field->new_val);
1002 }
1003
1004 if (!node->is_delete
1005 && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1006
1007 dfield_t* new_vfield = innobase_get_computed_value(
1008 update->old_vrow, col, index,
1009 &v_heap, update->heap, NULL, thd,
1010 NULL, NULL, node->update, foreign);
1011
1012 if (new_vfield == NULL) {
1013 *err = DB_COMPUTE_VALUE_FAILED;
1014 goto func_exit;
1015 }
1016
1017 dfield_copy(&(upd_field->new_val), new_vfield);
1018 }
1019
1020 n_diff++;
1021 }
1022
1023 update->n_fields = n_diff;
1024 *err = DB_SUCCESS;
1025
1026func_exit:
1027 if (v_heap) {
1028 mem_heap_free(v_heap);
1029 }
1030}
1031
#ifdef WITH_WSREP
/** Forward declaration of a function that is only available in
WITH_WSREP builds; its definition is not part of this section.
NOTE(review): parameter meanings below are inferred from the names only -
confirm against the definition.
@param trx		transaction
@param foreign		foreign key constraint
@param clust_rec	clustered index record
@param clust_index	clustered index
@param referenced	presumably whether the referenced (parent) side is meant
@param shared		presumably whether the key is appended as shared
@return error code */
dberr_t
wsrep_append_foreign_key(
	trx_t*		trx,
	dict_foreign_t*	foreign,
	const rec_t*	clust_rec,
	dict_index_t*	clust_index,
	ibool		referenced,
	ibool		shared);
#endif /* WITH_WSREP */
1040
1041/*********************************************************************//**
1042Perform referential actions or checks when a parent row is deleted or updated
1043and the constraint had an ON DELETE or ON UPDATE condition which was not
1044RESTRICT.
1045@return DB_SUCCESS, DB_LOCK_WAIT, or error code */
1046static MY_ATTRIBUTE((nonnull, warn_unused_result))
1047dberr_t
1048row_ins_foreign_check_on_constraint(
1049/*================================*/
1050 que_thr_t* thr, /*!< in: query thread whose run_node
1051 is an update node */
1052 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
1053 type is != 0 */
1054 btr_pcur_t* pcur, /*!< in: cursor placed on a matching
1055 index record in the child table */
1056 dtuple_t* entry, /*!< in: index entry in the parent
1057 table */
1058 mtr_t* mtr) /*!< in: mtr holding the latch of pcur
1059 page */
1060{
1061 upd_node_t* node;
1062 upd_node_t* cascade;
1063 dict_table_t* table = foreign->foreign_table;
1064 dict_index_t* index;
1065 dict_index_t* clust_index;
1066 dtuple_t* ref;
1067 const rec_t* rec;
1068 const rec_t* clust_rec;
1069 const buf_block_t* clust_block;
1070 upd_t* update;
1071 dberr_t err;
1072 trx_t* trx;
1073 mem_heap_t* tmp_heap = NULL;
1074 doc_id_t doc_id = FTS_NULL_DOC_ID;
1075
1076 DBUG_ENTER("row_ins_foreign_check_on_constraint");
1077 ut_a(thr);
1078 ut_a(foreign);
1079 ut_a(pcur);
1080 ut_a(mtr);
1081
1082 trx = thr_get_trx(thr);
1083
1084 /* Since we are going to delete or update a row, we have to invalidate
1085 the MySQL query cache for table. A deadlock of threads is not possible
1086 here because the caller of this function does not hold any latches with
1087 the mutex rank above the lock_sys_t::mutex. The query cache mutex
1088 has a rank just above the lock_sys_t::mutex. */
1089
1090 row_ins_invalidate_query_cache(thr, table->name.m_name);
1091
1092 node = static_cast<upd_node_t*>(thr->run_node);
1093
1094 if (node->is_delete && 0 == (foreign->type
1095 & (DICT_FOREIGN_ON_DELETE_CASCADE
1096 | DICT_FOREIGN_ON_DELETE_SET_NULL))) {
1097
1098 row_ins_foreign_report_err("Trying to delete",
1099 thr, foreign,
1100 btr_pcur_get_rec(pcur), entry);
1101
1102 DBUG_RETURN(DB_ROW_IS_REFERENCED);
1103 }
1104
1105 if (!node->is_delete && 0 == (foreign->type
1106 & (DICT_FOREIGN_ON_UPDATE_CASCADE
1107 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
1108
1109 /* This is an UPDATE */
1110
1111 row_ins_foreign_report_err("Trying to update",
1112 thr, foreign,
1113 btr_pcur_get_rec(pcur), entry);
1114
1115 DBUG_RETURN(DB_ROW_IS_REFERENCED);
1116 }
1117
1118 if (node->cascade_node == NULL) {
1119 node->cascade_heap = mem_heap_create(128);
1120 node->cascade_node = row_create_update_node_for_mysql(
1121 table, node->cascade_heap);
1122 que_node_set_parent(node->cascade_node, node);
1123
1124 }
1125 cascade = node->cascade_node;
1126 cascade->table = table;
1127 cascade->foreign = foreign;
1128
1129 if (node->is_delete
1130 && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
1131 cascade->is_delete = PLAIN_DELETE;
1132 } else {
1133 cascade->is_delete = NO_DELETE;
1134
1135 if (foreign->n_fields > cascade->update_n_fields) {
1136 /* We have to make the update vector longer */
1137
1138 cascade->update = upd_create(foreign->n_fields,
1139 node->cascade_heap);
1140 cascade->update_n_fields = foreign->n_fields;
1141 }
1142
1143 /* We do not allow cyclic cascaded updating (DELETE is
1144 allowed, but not UPDATE) of the same table, as this
1145 can lead to an infinite cycle. Check that we are not
1146 updating the same table which is already being
1147 modified in this cascade chain. We have to check this
1148 also because the modification of the indexes of a
1149 'parent' table may still be incomplete, and we must
1150 avoid seeing the indexes of the parent table in an
1151 inconsistent state! */
1152
1153 if (row_ins_cascade_ancestor_updates_table(cascade, table)) {
1154
1155 /* We do not know if this would break foreign key
1156 constraints, but play safe and return an error */
1157
1158 err = DB_ROW_IS_REFERENCED;
1159
1160 row_ins_foreign_report_err(
1161 "Trying an update, possibly causing a cyclic"
1162 " cascaded update\n"
1163 "in the child table,", thr, foreign,
1164 btr_pcur_get_rec(pcur), entry);
1165
1166 goto nonstandard_exit_func;
1167 }
1168 }
1169
1170 if (row_ins_cascade_n_ancestors(cascade) >= FK_MAX_CASCADE_DEL) {
1171 err = DB_FOREIGN_EXCEED_MAX_CASCADE;
1172
1173 row_ins_foreign_report_err(
1174 "Trying a too deep cascaded delete or update\n",
1175 thr, foreign, btr_pcur_get_rec(pcur), entry);
1176
1177 goto nonstandard_exit_func;
1178 }
1179
1180 index = btr_pcur_get_btr_cur(pcur)->index;
1181
1182 ut_a(index == foreign->foreign_index);
1183
1184 rec = btr_pcur_get_rec(pcur);
1185
1186 tmp_heap = mem_heap_create(256);
1187
1188 if (dict_index_is_clust(index)) {
1189 /* pcur is already positioned in the clustered index of
1190 the child table */
1191
1192 clust_index = index;
1193 clust_rec = rec;
1194 clust_block = btr_pcur_get_block(pcur);
1195 } else {
1196 /* We have to look for the record in the clustered index
1197 in the child table */
1198
1199 clust_index = dict_table_get_first_index(table);
1200
1201 ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
1202 tmp_heap);
1203 btr_pcur_open_with_no_init(clust_index, ref,
1204 PAGE_CUR_LE, BTR_SEARCH_LEAF,
1205 cascade->pcur, 0, mtr);
1206
1207 clust_rec = btr_pcur_get_rec(cascade->pcur);
1208 clust_block = btr_pcur_get_block(cascade->pcur);
1209
1210 if (!page_rec_is_user_rec(clust_rec)
1211 || btr_pcur_get_low_match(cascade->pcur)
1212 < dict_index_get_n_unique(clust_index)) {
1213
1214 ib::error() << "In cascade of a foreign key op index "
1215 << index->name
1216 << " of table " << index->table->name;
1217
1218 fputs("InnoDB: record ", stderr);
1219 rec_print(stderr, rec, index);
1220 fputs("\n"
1221 "InnoDB: clustered record ", stderr);
1222 rec_print(stderr, clust_rec, clust_index);
1223 fputs("\n"
1224 "InnoDB: Submit a detailed bug report to"
1225 " https://jira.mariadb.org/\n", stderr);
1226 ut_ad(0);
1227 err = DB_SUCCESS;
1228
1229 goto nonstandard_exit_func;
1230 }
1231 }
1232
1233 /* Set an X-lock on the row to delete or update in the child table */
1234
1235 err = lock_table(0, table, LOCK_IX, thr);
1236
1237 if (err == DB_SUCCESS) {
1238 /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
1239 we already have a normal shared lock on the appropriate
1240 gap if the search criterion was not unique */
1241
1242 err = lock_clust_rec_read_check_and_lock_alt(
1243 0, clust_block, clust_rec, clust_index,
1244 LOCK_X, LOCK_REC_NOT_GAP, thr);
1245 }
1246
1247 if (err != DB_SUCCESS) {
1248
1249 goto nonstandard_exit_func;
1250 }
1251
1252 if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
1253 /* In delete-marked records, DB_TRX_ID must
1254 always refer to an existing undo log record. */
1255 ut_ad(rec_get_trx_id(clust_rec, clust_index));
1256 /* This can happen if there is a circular reference of
1257 rows such that cascading delete comes to delete a row
1258 already in the process of being delete marked */
1259 err = DB_SUCCESS;
1260
1261 goto nonstandard_exit_func;
1262 }
1263
1264 if (table->fts) {
1265 doc_id = fts_get_doc_id_from_rec(table, clust_rec,
1266 clust_index, tmp_heap);
1267 }
1268
1269 if (node->is_delete
1270 ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
1271 : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1272 /* Build the appropriate update vector which sets
1273 foreign->n_fields first fields in rec to SQL NULL */
1274
1275 update = cascade->update;
1276
1277 update->info_bits = 0;
1278 update->n_fields = foreign->n_fields;
1279 UNIV_MEM_INVALID(update->fields,
1280 update->n_fields * sizeof *update->fields);
1281
1282 bool affects_fulltext = false;
1283
1284 for (ulint i = 0; i < foreign->n_fields; i++) {
1285 upd_field_t* ufield = &update->fields[i];
1286 ulint col_no = dict_index_get_nth_col_no(
1287 index, i);
1288 ulint prefix_col;
1289
1290 ufield->field_no = dict_table_get_nth_col_pos(
1291 table, col_no, &prefix_col);
1292 dict_col_t* col = dict_table_get_nth_col(
1293 table, col_no);
1294 dict_col_copy_type(col, dfield_get_type(&ufield->new_val));
1295
1296 ufield->orig_len = 0;
1297 ufield->exp = NULL;
1298 dfield_set_null(&ufield->new_val);
1299
1300 if (!affects_fulltext
1301 && table->fts && dict_table_is_fts_column(
1302 table->fts->indexes,
1303 dict_index_get_nth_col(index, i)->ind,
1304 dict_index_get_nth_col(index, i)
1305 ->is_virtual())
1306 != ULINT_UNDEFINED) {
1307 affects_fulltext = true;
1308 }
1309 }
1310
1311 if (affects_fulltext) {
1312 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1313 }
1314
1315 if (foreign->v_cols != NULL
1316 && foreign->v_cols->size() > 0) {
1317 row_ins_foreign_fill_virtual(
1318 cascade, clust_rec, clust_index,
1319 node, foreign, &err);
1320
1321 if (err != DB_SUCCESS) {
1322 goto nonstandard_exit_func;
1323 }
1324 }
1325 } else if (table->fts && cascade->is_delete == PLAIN_DELETE) {
1326 /* DICT_FOREIGN_ON_DELETE_CASCADE case */
1327 bool affects_fulltext = false;
1328
1329 for (ulint i = 0; i < foreign->n_fields; i++) {
1330 if (dict_table_is_fts_column(
1331 table->fts->indexes,
1332 dict_index_get_nth_col(index, i)->ind,
1333 dict_index_get_nth_col(index, i)->is_virtual())
1334 != ULINT_UNDEFINED) {
1335 affects_fulltext = true;
1336 break;
1337 }
1338 }
1339
1340 if (affects_fulltext) {
1341 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1342 }
1343 }
1344
1345 if (!node->is_delete
1346 && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1347
1348 /* Build the appropriate update vector which sets changing
1349 foreign->n_fields first fields in rec to new values */
1350
1351 bool affects_fulltext = row_ins_cascade_calc_update_vec(
1352 node, foreign, tmp_heap, trx);
1353
1354 if (foreign->v_cols && !foreign->v_cols->empty()) {
1355 row_ins_foreign_fill_virtual(
1356 cascade, clust_rec, clust_index,
1357 node, foreign, &err);
1358
1359 if (err != DB_SUCCESS) {
1360 goto nonstandard_exit_func;
1361 }
1362 }
1363
1364 switch (cascade->update->n_fields) {
1365 case ULINT_UNDEFINED:
1366 err = DB_ROW_IS_REFERENCED;
1367
1368 row_ins_foreign_report_err(
1369 "Trying a cascaded update where the"
1370 " updated value in the child\n"
1371 "table would not fit in the length"
1372 " of the column, or the value would\n"
1373 "be NULL and the column is"
1374 " declared as not NULL in the child table,",
1375 thr, foreign, btr_pcur_get_rec(pcur), entry);
1376
1377 goto nonstandard_exit_func;
1378 case 0:
1379 /* The update does not change any columns referred
1380 to in this foreign key constraint: no need to do
1381 anything */
1382
1383 err = DB_SUCCESS;
1384
1385 goto nonstandard_exit_func;
1386 }
1387
1388 /* Mark the old Doc ID as deleted */
1389 if (affects_fulltext) {
1390 ut_ad(table->fts);
1391 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1392 }
1393 }
1394
1395 if (table->versioned() && cascade->is_delete != PLAIN_DELETE
1396 && cascade->update->affects_versioned()) {
1397 ut_ad(!cascade->historical_heap);
1398 cascade->historical_heap = mem_heap_create(128);
1399 cascade->historical_row = row_build(
1400 ROW_COPY_POINTERS, clust_index, clust_rec, NULL, table,
1401 NULL, NULL, NULL, cascade->historical_heap);
1402 }
1403
1404 /* Store pcur position and initialize or store the cascade node
1405 pcur stored position */
1406
1407 btr_pcur_store_position(pcur, mtr);
1408
1409 if (index == clust_index) {
1410 btr_pcur_copy_stored_position(cascade->pcur, pcur);
1411 } else {
1412 btr_pcur_store_position(cascade->pcur, mtr);
1413 }
1414
1415 mtr_commit(mtr);
1416
1417 ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
1418
1419 cascade->state = UPD_NODE_UPDATE_CLUSTERED;
1420
1421#ifdef WITH_WSREP
1422 err = wsrep_append_foreign_key(trx, foreign, clust_rec, clust_index,
1423 FALSE, FALSE);
1424 if (err != DB_SUCCESS) {
1425 fprintf(stderr,
1426 "WSREP: foreign key append failed: %d\n", err);
1427 } else
1428#endif /* WITH_WSREP */
1429 err = row_update_cascade_for_mysql(thr, cascade,
1430 foreign->foreign_table);
1431
1432 /* Release the data dictionary latch for a while, so that we do not
1433 starve other threads from doing CREATE TABLE etc. if we have a huge
1434 cascaded operation running. */
1435
1436 row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
1437
1438 DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");
1439
1440 row_mysql_freeze_data_dictionary(thr_get_trx(thr));
1441
1442 mtr_start(mtr);
1443
1444 /* Restore pcur position */
1445
1446 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1447
1448 if (tmp_heap) {
1449 mem_heap_free(tmp_heap);
1450 }
1451
1452 DBUG_RETURN(err);
1453
1454nonstandard_exit_func:
1455
1456 if (tmp_heap) {
1457 mem_heap_free(tmp_heap);
1458 }
1459
1460 btr_pcur_store_position(pcur, mtr);
1461
1462 mtr_commit(mtr);
1463 mtr_start(mtr);
1464
1465 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1466
1467 DBUG_RETURN(err);
1468}
1469
1470/*********************************************************************//**
1471Sets a shared lock on a record. Used in locking possible duplicate key
1472records and also in checking foreign key constraints.
1473@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1474static
1475dberr_t
1476row_ins_set_shared_rec_lock(
1477/*========================*/
1478 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1479 LOCK_REC_NOT_GAP type lock */
1480 const buf_block_t* block, /*!< in: buffer block of rec */
1481 const rec_t* rec, /*!< in: record */
1482 dict_index_t* index, /*!< in: index */
1483 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1484 que_thr_t* thr) /*!< in: query thread */
1485{
1486 dberr_t err;
1487
1488 ut_ad(rec_offs_validate(rec, index, offsets));
1489
1490 if (dict_index_is_clust(index)) {
1491 err = lock_clust_rec_read_check_and_lock(
1492 0, block, rec, index, offsets, LOCK_S, type, thr);
1493 } else {
1494 err = lock_sec_rec_read_check_and_lock(
1495 0, block, rec, index, offsets, LOCK_S, type, thr);
1496 }
1497
1498 return(err);
1499}
1500
1501/*********************************************************************//**
1502Sets a exclusive lock on a record. Used in locking possible duplicate key
1503records
1504@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1505static
1506dberr_t
1507row_ins_set_exclusive_rec_lock(
1508/*===========================*/
1509 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1510 LOCK_REC_NOT_GAP type lock */
1511 const buf_block_t* block, /*!< in: buffer block of rec */
1512 const rec_t* rec, /*!< in: record */
1513 dict_index_t* index, /*!< in: index */
1514 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1515 que_thr_t* thr) /*!< in: query thread */
1516{
1517 dberr_t err;
1518
1519 ut_ad(rec_offs_validate(rec, index, offsets));
1520
1521 if (dict_index_is_clust(index)) {
1522 err = lock_clust_rec_read_check_and_lock(
1523 0, block, rec, index, offsets, LOCK_X, type, thr);
1524 } else {
1525 err = lock_sec_rec_read_check_and_lock(
1526 0, block, rec, index, offsets, LOCK_X, type, thr);
1527 }
1528
1529 return(err);
1530}
1531
/***************************************************************//**
Checks if foreign key constraint fails for an index entry. Sets shared locks
which lock either the success or the failure of the constraint. NOTE that
the caller must have a shared latch on dict_operation_lock.

With check_ref == TRUE the referenced (parent) index is searched for a
row matching entry; with check_ref == FALSE the foreign (child) index is
searched, and for every matching child row either the ON UPDATE / ON
DELETE action is run through row_ins_foreign_check_on_constraint() or
DB_ROW_IS_REFERENCED is reported.

Besides the values listed below, the code can also return
DB_LOCK_WAIT, DB_LOCK_WAIT_TIMEOUT, DB_FOREIGN_DUPLICATE_KEY, or any other
error code propagated from the lock requests or from the cascade.
@return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
dberr_t
row_ins_check_foreign_constraint(
/*=============================*/
	ibool		check_ref,/*!< in: TRUE if we want to check that
				the referenced table is ok, FALSE if we
				want to check the foreign key table */
	dict_foreign_t*	foreign,/*!< in: foreign constraint; NOTE that the
				tables mentioned in it must be in the
				dictionary cache if they exist at all */
	dict_table_t*	table,	/*!< in: if check_ref is TRUE, then the foreign
				table, else the referenced table */
	dtuple_t*	entry,	/*!< in: index entry for index */
	que_thr_t*	thr)	/*!< in: query thread */
{
	dberr_t		err;
	upd_node_t*	upd_node;
	dict_table_t*	check_table;
	dict_index_t*	check_index;
	ulint		n_fields_cmp;
	btr_pcur_t	pcur;
	int		cmp;
	mtr_t		mtr;
	trx_t*		trx		= thr_get_trx(thr);
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;

	bool		skip_gap_lock;

	/* At READ COMMITTED or a lower isolation level the scan below
	takes no gap locks: the supremum is not locked, a gap lock request
	is replaced by DB_SUCCESS, and delete-marked matches are locked
	with LOCK_REC_NOT_GAP instead of LOCK_ORDINARY */
	skip_gap_lock = (trx->isolation_level <= TRX_ISO_READ_COMMITTED);

	DBUG_ENTER("row_ins_check_foreign_constraint");

	rec_offs_init(offsets_);

#ifdef WITH_WSREP
	/* upd_node is read in the wsrep_append_foreign_key() call below
	even when the run node is not an update node */
	upd_node= NULL;
#endif /* WITH_WSREP */

	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_S));

	err = DB_SUCCESS;

	if (trx->check_foreigns == FALSE) {
		/* The user has suppressed foreign key checks currently for
		this session */
		goto exit_func;
	}

	/* If any of the foreign key fields in entry is SQL NULL, we
	suppress the foreign key check: this is compatible with Oracle,
	for example */
	for (ulint i = 0; i < entry->n_fields; i++) {
		dfield_t* field = dtuple_get_nth_field(entry, i);
		if (i < foreign->n_fields && dfield_is_null(field)) {
			goto exit_func;
		}
		/* System Versioning: if row_end != Inf, we
		suppress the foreign key check */
		if (field->type.vers_sys_end() && field->vers_history_row()) {
			goto exit_func;
		}
	}

	if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
		upd_node = static_cast<upd_node_t*>(thr->run_node);

		if (upd_node->is_delete != PLAIN_DELETE
		    && upd_node->foreign == foreign) {
			/* If a cascaded update is done as defined by a
			foreign key constraint, do not check that
			constraint for the child row. In ON UPDATE CASCADE
			the update of the parent row is only half done when
			we come here: if we would check the constraint here
			for the child row it would fail.

			A QUESTION remains: if in the child table there are
			several constraints which refer to the same parent
			table, we should merge all updates to the child as
			one update? And the updates can be contradictory!
			Currently we just perform the update associated
			with each foreign key constraint, one after
			another, and the user has problems predicting in
			which order they are performed. */

			goto exit_func;
		}
	}

	if (que_node_get_type(thr->run_node) == QUE_NODE_INSERT) {
		/* When a history row is inserted into a system-versioned
		table, the check is skipped. Note that this local 'table'
		shadows the function parameter of the same name within
		this block. */
		ins_node_t* insert_node =
			static_cast<ins_node_t*>(thr->run_node);
		dict_table_t* table = insert_node->index->table;
		if (table->versioned()) {
			dfield_t* row_end = dtuple_get_nth_field(
				insert_node->row, table->vers_end);
			if (row_end->vers_history_row()) {
				goto exit_func;
			}
		}
	}

	/* Choose the table and index that will be searched */
	if (check_ref) {
		check_table = foreign->referenced_table;
		check_index = foreign->referenced_index;
	} else {
		check_table = foreign->foreign_table;
		check_index = foreign->foreign_index;
	}

	if (check_table == NULL
	    || !check_table->is_readable()
	    || check_index == NULL) {

		/* The table to search is missing or unreadable. This is
		an error only when checking the parent (check_ref);
		otherwise err is still DB_SUCCESS here. */

		if (!srv_read_only_mode && check_ref) {
			FILE*	ef = dict_foreign_err_file;
			std::string fk_str;

			row_ins_set_detailed(trx, foreign);

			/* NOTE(review): dict_foreign_err_mutex is released
			below without a visible mutex_enter(); presumably
			row_ins_foreign_trx_print() acquires it -- confirm */
			row_ins_foreign_trx_print(trx);

			fputs("Foreign key constraint fails for table ", ef);
			ut_print_name(ef, trx,
				      foreign->foreign_table_name);
			fputs(":\n", ef);
			fk_str = dict_print_info_on_foreign_key_in_create_format(
				trx, foreign, TRUE);
			fputs(fk_str.c_str(), ef);
			fprintf(ef, "\nTrying to add to index %s tuple:\n",
				foreign->foreign_index->name());
			dtuple_print(ef, entry);
			fputs("\nBut the parent table ", ef);
			ut_print_name(ef, trx,
				      foreign->referenced_table_name);
			fputs("\nor its .ibd file does"
			      " not currently exist!\n", ef);
			mutex_exit(&dict_foreign_err_mutex);

			err = DB_NO_REFERENCED_ROW;
		}

		goto exit_func;
	}

	if (check_table != table) {
		/* We already have a LOCK_IX on table, but not necessarily
		on check_table */

		err = lock_table(0, check_table, LOCK_IS, thr);

		if (err != DB_SUCCESS) {

			goto do_possible_lock_wait;
		}
	}

	mtr_start(&mtr);

	/* Store old value on n_fields_cmp */

	n_fields_cmp = dtuple_get_n_fields_cmp(entry);

	/* Only the foreign key columns take part in the comparison */
	dtuple_set_n_fields_cmp(entry, foreign->n_fields);

	btr_pcur_open(check_index, entry, PAGE_CUR_GE,
		      BTR_SEARCH_LEAF, &pcur, &mtr);

	/* Scan index records and check if there is a matching record */

	/* Within this do-while loop, 'continue' jumps to the loop
	condition, which advances the cursor to the next record */
	do {
		const rec_t*		rec = btr_pcur_get_rec(&pcur);
		const buf_block_t*	block = btr_pcur_get_block(&pcur);

		if (page_rec_is_infimum(rec)) {

			continue;
		}

		offsets = rec_get_offsets(rec, check_index, offsets, true,
					  ULINT_UNDEFINED, &heap);

		if (page_rec_is_supremum(rec)) {

			if (skip_gap_lock) {

				continue;
			}

			err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
							  rec, check_index,
							  offsets, thr);
			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
			case DB_SUCCESS:
				continue;
			default:
				goto end_scan;
			}
		}

		cmp = cmp_dtuple_rec(entry, rec, offsets);

		if (cmp == 0) {
			if (check_table->versioned()) {
				bool history_row = false;

				if (check_index->is_primary()) {
					history_row = check_index->
						vers_history_row(rec, offsets);
				} else if (check_index->
					   vers_history_row(rec, history_row))
				{
					/* This leaves the scan loop; control
					continues at the check_ref test
					after the loop */
					break;
				}

				/* History rows never satisfy or violate
				the constraint: move on to the next record */
				if (history_row) {
					continue;
				}
			}

			if (rec_get_deleted_flag(rec,
						 rec_offs_comp(offsets))) {
				/* In delete-marked records, DB_TRX_ID must
				always refer to an existing undo log record. */
				ut_ad(!dict_index_is_clust(check_index)
				      || row_get_rec_trx_id(rec, check_index,
							    offsets));

				/* A delete-marked match is only locked; the
				scan then continues with the next record */
				err = row_ins_set_shared_rec_lock(
					skip_gap_lock
					? LOCK_REC_NOT_GAP
					: LOCK_ORDINARY, block,
					rec, check_index, offsets, thr);
				switch (err) {
				case DB_SUCCESS_LOCKED_REC:
				case DB_SUCCESS:
					break;
				default:
					goto end_scan;
				}
			} else {
				/* Found a matching record. Lock only
				a record because we can allow inserts
				into gaps */

				err = row_ins_set_shared_rec_lock(
					LOCK_REC_NOT_GAP, block,
					rec, check_index, offsets, thr);

				switch (err) {
				case DB_SUCCESS_LOCKED_REC:
				case DB_SUCCESS:
					break;
				default:
					goto end_scan;
				}

				if (check_ref) {
					/* The parent row exists: the
					constraint is satisfied */
					err = DB_SUCCESS;
#ifdef WITH_WSREP
					err = wsrep_append_foreign_key(
						thr_get_trx(thr),
						foreign,
						rec,
						check_index,
						check_ref,
						(upd_node) ? TRUE : FALSE);
#endif /* WITH_WSREP */
					goto end_scan;
				} else if (foreign->type != 0) {
					/* There is an ON UPDATE or ON DELETE
					condition: check them in a separate
					function */

					err = row_ins_foreign_check_on_constraint(
						thr, foreign, &pcur, entry,
						&mtr);
					if (err != DB_SUCCESS) {
						/* Since reporting a plain
						"duplicate key" error
						message to the user in
						cases where a long CASCADE
						operation would lead to a
						duplicate key in some
						other table is very
						confusing, map duplicate
						key errors resulting from
						FK constraints to a
						separate error code. */

						if (err == DB_DUPLICATE_KEY) {
							err = DB_FOREIGN_DUPLICATE_KEY;
						}

						goto end_scan;
					}

					/* row_ins_foreign_check_on_constraint
					may have repositioned pcur on a
					different block */
					block = btr_pcur_get_block(&pcur);
				} else {
					/* A child row references the parent
					and no referential action is
					defined */
					row_ins_foreign_report_err(
						"Trying to delete or update",
						thr, foreign, rec, entry);

					err = DB_ROW_IS_REFERENCED;
					goto end_scan;
				}
			}
		} else {
			/* The cursor was opened with PAGE_CUR_GE, so a
			non-matching record must sort after entry: there
			are no (more) matches */
			ut_a(cmp < 0);

			err = skip_gap_lock
				? DB_SUCCESS
				: row_ins_set_shared_rec_lock(
					LOCK_GAP, block,
					rec, check_index, offsets, thr);

			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
				err = DB_SUCCESS;
				/* fall through */
			case DB_SUCCESS:
				if (check_ref) {
					err = DB_NO_REFERENCED_ROW;
					row_ins_foreign_report_add_err(
						trx, foreign, rec, entry);
				}
				/* control falls into 'default', which
				only breaks out of the switch */
			default:
				break;
			}

			goto end_scan;
		}
	} while (btr_pcur_move_to_next(&pcur, &mtr));

	/* Reached when the scan ran off the end of the index, or through
	the 'break' in the versioned secondary index case above */
	if (check_ref) {
		row_ins_foreign_report_add_err(
			trx, foreign, btr_pcur_get_rec(&pcur), entry);
		err = DB_NO_REFERENCED_ROW;
	} else {
		err = DB_SUCCESS;
	}

end_scan:
	btr_pcur_close(&pcur);

	mtr_commit(&mtr);

	/* Restore old value */
	dtuple_set_n_fields_cmp(entry, n_fields_cmp);

do_possible_lock_wait:
	if (err == DB_LOCK_WAIT) {
		trx->error_state = err;

		que_thr_stop_for_mysql(thr);

		thr->lock_state = QUE_THR_LOCK_ROW;

		/* The fk check counter of check_table is raised for the
		duration of the wait */
		check_table->inc_fk_checks();

		lock_wait_suspend_thread(thr);

		thr->lock_state = QUE_THR_LOCK_NOLOCK;

		/* Unless one of the conditions below holds, err stays
		DB_LOCK_WAIT and is returned to the caller as such */
		if (check_table->to_be_dropped
		    || trx->error_state == DB_LOCK_WAIT_TIMEOUT) {
			err = DB_LOCK_WAIT_TIMEOUT;
		}

		check_table->dec_fk_checks();
	}

exit_func:
	/* heap is non-NULL only if rec_get_offsets() had to allocate */
	if (heap != NULL) {
		mem_heap_free(heap);
	}

	DBUG_RETURN(err);
}
1920
1921/***************************************************************//**
1922Checks if foreign key constraints fail for an index entry. If index
1923is not mentioned in any constraint, this function does nothing,
1924Otherwise does searches to the indexes of referenced tables and
1925sets shared locks which lock either the success or the failure of
1926a constraint.
1927@return DB_SUCCESS or error code */
1928static MY_ATTRIBUTE((nonnull, warn_unused_result))
1929dberr_t
1930row_ins_check_foreign_constraints(
1931/*==============================*/
1932 dict_table_t* table, /*!< in: table */
1933 dict_index_t* index, /*!< in: index */
1934 dtuple_t* entry, /*!< in: index entry for index */
1935 que_thr_t* thr) /*!< in: query thread */
1936{
1937 dict_foreign_t* foreign;
1938 dberr_t err;
1939 trx_t* trx;
1940 ibool got_s_lock = FALSE;
1941
1942 trx = thr_get_trx(thr);
1943
1944 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1945 "foreign_constraint_check_for_ins");
1946
1947 for (dict_foreign_set::iterator it = table->foreign_set.begin();
1948 it != table->foreign_set.end();
1949 ++it) {
1950
1951 foreign = *it;
1952
1953 if (foreign->foreign_index == index) {
1954 dict_table_t* ref_table = NULL;
1955 dict_table_t* referenced_table
1956 = foreign->referenced_table;
1957
1958 if (referenced_table == NULL) {
1959
1960 ref_table = dict_table_open_on_name(
1961 foreign->referenced_table_name_lookup,
1962 FALSE, FALSE, DICT_ERR_IGNORE_NONE);
1963 }
1964
1965 if (0 == trx->dict_operation_lock_mode) {
1966 got_s_lock = TRUE;
1967
1968 row_mysql_freeze_data_dictionary(trx);
1969 }
1970
1971 if (referenced_table) {
1972 foreign->foreign_table->inc_fk_checks();
1973 }
1974
1975 /* NOTE that if the thread ends up waiting for a lock
1976 we will release dict_operation_lock temporarily!
1977 But the counter on the table protects the referenced
1978 table from being dropped while the check is running. */
1979
1980 err = row_ins_check_foreign_constraint(
1981 TRUE, foreign, table, entry, thr);
1982
1983 if (referenced_table) {
1984 foreign->foreign_table->dec_fk_checks();
1985 }
1986
1987 if (got_s_lock) {
1988 row_mysql_unfreeze_data_dictionary(trx);
1989 }
1990
1991 if (ref_table != NULL) {
1992 dict_table_close(ref_table, FALSE, FALSE);
1993 }
1994
1995 if (err != DB_SUCCESS) {
1996
1997 return(err);
1998 }
1999 }
2000 }
2001
2002 return(DB_SUCCESS);
2003}
2004
2005/***************************************************************//**
2006Checks if a unique key violation to rec would occur at the index entry
2007insert.
2008@return TRUE if error */
2009static
2010ibool
2011row_ins_dupl_error_with_rec(
2012/*========================*/
2013 const rec_t* rec, /*!< in: user record; NOTE that we assume
2014 that the caller already has a record lock on
2015 the record! */
2016 const dtuple_t* entry, /*!< in: entry to insert */
2017 dict_index_t* index, /*!< in: index */
2018 const ulint* offsets)/*!< in: rec_get_offsets(rec, index) */
2019{
2020 ulint matched_fields;
2021 ulint n_unique;
2022 ulint i;
2023
2024 ut_ad(rec_offs_validate(rec, index, offsets));
2025
2026 n_unique = dict_index_get_n_unique(index);
2027
2028 matched_fields = 0;
2029
2030 cmp_dtuple_rec_with_match(entry, rec, offsets, &matched_fields);
2031
2032 if (matched_fields < n_unique) {
2033
2034 return(FALSE);
2035 }
2036
2037 /* In a unique secondary index we allow equal key values if they
2038 contain SQL NULLs */
2039
2040 if (!dict_index_is_clust(index) && !index->nulls_equal) {
2041
2042 for (i = 0; i < n_unique; i++) {
2043 if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
2044
2045 return(FALSE);
2046 }
2047 }
2048 }
2049
2050 return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2051}
2052
2053/***************************************************************//**
2054Scans a unique non-clustered index at a given index entry to determine
2055whether a uniqueness violation has occurred for the key value of the entry.
2056Set shared locks on possible duplicate records.
2057@return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
2058static MY_ATTRIBUTE((nonnull, warn_unused_result))
2059dberr_t
2060row_ins_scan_sec_index_for_duplicate(
2061/*=================================*/
2062 ulint flags, /*!< in: undo logging and locking flags */
2063 dict_index_t* index, /*!< in: non-clustered unique index */
2064 dtuple_t* entry, /*!< in: index entry */
2065 que_thr_t* thr, /*!< in: query thread */
2066 bool s_latch,/*!< in: whether index->lock is being held */
2067 mtr_t* mtr, /*!< in/out: mini-transaction */
2068 mem_heap_t* offsets_heap)
2069 /*!< in/out: memory heap that can be emptied */
2070{
2071 ulint n_unique;
2072 int cmp;
2073 ulint n_fields_cmp;
2074 btr_pcur_t pcur;
2075 dberr_t err = DB_SUCCESS;
2076 ulint allow_duplicates;
2077 ulint* offsets = NULL;
2078 DBUG_ENTER("row_ins_scan_sec_index_for_duplicate");
2079
2080
2081 ut_ad(s_latch == rw_lock_own_flagged(
2082 &index->lock, RW_LOCK_FLAG_S | RW_LOCK_FLAG_SX));
2083
2084 n_unique = dict_index_get_n_unique(index);
2085
2086 /* If the secondary index is unique, but one of the fields in the
2087 n_unique first fields is NULL, a unique key violation cannot occur,
2088 since we define NULL != NULL in this case */
2089
2090 if (!index->nulls_equal) {
2091 for (ulint i = 0; i < n_unique; i++) {
2092 if (UNIV_SQL_NULL == dfield_get_len(
2093 dtuple_get_nth_field(entry, i))) {
2094
2095 DBUG_RETURN(DB_SUCCESS);
2096 }
2097 }
2098 }
2099
2100 /* Store old value on n_fields_cmp */
2101
2102 n_fields_cmp = dtuple_get_n_fields_cmp(entry);
2103
2104 dtuple_set_n_fields_cmp(entry, n_unique);
2105
2106 btr_pcur_open(index, entry, PAGE_CUR_GE,
2107 s_latch
2108 ? BTR_SEARCH_LEAF_ALREADY_S_LATCHED
2109 : BTR_SEARCH_LEAF,
2110 &pcur, mtr);
2111
2112 allow_duplicates = thr_get_trx(thr)->duplicates;
2113
2114 /* Scan index records and check if there is a duplicate */
2115
2116 do {
2117 const rec_t* rec = btr_pcur_get_rec(&pcur);
2118 const buf_block_t* block = btr_pcur_get_block(&pcur);
2119 const ulint lock_type = LOCK_ORDINARY;
2120
2121 if (page_rec_is_infimum(rec)) {
2122
2123 continue;
2124 }
2125
2126 offsets = rec_get_offsets(rec, index, offsets, true,
2127 ULINT_UNDEFINED, &offsets_heap);
2128
2129 if (flags & BTR_NO_LOCKING_FLAG) {
2130 /* Set no locks when applying log
2131 in online table rebuild. */
2132 } else if (allow_duplicates) {
2133
2134 /* If the SQL-query will update or replace
2135 duplicate key we will take X-lock for
2136 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2137 INSERT ON DUPLICATE KEY UPDATE). */
2138
2139 err = row_ins_set_exclusive_rec_lock(
2140 lock_type, block, rec, index, offsets, thr);
2141 } else {
2142
2143 err = row_ins_set_shared_rec_lock(
2144 lock_type, block, rec, index, offsets, thr);
2145 }
2146
2147 switch (err) {
2148 case DB_SUCCESS_LOCKED_REC:
2149 err = DB_SUCCESS;
2150 case DB_SUCCESS:
2151 break;
2152 default:
2153 goto end_scan;
2154 }
2155
2156 if (page_rec_is_supremum(rec)) {
2157
2158 continue;
2159 }
2160
2161 cmp = cmp_dtuple_rec(entry, rec, offsets);
2162
2163 if (cmp == 0) {
2164 if (row_ins_dupl_error_with_rec(rec, entry,
2165 index, offsets)) {
2166 err = DB_DUPLICATE_KEY;
2167
2168 thr_get_trx(thr)->error_info = index;
2169
2170 /* If the duplicate is on hidden FTS_DOC_ID,
2171 state so in the error log */
2172 if (index == index->table->fts_doc_id_index
2173 && DICT_TF2_FLAG_IS_SET(
2174 index->table,
2175 DICT_TF2_FTS_HAS_DOC_ID)) {
2176
2177 ib::error() << "Duplicate FTS_DOC_ID"
2178 " value on table "
2179 << index->table->name;
2180 }
2181
2182 goto end_scan;
2183 }
2184 } else {
2185 ut_a(cmp < 0);
2186 goto end_scan;
2187 }
2188 } while (btr_pcur_move_to_next(&pcur, mtr));
2189
2190end_scan:
2191 /* Restore old value */
2192 dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2193
2194 DBUG_RETURN(err);
2195}
2196
2197/** Checks for a duplicate when the table is being rebuilt online.
2198@retval DB_SUCCESS when no duplicate is detected
2199@retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2200a newer version of entry (the entry should not be inserted)
2201@retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2202static MY_ATTRIBUTE((nonnull, warn_unused_result))
2203dberr_t
2204row_ins_duplicate_online(
2205/*=====================*/
2206 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2207 const dtuple_t* entry, /*!< in: entry that is being inserted */
2208 const rec_t* rec, /*!< in: clustered index record */
2209 ulint* offsets)/*!< in/out: rec_get_offsets(rec) */
2210{
2211 ulint fields = 0;
2212
2213 /* During rebuild, there should not be any delete-marked rows
2214 in the new table. */
2215 ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2216 ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2217
2218 /* Compare the PRIMARY KEY fields and the
2219 DB_TRX_ID, DB_ROLL_PTR. */
2220 cmp_dtuple_rec_with_match_low(
2221 entry, rec, offsets, n_uniq + 2, &fields);
2222
2223 if (fields < n_uniq) {
2224 /* Not a duplicate. */
2225 return(DB_SUCCESS);
2226 }
2227
2228 if (fields == n_uniq + 2) {
2229 /* rec is an exact match of entry. */
2230 return(DB_SUCCESS_LOCKED_REC);
2231 }
2232
2233 return(DB_DUPLICATE_KEY);
2234}
2235
2236/** Checks for a duplicate when the table is being rebuilt online.
2237@retval DB_SUCCESS when no duplicate is detected
2238@retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2239a newer version of entry (the entry should not be inserted)
2240@retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2241static MY_ATTRIBUTE((nonnull, warn_unused_result))
2242dberr_t
2243row_ins_duplicate_error_in_clust_online(
2244/*====================================*/
2245 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2246 const dtuple_t* entry, /*!< in: entry that is being inserted */
2247 const btr_cur_t*cursor, /*!< in: cursor on insert position */
2248 ulint** offsets,/*!< in/out: rec_get_offsets(rec) */
2249 mem_heap_t** heap) /*!< in/out: heap for offsets */
2250{
2251 dberr_t err = DB_SUCCESS;
2252 const rec_t* rec = btr_cur_get_rec(cursor);
2253
2254 ut_ad(!cursor->index->is_instant());
2255
2256 if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2257 *offsets = rec_get_offsets(rec, cursor->index, *offsets, true,
2258 ULINT_UNDEFINED, heap);
2259 err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2260 if (err != DB_SUCCESS) {
2261 return(err);
2262 }
2263 }
2264
2265 rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2266
2267 if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2268 *offsets = rec_get_offsets(rec, cursor->index, *offsets, true,
2269 ULINT_UNDEFINED, heap);
2270 err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2271 }
2272
2273 return(err);
2274}
2275
2276/***************************************************************//**
2277Checks if a unique key violation error would occur at an index entry
2278insert. Sets shared locks on possible duplicate records. Works only
2279for a clustered index!
2280@retval DB_SUCCESS if no error
2281@retval DB_DUPLICATE_KEY if error,
2282@retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
2283record */
2284static MY_ATTRIBUTE((nonnull, warn_unused_result))
2285dberr_t
2286row_ins_duplicate_error_in_clust(
2287 ulint flags, /*!< in: undo logging and locking flags */
2288 btr_cur_t* cursor, /*!< in: B-tree cursor */
2289 const dtuple_t* entry, /*!< in: entry to insert */
2290 que_thr_t* thr) /*!< in: query thread */
2291{
2292 dberr_t err;
2293 rec_t* rec;
2294 ulint n_unique;
2295 trx_t* trx = thr_get_trx(thr);
2296 mem_heap_t*heap = NULL;
2297 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2298 ulint* offsets = offsets_;
2299 rec_offs_init(offsets_);
2300
2301 ut_ad(dict_index_is_clust(cursor->index));
2302
2303 /* NOTE: For unique non-clustered indexes there may be any number
2304 of delete marked records with the same value for the non-clustered
2305 index key (remember multiversioning), and which differ only in
2306 the row refererence part of the index record, containing the
2307 clustered index key fields. For such a secondary index record,
2308 to avoid race condition, we must FIRST do the insertion and after
2309 that check that the uniqueness condition is not breached! */
2310
2311 /* NOTE: A problem is that in the B-tree node pointers on an
2312 upper level may match more to the entry than the actual existing
2313 user records on the leaf level. So, even if low_match would suggest
2314 that a duplicate key violation may occur, this may not be the case. */
2315
2316 n_unique = dict_index_get_n_unique(cursor->index);
2317
2318 if (cursor->low_match >= n_unique) {
2319
2320 rec = btr_cur_get_rec(cursor);
2321
2322 if (!page_rec_is_infimum(rec)) {
2323 offsets = rec_get_offsets(rec, cursor->index, offsets,
2324 true,
2325 ULINT_UNDEFINED, &heap);
2326
2327 ulint lock_type;
2328
2329 lock_type =
2330 trx->isolation_level <= TRX_ISO_READ_COMMITTED
2331 ? LOCK_REC_NOT_GAP : LOCK_ORDINARY;
2332
2333 /* We set a lock on the possible duplicate: this
2334 is needed in logical logging of MySQL to make
2335 sure that in roll-forward we get the same duplicate
2336 errors as in original execution */
2337
2338 if (flags & BTR_NO_LOCKING_FLAG) {
2339 /* Do nothing if no-locking is set */
2340 err = DB_SUCCESS;
2341 } else if (trx->duplicates) {
2342
2343 /* If the SQL-query will update or replace
2344 duplicate key we will take X-lock for
2345 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2346 INSERT ON DUPLICATE KEY UPDATE). */
2347
2348 err = row_ins_set_exclusive_rec_lock(
2349 lock_type,
2350 btr_cur_get_block(cursor),
2351 rec, cursor->index, offsets, thr);
2352 } else {
2353
2354 err = row_ins_set_shared_rec_lock(
2355 lock_type,
2356 btr_cur_get_block(cursor), rec,
2357 cursor->index, offsets, thr);
2358 }
2359
2360 switch (err) {
2361 case DB_SUCCESS_LOCKED_REC:
2362 case DB_SUCCESS:
2363 break;
2364 default:
2365 goto func_exit;
2366 }
2367
2368 if (row_ins_dupl_error_with_rec(
2369 rec, entry, cursor->index, offsets)) {
2370duplicate:
2371 trx->error_info = cursor->index;
2372 err = DB_DUPLICATE_KEY;
2373 goto func_exit;
2374 }
2375 }
2376 }
2377
2378 if (cursor->up_match >= n_unique) {
2379
2380 rec = page_rec_get_next(btr_cur_get_rec(cursor));
2381
2382 if (!page_rec_is_supremum(rec)) {
2383 offsets = rec_get_offsets(rec, cursor->index, offsets,
2384 true,
2385 ULINT_UNDEFINED, &heap);
2386
2387 if (trx->duplicates) {
2388
2389 /* If the SQL-query will update or replace
2390 duplicate key we will take X-lock for
2391 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2392 INSERT ON DUPLICATE KEY UPDATE). */
2393
2394 err = row_ins_set_exclusive_rec_lock(
2395 LOCK_REC_NOT_GAP,
2396 btr_cur_get_block(cursor),
2397 rec, cursor->index, offsets, thr);
2398 } else {
2399
2400 err = row_ins_set_shared_rec_lock(
2401 LOCK_REC_NOT_GAP,
2402 btr_cur_get_block(cursor),
2403 rec, cursor->index, offsets, thr);
2404 }
2405
2406 switch (err) {
2407 case DB_SUCCESS_LOCKED_REC:
2408 case DB_SUCCESS:
2409 break;
2410 default:
2411 goto func_exit;
2412 }
2413
2414 if (row_ins_dupl_error_with_rec(
2415 rec, entry, cursor->index, offsets)) {
2416 goto duplicate;
2417 }
2418 }
2419
2420 /* This should never happen */
2421 ut_error;
2422 }
2423
2424 err = DB_SUCCESS;
2425func_exit:
2426 if (UNIV_LIKELY_NULL(heap)) {
2427 mem_heap_free(heap);
2428 }
2429 return(err);
2430}
2431
2432/***************************************************************//**
2433Checks if an index entry has long enough common prefix with an
2434existing record so that the intended insert of the entry must be
2435changed to a modify of the existing record. In the case of a clustered
2436index, the prefix must be n_unique fields long. In the case of a
2437secondary index, all fields must be equal. InnoDB never updates
2438secondary index records in place, other than clearing or setting the
2439delete-mark flag. We could be able to update the non-unique fields
2440of a unique secondary index record by checking the cursor->up_match,
2441but we do not do so, because it could have some locking implications.
2442@return TRUE if the existing record should be updated; FALSE if not */
2443UNIV_INLINE
2444ibool
2445row_ins_must_modify_rec(
2446/*====================*/
2447 const btr_cur_t* cursor) /*!< in: B-tree cursor */
2448{
2449 /* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2450 Because node pointers on upper levels of the B-tree may match more
2451 to entry than to actual user records on the leaf level, we
2452 have to check if the candidate record is actually a user record.
2453 A clustered index node pointer contains index->n_unique first fields,
2454 and a secondary index node pointer contains all index fields. */
2455
2456 return(cursor->low_match
2457 >= dict_index_get_n_unique_in_tree(cursor->index)
2458 && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2459}
2460
2461/** Insert the externally stored fields (off-page columns)
2462of a clustered index entry.
2463@param[in] entry index entry to insert
2464@param[in] big_rec externally stored fields
2465@param[in,out] offsets rec_get_offsets()
2466@param[in,out] heap memory heap
2467@param[in] thd client connection, or NULL
2468@param[in] index clustered index
2469@return error code
2470@retval DB_SUCCESS
2471@retval DB_OUT_OF_FILE_SPACE */
2472static
2473dberr_t
2474row_ins_index_entry_big_rec(
2475 const dtuple_t* entry,
2476 const big_rec_t* big_rec,
2477 ulint* offsets,
2478 mem_heap_t** heap,
2479#ifndef DBUG_OFF
2480 const void* thd,
2481#endif /* DBUG_OFF */
2482 dict_index_t* index)
2483{
2484 mtr_t mtr;
2485 btr_pcur_t pcur;
2486 rec_t* rec;
2487 dberr_t error;
2488
2489 ut_ad(dict_index_is_clust(index));
2490
2491 DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
2492
2493 mtr.start();
2494 if (index->table->is_temporary()) {
2495 mtr.set_log_mode(MTR_LOG_NO_REDO);
2496 } else {
2497 index->set_modified(mtr);
2498 }
2499
2500 btr_pcur_open(index, entry, PAGE_CUR_LE, BTR_MODIFY_TREE,
2501 &pcur, &mtr);
2502 rec = btr_pcur_get_rec(&pcur);
2503 offsets = rec_get_offsets(rec, index, offsets, true,
2504 ULINT_UNDEFINED, heap);
2505
2506 DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
2507 error = btr_store_big_rec_extern_fields(
2508 &pcur, offsets, big_rec, &mtr, BTR_STORE_INSERT);
2509 DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
2510
2511 if (error == DB_SUCCESS
2512 && dict_index_is_online_ddl(index)) {
2513 row_log_table_insert(btr_pcur_get_rec(&pcur), index, offsets);
2514 }
2515
2516 mtr.commit();
2517
2518 btr_pcur_close(&pcur);
2519
2520 return(error);
2521}
2522
/* Call wrapper for row_ins_index_entry_big_rec(): callers always pass
six arguments with thd last. The function itself takes thd (before
index) only when DBUG_OFF is not defined, so the wrapper reorders or
drops that argument accordingly. */
#ifndef DBUG_OFF
# define row_ins_index_entry_big_rec(e,big,ofs,heap,index,thd) \
	row_ins_index_entry_big_rec(e,big,ofs,heap,thd,index)
#else /* !DBUG_OFF */
# define row_ins_index_entry_big_rec(e,big,ofs,heap,index,thd) \
	row_ins_index_entry_big_rec(e,big,ofs,heap,index)
#endif /* !DBUG_OFF */
2530
/***************************************************************//**
Tries to insert an entry into a clustered index, ignoring foreign key
constraints. If a record with the same unique key is found, the other
record is necessarily marked deleted by a committed transaction, or a
unique key violation error occurs. The delete marked record is then
updated to an existing record, and we must write an undo log record on
the delete marked record.

Outline of the function body:
1. start a mini-transaction and open a persistent cursor on the insert
position with PAGE_CUR_LE;
2. handle an entry with nonzero info_bits (the 'default row') specially;
3. if n_uniq is nonzero and the cursor reports a long enough match,
check for a duplicate (lock-free variant for online table rebuild);
4. return early if dup_chk_only;
5. either modify the matching record in place, or insert the entry
(optimistically, and in BTR_MODIFY_TREE mode also pessimistically),
writing any off-page columns in a separate mini-transaction.
Every path commits the mini-transaction exactly once before reaching
func_exit, which frees offsets_heap and closes the cursor.
@retval DB_SUCCESS on success
@retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
@retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
@return error code */
dberr_t
row_ins_clust_index_entry_low(
/*==========================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
				depending on whether we wish optimistic or
				pessimistic descent down the index tree */
	dict_index_t*	index,	/*!< in: clustered index */
	ulint		n_uniq,	/*!< in: 0 or index->n_uniq */
	dtuple_t*	entry,	/*!< in/out: index entry to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	que_thr_t*	thr,	/*!< in: query thread */
	bool		dup_chk_only)
				/*!< in: if true, just do duplicate check
				and return. don't execute actual insert. */
{
	btr_pcur_t	pcur;
	btr_cur_t*	cursor;
	dberr_t		err		= DB_SUCCESS;
	big_rec_t*	big_rec		= NULL;
	mtr_t		mtr;
	ib_uint64_t	auto_inc	= 0;
	mem_heap_t*	offsets_heap	= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	DBUG_ENTER("row_ins_clust_index_entry_low");

	ut_ad(dict_index_is_clust(index));
	ut_ad(!dict_index_is_unique(index)
	      || n_uniq == dict_index_get_n_unique(index));
	ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
	ut_ad(!thr_get_trx(thr)->in_rollback);

	mtr_start(&mtr);

	if (index->table->is_temporary()) {
		/* Disable REDO logging as the lifetime of temp-tables is
		limited to server or connection lifetime and so REDO
		information is not needed on restart for recovery.
		Disable locking as temp-tables are local to a connection. */

		ut_ad(flags & BTR_NO_LOCKING_FLAG);
		ut_ad(!dict_index_is_online_ddl(index));
		ut_ad(!index->table->persistent_autoinc);
		ut_ad(!index->is_instant());
		mtr.set_log_mode(MTR_LOG_NO_REDO);
	} else {
		index->set_modified(mtr);

		/* During online DDL an optimistic (leaf-only) insert takes
		the index lock in S mode itself and tells the search so
		through the ALREADY_S_LATCHED variant of the latch mode. */
		if (mode == BTR_MODIFY_LEAF
		    && dict_index_is_online_ddl(index)) {
			mode = BTR_MODIFY_LEAF_ALREADY_S_LATCHED;
			mtr_s_lock(dict_index_get_lock(index), &mtr);
		}

		if (unsigned ai = index->table->persistent_autoinc) {
			/* Prepare to persist the AUTO_INCREMENT value
			from the index entry to PAGE_ROOT_AUTO_INC.
			persistent_autoinc is used as a 1-based field
			position here; a NULL field yields auto_inc = 0. */
			const dfield_t*	dfield = dtuple_get_nth_field(
				entry, ai - 1);
			auto_inc = dfield_is_null(dfield)
				? 0
				: row_parse_int(static_cast<const byte*>(
							dfield->data),
						dfield->len,
						dfield->type.mtype,
						dfield->type.prtype
						& DATA_UNSIGNED);
		}
	}

	/* Note that we use PAGE_CUR_LE as the search mode, because then
	the function will return in both low_match and up_match of the
	cursor sensible values */
	err = btr_pcur_open_low(index, 0, entry, PAGE_CUR_LE, mode, &pcur,
				__FILE__, __LINE__, auto_inc, &mtr);
	if (err != DB_SUCCESS) {
		/* Any failure to open the cursor marks the table as
		unreadable before the error is returned. */
		index->table->file_unreadable = true;
		mtr.commit();
		goto func_exit;
	}

	cursor = btr_pcur_get_btr_cur(&pcur);
	cursor->thr = thr;

#ifdef UNIV_DEBUG
	{
		/* Debug-only sanity check of the first user record on
		the page (if any) against the entry. */
		page_t*	page = btr_cur_get_page(cursor);
		rec_t*	first_rec = page_rec_get_next(
			page_get_infimum_rec(page));

		ut_ad(page_rec_is_supremum(first_rec)
		      || rec_n_fields_is_sane(index, first_rec, entry));
	}
#endif /* UNIV_DEBUG */

	if (UNIV_UNLIKELY(entry->info_bits != 0)) {
		/* The only entry with nonzero info_bits expected here is
		the 'default row' (see the assertions below). */
		ut_ad(entry->info_bits == REC_INFO_DEFAULT_ROW);
		ut_ad(flags == BTR_NO_LOCKING_FLAG);
		ut_ad(index->is_instant());
		ut_ad(!dict_index_is_online_ddl(index));
		ut_ad(!dup_chk_only);

		const rec_t* rec = btr_cur_get_rec(cursor);

		switch (rec_get_info_bits(rec, page_rec_is_comp(rec))
			& (REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG)) {
		case REC_INFO_MIN_REC_FLAG:
			/* The cursor record already carries the
			min-rec flag: report a duplicate. */
			thr_get_trx(thr)->error_info = index;
			err = DB_DUPLICATE_KEY;
			goto err_exit;
		case REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG:
			/* The 'default row' is never delete-marked.
			If a table loses its 'instantness', it happens
			by the rollback of this first-time insert, or
			by a call to btr_page_empty() on the root page
			when the table becomes empty. */
			err = DB_CORRUPTION;
			goto err_exit;
		default:
			ut_ad(!row_ins_must_modify_rec(cursor));
			goto do_insert;
		}
	}

	/* NOTE(review): presumably trim() removes trailing fields that
	need not be stored for an instant-ADD-COLUMN index; confirm
	against dtuple_t::trim(). */
	if (index->is_instant()) entry->trim(*index);

	/* When the cursor is on the 'default row' record, go straight to
	the insert: this skips both the duplicate check and the
	dup_chk_only early return below. */
	if (rec_is_default_row(btr_cur_get_rec(cursor), index)) {
		goto do_insert;
	}

	if (n_uniq
	    && (cursor->up_match >= n_uniq || cursor->low_match >= n_uniq)) {

		if (flags
		    == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
			/* Set no locks when applying log
			in online table rebuild. Only check for duplicates. */
			err = row_ins_duplicate_error_in_clust_online(
				n_uniq, entry, cursor,
				&offsets, &offsets_heap);

			/* Anything but DB_SUCCESS records the index in
			trx->error_info and is returned to the caller via
			err_exit below; results other than the two listed
			ones are unexpected (debug assertion). */
			switch (err) {
			case DB_SUCCESS:
				break;
			default:
				ut_ad(0);
				/* fall through */
			case DB_SUCCESS_LOCKED_REC:
			case DB_DUPLICATE_KEY:
				thr_get_trx(thr)->error_info = cursor->index;
			}
		} else {
			/* Note that the following may return also
			DB_LOCK_WAIT */

			err = row_ins_duplicate_error_in_clust(
				flags, cursor, entry, thr);
		}

		if (err != DB_SUCCESS) {
			/* Shared error exit: also the target of jumps from
			outside this block. Commits the mini-transaction
			and returns err. */
err_exit:
			mtr_commit(&mtr);
			goto func_exit;
		}
	}

	if (dup_chk_only) {
		/* Only the duplicate check was requested. */
		mtr_commit(&mtr);
		goto func_exit;
	}

	/* Note: Allowing duplicates would qualify for modification of
	an existing record as the new entry is exactly same as old entry. */
	if (row_ins_must_modify_rec(cursor)) {
		/* There is already an index entry with a long enough common
		prefix, we must convert the insert into a modify of an
		existing record */
		mem_heap_t*	entry_heap	= mem_heap_create(1024);

		err = row_ins_clust_index_entry_by_modify(
			&pcur, flags, mode, &offsets, &offsets_heap,
			entry_heap, entry, thr, &mtr);

		/* Log the change for online DDL before the
		mini-transaction is committed. */
		if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
			row_log_table_insert(btr_cur_get_rec(cursor),
					     index, offsets);
		}

		mtr_commit(&mtr);
		mem_heap_free(entry_heap);
	} else {
		/* Insert path; also entered by jumps from the 'default
		row' handling above. */
do_insert:
		rec_t*	insert_rec;

		if (mode != BTR_MODIFY_TREE) {
			/* Leaf-only mode: a single optimistic attempt.
			Its result (possibly DB_FAIL) goes back to the
			caller. */
			ut_ad((mode & ulint(~BTR_ALREADY_S_LATCHED))
			      == BTR_MODIFY_LEAF);
			err = btr_cur_optimistic_insert(
				flags, cursor, &offsets, &offsets_heap,
				entry, &insert_rec, &big_rec,
				n_ext, thr, &mtr);
		} else {
			if (buf_LRU_buf_pool_running_out()) {

				err = DB_LOCK_TABLE_FULL;
				goto err_exit;
			}

			DEBUG_SYNC_C("before_insert_pessimitic_row_ins_clust");

			/* Try the optimistic insert first; fall back to
			the pessimistic one only if it reports DB_FAIL. */
			err = btr_cur_optimistic_insert(
				flags, cursor,
				&offsets, &offsets_heap,
				entry, &insert_rec, &big_rec,
				n_ext, thr, &mtr);

			if (err == DB_FAIL) {
				err = btr_cur_pessimistic_insert(
					flags, cursor,
					&offsets, &offsets_heap,
					entry, &insert_rec, &big_rec,
					n_ext, thr, &mtr);
			}
		}

		if (big_rec != NULL) {
			/* The insert handed back off-page columns. Commit
			the mini-transaction of the insert first; the
			columns are then written by
			row_ins_index_entry_big_rec() in a mini-transaction
			of its own, and its result replaces err. */
			mtr_commit(&mtr);

			/* Online table rebuild could read (and
			ignore) the incomplete record at this point.
			If online rebuild is in progress, the
			row_ins_index_entry_big_rec() will write log. */

			DBUG_EXECUTE_IF(
				"row_ins_extern_checkpoint",
				log_make_checkpoint_at(
					LSN_MAX, TRUE););
			err = row_ins_index_entry_big_rec(
				entry, big_rec, offsets, &offsets_heap, index,
				thr_get_trx(thr)->mysql_thd);
			dtuple_convert_back_big_rec(index, entry, big_rec);
		} else {
			/* No off-page columns: log for online DDL (if
			applicable) inside the same mini-transaction. */
			if (err == DB_SUCCESS
			    && dict_index_is_online_ddl(index)) {
				row_log_table_insert(
					insert_rec, index, offsets);
			}

			mtr_commit(&mtr);
		}
	}

	/* Common exit: the mini-transaction has been committed on every
	path that reaches this label. */
func_exit:
	if (offsets_heap != NULL) {
		mem_heap_free(offsets_heap);
	}

	btr_pcur_close(&pcur);

	DBUG_RETURN(err);
}
2807
2808/** Start a mini-transaction and check if the index will be dropped.
2809@param[in,out] mtr mini-transaction
2810@param[in,out] index secondary index
2811@param[in] check whether to check
2812@param[in] search_mode flags
2813@return true if the index is to be dropped */
2814static MY_ATTRIBUTE((warn_unused_result))
2815bool
2816row_ins_sec_mtr_start_and_check_if_aborted(
2817 mtr_t* mtr,
2818 dict_index_t* index,
2819 bool check,
2820 ulint search_mode)
2821{
2822 ut_ad(!dict_index_is_clust(index));
2823 ut_ad(mtr->is_named_space(index->table->space));
2824
2825 const mtr_log_t log_mode = mtr->get_log_mode();
2826
2827 mtr->start();
2828 index->set_modified(*mtr);
2829 mtr->set_log_mode(log_mode);
2830
2831 if (!check) {
2832 return(false);
2833 }
2834
2835 if (search_mode & BTR_ALREADY_S_LATCHED) {
2836 mtr_s_lock(dict_index_get_lock(index), mtr);
2837 } else {
2838 mtr_sx_lock(dict_index_get_lock(index), mtr);
2839 }
2840
2841 switch (index->online_status) {
2842 case ONLINE_INDEX_ABORTED:
2843 case ONLINE_INDEX_ABORTED_DROPPED:
2844 ut_ad(!index->is_committed());
2845 return(true);
2846 case ONLINE_INDEX_COMPLETE:
2847 return(false);
2848 case ONLINE_INDEX_CREATION:
2849 break;
2850 }
2851
2852 ut_error;
2853 return(true);
2854}
2855
2856/***************************************************************//**
2857Tries to insert an entry into a secondary index. If a record with exactly the
2858same fields is found, the other record is necessarily marked deleted.
2859It is then unmarked. Otherwise, the entry is just inserted to the index.
2860@retval DB_SUCCESS on success
2861@retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2862@retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2863@return error code */
2864dberr_t
2865row_ins_sec_index_entry_low(
2866/*========================*/
2867 ulint flags, /*!< in: undo logging and locking flags */
2868 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2869 depending on whether we wish optimistic or
2870 pessimistic descent down the index tree */
2871 dict_index_t* index, /*!< in: secondary index */
2872 mem_heap_t* offsets_heap,
2873 /*!< in/out: memory heap that can be emptied */
2874 mem_heap_t* heap, /*!< in/out: memory heap */
2875 dtuple_t* entry, /*!< in/out: index entry to insert */
2876 trx_id_t trx_id, /*!< in: PAGE_MAX_TRX_ID during
2877 row_log_table_apply(), or 0 */
2878 que_thr_t* thr, /*!< in: query thread */
2879 bool dup_chk_only)
2880 /*!< in: if true, just do duplicate check
2881 and return. don't execute actual insert. */
2882{
2883 DBUG_ENTER("row_ins_sec_index_entry_low");
2884
2885 btr_cur_t cursor;
2886 ulint search_mode = mode;
2887 dberr_t err = DB_SUCCESS;
2888 ulint n_unique;
2889 mtr_t mtr;
2890 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2891 ulint* offsets = offsets_;
2892 rec_offs_init(offsets_);
2893 rtr_info_t rtr_info;
2894
2895 ut_ad(!dict_index_is_clust(index));
2896 ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2897
2898 cursor.thr = thr;
2899 cursor.rtr_info = NULL;
2900 ut_ad(thr_get_trx(thr)->id != 0);
2901
2902 mtr.start();
2903
2904 if (index->table->is_temporary()) {
2905 /* Disable locking, because temporary tables are never
2906 shared between transactions or connections. */
2907 ut_ad(flags & BTR_NO_LOCKING_FLAG);
2908 mtr.set_log_mode(MTR_LOG_NO_REDO);
2909 } else {
2910 index->set_modified(mtr);
2911 if (!dict_index_is_spatial(index)) {
2912 search_mode |= BTR_INSERT;
2913 }
2914 }
2915
2916 /* Ensure that we acquire index->lock when inserting into an
2917 index with index->online_status == ONLINE_INDEX_COMPLETE, but
2918 could still be subject to rollback_inplace_alter_table().
2919 This prevents a concurrent change of index->online_status.
2920 The memory object cannot be freed as long as we have an open
2921 reference to the table, or index->table->n_ref_count > 0. */
2922 const bool check = !index->is_committed();
2923 if (check) {
2924 DEBUG_SYNC_C("row_ins_sec_index_enter");
2925 if (mode == BTR_MODIFY_LEAF) {
2926 search_mode |= BTR_ALREADY_S_LATCHED;
2927 mtr_s_lock(dict_index_get_lock(index), &mtr);
2928 } else {
2929 mtr_sx_lock(dict_index_get_lock(index), &mtr);
2930 }
2931
2932 if (row_log_online_op_try(
2933 index, entry, thr_get_trx(thr)->id)) {
2934 goto func_exit;
2935 }
2936 }
2937
2938 /* Note that we use PAGE_CUR_LE as the search mode, because then
2939 the function will return in both low_match and up_match of the
2940 cursor sensible values */
2941
2942 if (!thr_get_trx(thr)->check_unique_secondary) {
2943 search_mode |= BTR_IGNORE_SEC_UNIQUE;
2944 }
2945
2946 if (dict_index_is_spatial(index)) {
2947 cursor.index = index;
2948 rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2949 rtr_info_update_btr(&cursor, &rtr_info);
2950
2951 err = btr_cur_search_to_nth_level(
2952 index, 0, entry, PAGE_CUR_RTREE_INSERT,
2953 search_mode,
2954 &cursor, 0, __FILE__, __LINE__, &mtr);
2955
2956 if (mode == BTR_MODIFY_LEAF && rtr_info.mbr_adj) {
2957 mtr_commit(&mtr);
2958 rtr_clean_rtr_info(&rtr_info, true);
2959 rtr_init_rtr_info(&rtr_info, false, &cursor,
2960 index, false);
2961 rtr_info_update_btr(&cursor, &rtr_info);
2962 mtr_start(&mtr);
2963 index->set_modified(mtr);
2964 search_mode &= ulint(~BTR_MODIFY_LEAF);
2965 search_mode |= BTR_MODIFY_TREE;
2966 err = btr_cur_search_to_nth_level(
2967 index, 0, entry, PAGE_CUR_RTREE_INSERT,
2968 search_mode,
2969 &cursor, 0, __FILE__, __LINE__, &mtr);
2970 mode = BTR_MODIFY_TREE;
2971 }
2972
2973 DBUG_EXECUTE_IF(
2974 "rtree_test_check_count", {
2975 goto func_exit;});
2976
2977 } else {
2978 err = btr_cur_search_to_nth_level(
2979 index, 0, entry, PAGE_CUR_LE,
2980 search_mode,
2981 &cursor, 0, __FILE__, __LINE__, &mtr);
2982 }
2983
2984 if (err != DB_SUCCESS) {
2985 if (err == DB_DECRYPTION_FAILED) {
2986 ib_push_warning(thr_get_trx(thr)->mysql_thd,
2987 DB_DECRYPTION_FAILED,
2988 "Table %s is encrypted but encryption service or"
2989 " used key_id is not available. "
2990 " Can't continue reading table.",
2991 index->table->name);
2992 index->table->file_unreadable = true;
2993 }
2994 goto func_exit;
2995 }
2996
2997 if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
2998 ut_ad(!dict_index_is_spatial(index));
2999 /* The insert was buffered during the search: we are done */
3000 goto func_exit;
3001 }
3002
3003#ifdef UNIV_DEBUG
3004 {
3005 page_t* page = btr_cur_get_page(&cursor);
3006 rec_t* first_rec = page_rec_get_next(
3007 page_get_infimum_rec(page));
3008
3009 ut_ad(page_rec_is_supremum(first_rec)
3010 || rec_n_fields_is_sane(index, first_rec, entry));
3011 }
3012#endif /* UNIV_DEBUG */
3013
3014 n_unique = dict_index_get_n_unique(index);
3015
3016 if (dict_index_is_unique(index)
3017 && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
3018 mtr_commit(&mtr);
3019
3020 DEBUG_SYNC_C("row_ins_sec_index_unique");
3021
3022 if (row_ins_sec_mtr_start_and_check_if_aborted(
3023 &mtr, index, check, search_mode)) {
3024 goto func_exit;
3025 }
3026
3027 err = row_ins_scan_sec_index_for_duplicate(
3028 flags, index, entry, thr, check, &mtr, offsets_heap);
3029
3030 mtr_commit(&mtr);
3031
3032 switch (err) {
3033 case DB_SUCCESS:
3034 break;
3035 case DB_DUPLICATE_KEY:
3036 if (!index->is_committed()) {
3037 ut_ad(!thr_get_trx(thr)
3038 ->dict_operation_lock_mode);
3039 mutex_enter(&dict_sys->mutex);
3040 dict_set_corrupted_index_cache_only(index);
3041 mutex_exit(&dict_sys->mutex);
3042 /* Do not return any error to the
3043 caller. The duplicate will be reported
3044 by ALTER TABLE or CREATE UNIQUE INDEX.
3045 Unfortunately we cannot report the
3046 duplicate key value to the DDL thread,
3047 because the altered_table object is
3048 private to its call stack. */
3049 err = DB_SUCCESS;
3050 }
3051 /* fall through */
3052 default:
3053 if (dict_index_is_spatial(index)) {
3054 rtr_clean_rtr_info(&rtr_info, true);
3055 }
3056 DBUG_RETURN(err);
3057 }
3058
3059 if (row_ins_sec_mtr_start_and_check_if_aborted(
3060 &mtr, index, check, search_mode)) {
3061 goto func_exit;
3062 }
3063
3064 DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
3065
3066 /* We did not find a duplicate and we have now
3067 locked with s-locks the necessary records to
3068 prevent any insertion of a duplicate by another
3069 transaction. Let us now reposition the cursor and
3070 continue the insertion. */
3071 btr_cur_search_to_nth_level(
3072 index, 0, entry, PAGE_CUR_LE,
3073 (search_mode
3074 & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE)),
3075 &cursor, 0, __FILE__, __LINE__, &mtr);
3076 }
3077
3078 if (!(flags & BTR_NO_LOCKING_FLAG)
3079 && dict_index_is_unique(index)
3080 && thr_get_trx(thr)->duplicates
3081 && thr_get_trx(thr)->isolation_level >= TRX_ISO_REPEATABLE_READ) {
3082
3083 /* When using the REPLACE statement or ON DUPLICATE clause, a
3084 gap lock is taken on the position of the to-be-inserted record,
3085 to avoid other concurrent transactions from inserting the same
3086 record. */
3087
3088 dberr_t err;
3089 const rec_t* rec = page_rec_get_next_const(
3090 btr_cur_get_rec(&cursor));
3091
3092 ut_ad(!page_rec_is_infimum(rec));
3093
3094 offsets = rec_get_offsets(rec, index, offsets, true,
3095 ULINT_UNDEFINED, &offsets_heap);
3096
3097 err = row_ins_set_exclusive_rec_lock(
3098 LOCK_GAP, btr_cur_get_block(&cursor), rec,
3099 index, offsets, thr);
3100
3101 switch (err) {
3102 case DB_SUCCESS:
3103 case DB_SUCCESS_LOCKED_REC:
3104 if (thr_get_trx(thr)->error_state != DB_DUPLICATE_KEY) {
3105 break;
3106 }
3107 /* Fall through (skip actual insert) after we have
3108 successfully acquired the gap lock. */
3109 default:
3110 goto func_exit;
3111 }
3112 }
3113
3114 ut_ad(thr_get_trx(thr)->error_state == DB_SUCCESS);
3115
3116 if (dup_chk_only) {
3117 goto func_exit;
3118 }
3119
3120 if (row_ins_must_modify_rec(&cursor)) {
3121 /* There is already an index entry with a long enough common
3122 prefix, we must convert the insert into a modify of an
3123 existing record */
3124 offsets = rec_get_offsets(
3125 btr_cur_get_rec(&cursor), index, offsets, true,
3126 ULINT_UNDEFINED, &offsets_heap);
3127
3128 err = row_ins_sec_index_entry_by_modify(
3129 flags, mode, &cursor, &offsets,
3130 offsets_heap, heap, entry, thr, &mtr);
3131
3132 if (err == DB_SUCCESS && dict_index_is_spatial(index)
3133 && rtr_info.mbr_adj) {
3134 err = rtr_ins_enlarge_mbr(&cursor, &mtr);
3135 }
3136 } else {
3137 rec_t* insert_rec;
3138 big_rec_t* big_rec;
3139
3140 if (mode == BTR_MODIFY_LEAF) {
3141 err = btr_cur_optimistic_insert(
3142 flags, &cursor, &offsets, &offsets_heap,
3143 entry, &insert_rec,
3144 &big_rec, 0, thr, &mtr);
3145 if (err == DB_SUCCESS
3146 && dict_index_is_spatial(index)
3147 && rtr_info.mbr_adj) {
3148 err = rtr_ins_enlarge_mbr(&cursor, &mtr);
3149 }
3150 } else {
3151 ut_ad(mode == BTR_MODIFY_TREE);
3152 if (buf_LRU_buf_pool_running_out()) {
3153
3154 err = DB_LOCK_TABLE_FULL;
3155 goto func_exit;
3156 }
3157
3158 err = btr_cur_optimistic_insert(
3159 flags, &cursor,
3160 &offsets, &offsets_heap,
3161 entry, &insert_rec,
3162 &big_rec, 0, thr, &mtr);
3163 if (err == DB_FAIL) {
3164 err = btr_cur_pessimistic_insert(
3165 flags, &cursor,
3166 &offsets, &offsets_heap,
3167 entry, &insert_rec,
3168 &big_rec, 0, thr, &mtr);
3169 }
3170 if (err == DB_SUCCESS
3171 && dict_index_is_spatial(index)
3172 && rtr_info.mbr_adj) {
3173 err = rtr_ins_enlarge_mbr(&cursor, &mtr);
3174 }
3175 }
3176
3177 if (err == DB_SUCCESS && trx_id) {
3178 page_update_max_trx_id(
3179 btr_cur_get_block(&cursor),
3180 btr_cur_get_page_zip(&cursor),
3181 trx_id, &mtr);
3182 }
3183
3184 ut_ad(!big_rec);
3185 }
3186
3187func_exit:
3188 if (dict_index_is_spatial(index)) {
3189 rtr_clean_rtr_info(&rtr_info, true);
3190 }
3191
3192 mtr_commit(&mtr);
3193 DBUG_RETURN(err);
3194}
3195
3196/***************************************************************//**
3197Inserts an entry into a clustered index. Tries first optimistic,
3198then pessimistic descent down the tree. If the entry matches enough
3199to a delete marked record, performs the insert by updating or delete
3200unmarking the delete marked record.
3201@return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3202dberr_t
3203row_ins_clust_index_entry(
3204/*======================*/
3205 dict_index_t* index, /*!< in: clustered index */
3206 dtuple_t* entry, /*!< in/out: index entry to insert */
3207 que_thr_t* thr, /*!< in: query thread */
3208 ulint n_ext, /*!< in: number of externally stored columns */
3209 bool dup_chk_only)
3210 /*!< in: if true, just do duplicate check
3211 and return. don't execute actual insert. */
3212{
3213 dberr_t err;
3214 ulint n_uniq;
3215
3216 DBUG_ENTER("row_ins_clust_index_entry");
3217
3218 if (!index->table->foreign_set.empty()) {
3219 err = row_ins_check_foreign_constraints(
3220 index->table, index, entry, thr);
3221 if (err != DB_SUCCESS) {
3222
3223 DBUG_RETURN(err);
3224 }
3225 }
3226
3227 n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
3228
3229 ulint flags = index->table->no_rollback() ? BTR_NO_ROLLBACK
3230 : index->table->is_temporary()
3231 ? BTR_NO_LOCKING_FLAG : 0;
3232 const ulint orig_n_fields = entry->n_fields;
3233
3234 /* Try first optimistic descent to the B-tree */
3235 log_free_check();
3236
3237 /* For intermediate table during copy alter table,
3238 skip the undo log and record lock checking for
3239 insertion operation.
3240 */
3241 if (index->table->skip_alter_undo) {
3242 flags |= BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG;
3243 }
3244
3245 /* Try first optimistic descent to the B-tree */
3246 log_free_check();
3247
3248 err = row_ins_clust_index_entry_low(
3249 flags, BTR_MODIFY_LEAF, index, n_uniq, entry,
3250 n_ext, thr, dup_chk_only);
3251
3252 entry->n_fields = orig_n_fields;
3253
3254 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3255 "after_row_ins_clust_index_entry_leaf");
3256
3257 if (err != DB_FAIL) {
3258 DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
3259 DBUG_RETURN(err);
3260 }
3261
3262 /* Try then pessimistic descent to the B-tree */
3263 log_free_check();
3264
3265 err = row_ins_clust_index_entry_low(
3266 flags, BTR_MODIFY_TREE, index, n_uniq, entry,
3267 n_ext, thr, dup_chk_only);
3268
3269 entry->n_fields = orig_n_fields;
3270
3271 DBUG_RETURN(err);
3272}
3273
3274/***************************************************************//**
3275Inserts an entry into a secondary index. Tries first optimistic,
3276then pessimistic descent down the tree. If the entry matches enough
3277to a delete marked record, performs the insert by updating or delete
3278unmarking the delete marked record.
3279@return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3280dberr_t
3281row_ins_sec_index_entry(
3282/*====================*/
3283 dict_index_t* index, /*!< in: secondary index */
3284 dtuple_t* entry, /*!< in/out: index entry to insert */
3285 que_thr_t* thr, /*!< in: query thread */
3286 bool dup_chk_only)
3287 /*!< in: if true, just do duplicate check
3288 and return. don't execute actual insert. */
3289{
3290 dberr_t err;
3291 mem_heap_t* offsets_heap;
3292 mem_heap_t* heap;
3293 trx_id_t trx_id = 0;
3294
3295 DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
3296 DBUG_SET("-d,row_ins_sec_index_entry_timeout");
3297 return(DB_LOCK_WAIT);});
3298
3299 if (!index->table->foreign_set.empty()) {
3300 err = row_ins_check_foreign_constraints(index->table, index,
3301 entry, thr);
3302 if (err != DB_SUCCESS) {
3303
3304 return(err);
3305 }
3306 }
3307
3308 ut_ad(thr_get_trx(thr)->id != 0);
3309
3310 offsets_heap = mem_heap_create(1024);
3311 heap = mem_heap_create(1024);
3312
3313 /* Try first optimistic descent to the B-tree */
3314
3315 log_free_check();
3316 ulint flags = index->table->is_temporary()
3317 ? BTR_NO_LOCKING_FLAG
3318 : 0;
3319
3320 /* For intermediate table during copy alter table,
3321 skip the undo log and record lock checking for
3322 insertion operation.
3323 */
3324 if (index->table->skip_alter_undo) {
3325 trx_id = thr_get_trx(thr)->id;
3326 flags |= BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG;
3327 }
3328
3329 err = row_ins_sec_index_entry_low(
3330 flags, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry,
3331 trx_id, thr, dup_chk_only);
3332 if (err == DB_FAIL) {
3333 mem_heap_empty(heap);
3334
3335 if (index->table->space == fil_system.sys_space
3336 && !(index->type & (DICT_UNIQUE | DICT_SPATIAL))) {
3337 ibuf_free_excess_pages();
3338 }
3339
3340 /* Try then pessimistic descent to the B-tree */
3341 log_free_check();
3342
3343 err = row_ins_sec_index_entry_low(
3344 flags, BTR_MODIFY_TREE, index,
3345 offsets_heap, heap, entry, 0, thr,
3346 dup_chk_only);
3347 }
3348
3349 mem_heap_free(heap);
3350 mem_heap_free(offsets_heap);
3351 return(err);
3352}
3353
3354/***************************************************************//**
3355Inserts an index entry to index. Tries first optimistic, then pessimistic
3356descent down the tree. If the entry matches enough to a delete marked record,
3357performs the insert by updating or delete unmarking the delete marked
3358record.
3359@return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3360static
3361dberr_t
3362row_ins_index_entry(
3363/*================*/
3364 dict_index_t* index, /*!< in: index */
3365 dtuple_t* entry, /*!< in/out: index entry to insert */
3366 que_thr_t* thr) /*!< in: query thread */
3367{
3368 ut_ad(thr_get_trx(thr)->id || index->table->no_rollback());
3369
3370 DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
3371 DBUG_SET("-d,row_ins_index_entry_timeout");
3372 return(DB_LOCK_WAIT);});
3373
3374 if (index->is_primary()) {
3375 return(row_ins_clust_index_entry(index, entry, thr, 0, false));
3376 } else {
3377 return(row_ins_sec_index_entry(index, entry, thr, false));
3378 }
3379}
3380
3381
3382/*****************************************************************//**
3383This function generate MBR (Minimum Bounding Box) for spatial objects
3384and set it to spatial index field. */
3385static
3386void
3387row_ins_spatial_index_entry_set_mbr_field(
3388/*======================================*/
3389 dfield_t* field, /*!< in/out: mbr field */
3390 const dfield_t* row_field) /*!< in: row field */
3391{
3392 uchar* dptr = NULL;
3393 ulint dlen = 0;
3394 double mbr[SPDIMS * 2];
3395
3396 /* This must be a GEOMETRY datatype */
3397 ut_ad(DATA_GEOMETRY_MTYPE(field->type.mtype));
3398
3399 dptr = static_cast<uchar*>(dfield_get_data(row_field));
3400 dlen = dfield_get_len(row_field);
3401
3402 /* obtain the MBR */
3403 rtree_mbr_from_wkb(dptr + GEO_DATA_HEADER_SIZE,
3404 static_cast<uint>(dlen - GEO_DATA_HEADER_SIZE),
3405 SPDIMS, mbr);
3406
3407 /* Set mbr as index entry data */
3408 dfield_write_mbr(field, mbr);
3409}
3410
3411/** Sets the values of the dtuple fields in entry from the values of appropriate
3412columns in row.
3413@param[in] index index handler
3414@param[out] entry index entry to make
3415@param[in] row row
3416
3417@return DB_SUCCESS if the set is successful */
3418dberr_t
3419row_ins_index_entry_set_vals(
3420 const dict_index_t* index,
3421 dtuple_t* entry,
3422 const dtuple_t* row)
3423{
3424 ulint n_fields;
3425 ulint i;
3426 ulint num_v = dtuple_get_n_v_fields(entry);
3427
3428 n_fields = dtuple_get_n_fields(entry);
3429
3430 for (i = 0; i < n_fields + num_v; i++) {
3431 dict_field_t* ind_field = NULL;
3432 dfield_t* field;
3433 const dfield_t* row_field;
3434 ulint len;
3435 dict_col_t* col;
3436
3437 if (i >= n_fields) {
3438 /* This is virtual field */
3439 field = dtuple_get_nth_v_field(entry, i - n_fields);
3440 col = &dict_table_get_nth_v_col(
3441 index->table, i - n_fields)->m_col;
3442 } else {
3443 field = dtuple_get_nth_field(entry, i);
3444 ind_field = dict_index_get_nth_field(index, i);
3445 col = ind_field->col;
3446 }
3447
3448 if (col->is_virtual()) {
3449 const dict_v_col_t* v_col
3450 = reinterpret_cast<const dict_v_col_t*>(col);
3451 ut_ad(dtuple_get_n_fields(row)
3452 == dict_table_get_n_cols(index->table));
3453 row_field = dtuple_get_nth_v_field(row, v_col->v_pos);
3454 } else {
3455 row_field = dtuple_get_nth_field(
3456 row, ind_field->col->ind);
3457 }
3458
3459 len = dfield_get_len(row_field);
3460
3461 /* Check column prefix indexes */
3462 if (ind_field != NULL && ind_field->prefix_len > 0
3463 && dfield_get_len(row_field) != UNIV_SQL_NULL) {
3464
3465 const dict_col_t* col
3466 = dict_field_get_col(ind_field);
3467
3468 len = dtype_get_at_most_n_mbchars(
3469 col->prtype, col->mbminlen, col->mbmaxlen,
3470 ind_field->prefix_len,
3471 len,
3472 static_cast<const char*>(
3473 dfield_get_data(row_field)));
3474
3475 ut_ad(!dfield_is_ext(row_field));
3476 }
3477
3478 /* Handle spatial index. For the first field, replace
3479 the data with its MBR (Minimum Bounding Box). */
3480 if ((i == 0) && dict_index_is_spatial(index)) {
3481 if (!row_field->data
3482 || row_field->len < GEO_DATA_HEADER_SIZE) {
3483 return(DB_CANT_CREATE_GEOMETRY_OBJECT);
3484 }
3485 row_ins_spatial_index_entry_set_mbr_field(
3486 field, row_field);
3487 continue;
3488 }
3489
3490 dfield_set_data(field, dfield_get_data(row_field), len);
3491 if (dfield_is_ext(row_field)) {
3492 ut_ad(dict_index_is_clust(index));
3493 dfield_set_ext(field);
3494 }
3495 }
3496
3497 return(DB_SUCCESS);
3498}
3499
3500/***********************************************************//**
3501Inserts a single index entry to the table.
3502@return DB_SUCCESS if operation successfully completed, else error
3503code or DB_LOCK_WAIT */
3504static MY_ATTRIBUTE((nonnull, warn_unused_result))
3505dberr_t
3506row_ins_index_entry_step(
3507/*=====================*/
3508 ins_node_t* node, /*!< in: row insert node */
3509 que_thr_t* thr) /*!< in: query thread */
3510{
3511 dberr_t err;
3512
3513 DBUG_ENTER("row_ins_index_entry_step");
3514
3515 ut_ad(dtuple_check_typed(node->row));
3516
3517 err = row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3518
3519 if (err != DB_SUCCESS) {
3520 DBUG_RETURN(err);
3521 }
3522
3523 ut_ad(dtuple_check_typed(node->entry));
3524
3525 err = row_ins_index_entry(node->index, node->entry, thr);
3526
3527 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3528 "after_row_ins_index_entry_step");
3529
3530 DBUG_RETURN(err);
3531}
3532
3533/***********************************************************//**
3534Allocates a row id for row and inits the node->index field. */
3535UNIV_INLINE
3536void
3537row_ins_alloc_row_id_step(
3538/*======================*/
3539 ins_node_t* node) /*!< in: row insert node */
3540{
3541 row_id_t row_id;
3542
3543 ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3544
3545 if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
3546
3547 /* No row id is stored if the clustered index is unique */
3548
3549 return;
3550 }
3551
3552 /* Fill in row id value to row */
3553
3554 row_id = dict_sys_get_new_row_id();
3555
3556 dict_sys_write_row_id(node->sys_buf, row_id);
3557}
3558
3559/***********************************************************//**
3560Gets a row to insert from the values list. */
3561UNIV_INLINE
3562void
3563row_ins_get_row_from_values(
3564/*========================*/
3565 ins_node_t* node) /*!< in: row insert node */
3566{
3567 que_node_t* list_node;
3568 dfield_t* dfield;
3569 dtuple_t* row;
3570 ulint i;
3571
3572 /* The field values are copied in the buffers of the select node and
3573 it is safe to use them until we fetch from select again: therefore
3574 we can just copy the pointers */
3575
3576 row = node->row;
3577
3578 i = 0;
3579 list_node = node->values_list;
3580
3581 while (list_node) {
3582 eval_exp(list_node);
3583
3584 dfield = dtuple_get_nth_field(row, i);
3585 dfield_copy_data(dfield, que_node_get_val(list_node));
3586
3587 i++;
3588 list_node = que_node_get_next(list_node);
3589 }
3590}
3591
3592/***********************************************************//**
3593Gets a row to insert from the select list. */
3594UNIV_INLINE
3595void
3596row_ins_get_row_from_select(
3597/*========================*/
3598 ins_node_t* node) /*!< in: row insert node */
3599{
3600 que_node_t* list_node;
3601 dfield_t* dfield;
3602 dtuple_t* row;
3603 ulint i;
3604
3605 /* The field values are copied in the buffers of the select node and
3606 it is safe to use them until we fetch from select again: therefore
3607 we can just copy the pointers */
3608
3609 row = node->row;
3610
3611 i = 0;
3612 list_node = node->select->select_list;
3613
3614 while (list_node) {
3615 dfield = dtuple_get_nth_field(row, i);
3616 dfield_copy_data(dfield, que_node_get_val(list_node));
3617
3618 i++;
3619 list_node = que_node_get_next(list_node);
3620 }
3621}
3622
3623/***********************************************************//**
3624Inserts a row to a table.
3625@return DB_SUCCESS if operation successfully completed, else error
3626code or DB_LOCK_WAIT */
3627static MY_ATTRIBUTE((nonnull, warn_unused_result))
3628dberr_t
3629row_ins(
3630/*====*/
3631 ins_node_t* node, /*!< in: row insert node */
3632 que_thr_t* thr) /*!< in: query thread */
3633{
3634 dberr_t err;
3635
3636 DBUG_ENTER("row_ins");
3637
3638 DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name));
3639
3640 if (node->duplicate) {
3641 thr_get_trx(thr)->error_state = DB_DUPLICATE_KEY;
3642 }
3643
3644 if (node->state == INS_NODE_ALLOC_ROW_ID) {
3645
3646 row_ins_alloc_row_id_step(node);
3647
3648 node->index = dict_table_get_first_index(node->table);
3649 node->entry = UT_LIST_GET_FIRST(node->entry_list);
3650
3651 if (node->ins_type == INS_SEARCHED) {
3652
3653 row_ins_get_row_from_select(node);
3654
3655 } else if (node->ins_type == INS_VALUES) {
3656
3657 row_ins_get_row_from_values(node);
3658 }
3659
3660 node->state = INS_NODE_INSERT_ENTRIES;
3661 }
3662
3663 ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3664
3665 while (node->index != NULL) {
3666 if (node->index->type != DICT_FTS) {
3667 err = row_ins_index_entry_step(node, thr);
3668
3669 switch (err) {
3670 case DB_SUCCESS:
3671 break;
3672 case DB_NO_REFERENCED_ROW:
3673 if (!dict_index_is_unique(node->index)) {
3674 DBUG_RETURN(err);
3675 }
3676 /* fall through */
3677 case DB_DUPLICATE_KEY:
3678 ut_ad(dict_index_is_unique(node->index));
3679
3680 if (thr_get_trx(thr)->isolation_level
3681 >= TRX_ISO_REPEATABLE_READ
3682 && thr_get_trx(thr)->duplicates) {
3683
3684 /* When we are in REPLACE statement or
3685 INSERT .. ON DUPLICATE UPDATE
3686 statement, we process all the
3687 unique secondary indexes, even after we
3688 encounter a duplicate error. This is
3689 done to take necessary gap locks in
3690 secondary indexes to block concurrent
3691 transactions from inserting the
3692 searched records. */
3693 if (err == DB_NO_REFERENCED_ROW
3694 && node->duplicate) {
3695 /* A foreign key check on a
3696 unique index may fail to
3697 find the record.
3698
3699 Consider as a example
3700 following:
3701 create table child(a int not null
3702 primary key, b int not null,
3703 c int,
3704 unique key (b),
3705 foreign key (b) references
3706 parent (id)) engine=innodb;
3707
3708 insert into child values
3709 (1,1,2);
3710
3711 insert into child(a) values
3712 (1) on duplicate key update
3713 c = 3;
3714
3715 Now primary key value 1
3716 naturally causes duplicate
3717 key error that will be
3718 stored on node->duplicate.
3719 If there was no duplicate
3720 key error, we should return
3721 the actual no referenced
3722 row error.
3723
3724 As value for
3725 column b used in both unique
3726 key and foreign key is not
3727 provided, server uses 0 as a
3728 search value. This is
3729 naturally, not found leading
3730 to DB_NO_REFERENCED_ROW.
3731 But, we should update the
3732 row with primay key value 1
3733 anyway.
3734
3735 Return the
3736 original DB_DUPLICATE_KEY
3737 error after
3738 placing all gaplocks. */
3739 err = DB_DUPLICATE_KEY;
3740 break;
3741 } else if (!node->duplicate) {
3742 /* Save 1st dup error. Ignore
3743 subsequent dup errors. */
3744 node->duplicate = node->index;
3745 thr_get_trx(thr)->error_state
3746 = DB_DUPLICATE_KEY;
3747 }
3748 break;
3749 }
3750 // fall through
3751 default:
3752 DBUG_RETURN(err);
3753 }
3754 }
3755
3756 if (node->duplicate && node->table->is_temporary()) {
3757 ut_ad(thr_get_trx(thr)->error_state
3758 == DB_DUPLICATE_KEY);
3759 /* For TEMPORARY TABLE, we won't lock anything,
3760 so we can simply break here instead of requiring
3761 GAP locks for other unique secondary indexes,
3762 pretending we have consumed all indexes. */
3763 node->index = NULL;
3764 node->entry = NULL;
3765 break;
3766 }
3767
3768 node->index = dict_table_get_next_index(node->index);
3769 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3770
3771 DBUG_EXECUTE_IF(
3772 "row_ins_skip_sec",
3773 node->index = NULL; node->entry = NULL; break;);
3774
3775 /* Skip corrupted secondary index and its entry */
3776 while (node->index && node->index->is_corrupted()) {
3777 node->index = dict_table_get_next_index(node->index);
3778 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3779 }
3780
3781 /* After encountering a duplicate key error, we process
3782 remaining indexes just to place gap locks and no actual
3783 insertion will take place. These gap locks are needed
3784 only for unique indexes. So skipping non-unique indexes. */
3785 if (node->duplicate) {
3786 while (node->index
3787 && !dict_index_is_unique(node->index)) {
3788
3789 node->index = dict_table_get_next_index(
3790 node->index);
3791 node->entry = UT_LIST_GET_NEXT(tuple_list,
3792 node->entry);
3793 }
3794 thr_get_trx(thr)->error_state = DB_DUPLICATE_KEY;
3795 }
3796 }
3797
3798 ut_ad(node->entry == NULL);
3799
3800 thr_get_trx(thr)->error_info = node->duplicate;
3801 node->state = INS_NODE_ALLOC_ROW_ID;
3802
3803 DBUG_RETURN(node->duplicate ? DB_DUPLICATE_KEY : DB_SUCCESS);
3804}
3805
3806/***********************************************************//**
3807Inserts a row to a table. This is a high-level function used in SQL execution
3808graphs.
3809@return query thread to run next or NULL */
3810que_thr_t*
3811row_ins_step(
3812/*=========*/
3813 que_thr_t* thr) /*!< in: query thread */
3814{
3815 ins_node_t* node;
3816 que_node_t* parent;
3817 sel_node_t* sel_node;
3818 trx_t* trx;
3819 dberr_t err;
3820
3821 ut_ad(thr);
3822
3823 DEBUG_SYNC_C("innodb_row_ins_step_enter");
3824
3825 trx = thr_get_trx(thr);
3826
3827 node = static_cast<ins_node_t*>(thr->run_node);
3828
3829 ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3830
3831 parent = que_node_get_parent(node);
3832 sel_node = node->select;
3833
3834 if (thr->prev_node == parent) {
3835 node->state = INS_NODE_SET_IX_LOCK;
3836 }
3837
3838 /* If this is the first time this node is executed (or when
3839 execution resumes after wait for the table IX lock), set an
3840 IX lock on the table and reset the possible select node. MySQL's
3841 partitioned table code may also call an insert within the same
3842 SQL statement AFTER it has used this table handle to do a search.
3843 This happens, for example, when a row update moves it to another
3844 partition. In that case, we have already set the IX lock on the
3845 table during the search operation, and there is no need to set
3846 it again here. But we must write trx->id to node->sys_buf. */
3847
3848 if (node->table->no_rollback()) {
3849 /* No-rollback tables should only be written to by a
3850 single thread at a time, but there can be multiple
3851 concurrent readers. We must hold an open table handle. */
3852 DBUG_ASSERT(node->table->n_ref_count > 0);
3853 DBUG_ASSERT(node->ins_type == INS_DIRECT);
3854 /* No-rollback tables can consist only of a single index. */
3855 DBUG_ASSERT(UT_LIST_GET_LEN(node->entry_list) == 1);
3856 DBUG_ASSERT(UT_LIST_GET_LEN(node->table->indexes) == 1);
3857 /* There should be no possibility for interruption and
3858 restarting here. In theory, we could allow resumption
3859 from the INS_NODE_INSERT_ENTRIES state here. */
3860 DBUG_ASSERT(node->state == INS_NODE_SET_IX_LOCK);
3861 node->index = dict_table_get_first_index(node->table);
3862 node->entry = UT_LIST_GET_FIRST(node->entry_list);
3863 node->state = INS_NODE_INSERT_ENTRIES;
3864 goto do_insert;
3865 }
3866
3867 if (UNIV_LIKELY(!node->table->skip_alter_undo)) {
3868 trx_write_trx_id(&node->sys_buf[DATA_ROW_ID_LEN], trx->id);
3869 }
3870
3871 if (node->state == INS_NODE_SET_IX_LOCK) {
3872
3873 node->state = INS_NODE_ALLOC_ROW_ID;
3874
3875 /* It may be that the current session has not yet started
3876 its transaction, or it has been committed: */
3877
3878 if (trx->id == node->trx_id) {
3879 /* No need to do IX-locking */
3880
3881 goto same_trx;
3882 }
3883
3884 err = lock_table(0, node->table, LOCK_IX, thr);
3885
3886 DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
3887 err = DB_LOCK_WAIT;);
3888
3889 if (err != DB_SUCCESS) {
3890
3891 goto error_handling;
3892 }
3893
3894 node->trx_id = trx->id;
3895same_trx:
3896 if (node->ins_type == INS_SEARCHED) {
3897 /* Reset the cursor */
3898 sel_node->state = SEL_NODE_OPEN;
3899
3900 /* Fetch a row to insert */
3901
3902 thr->run_node = sel_node;
3903
3904 return(thr);
3905 }
3906 }
3907
3908 if ((node->ins_type == INS_SEARCHED)
3909 && (sel_node->state != SEL_NODE_FETCH)) {
3910
3911 ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3912
3913 /* No more rows to insert */
3914 thr->run_node = parent;
3915
3916 return(thr);
3917 }
3918do_insert:
3919 /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3920
3921 err = row_ins(node, thr);
3922
3923error_handling:
3924 trx->error_state = err;
3925
3926 if (err != DB_SUCCESS) {
3927 /* err == DB_LOCK_WAIT or SQL error detected */
3928 return(NULL);
3929 }
3930
3931 /* DO THE TRIGGER ACTIONS HERE */
3932
3933 if (node->ins_type == INS_SEARCHED) {
3934 /* Fetch a row to insert */
3935
3936 thr->run_node = sel_node;
3937 } else {
3938 thr->run_node = que_node_get_parent(node);
3939 }
3940
3941 return(thr);
3942}
3943