1/*****************************************************************************
2
3Copyright (c) 2016, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2018, MariaDB Corporation.
5
6This program is free software; you can redistribute it and/or modify it under
7the terms of the GNU General Public License as published by the Free Software
8Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful, but WITHOUT
11ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License along with
15this program; if not, write to the Free Software Foundation, Inc.,
1651 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
17
18*****************************************************************************/
19
20/**************************************************//**
21@file gis/gis0rtree.cc
22InnoDB R-tree interfaces
23
24Created 2013/03/27 Allen Lai and Jimmy Yang
25***********************************************************************/
26
27#include "fsp0fsp.h"
28#include "page0page.h"
29#include "page0cur.h"
30#include "page0zip.h"
31#include "gis0rtree.h"
32#include "btr0cur.h"
33#include "btr0sea.h"
34#include "btr0pcur.h"
35#include "rem0cmp.h"
36#include "lock0lock.h"
37#include "ibuf0ibuf.h"
38#include "trx0undo.h"
39#include "srv0mon.h"
40#include "gis0geo.h"
41
/*************************************************************//**
Initial split nodes info for R-tree split.
Collects the MBR of every record currently on the page, plus the MBR of
the tuple to be inserted, into one rtr_split_node_t array which the split
algorithm will later partition into two groups.
@return initialized split nodes array */
static
rtr_split_node_t*
rtr_page_split_initialize_nodes(
/*============================*/
	mem_heap_t*	heap,	/*!< in: pointer to memory heap, or NULL */
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
				function returns, the cursor is positioned
				on the predecessor of the inserted record */
	ulint**		offsets,/*!< in: offsets on inserted record */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	double**	buf_pos)/*!< in/out: current buffer position */
{
	rtr_split_node_t*	split_node_array;
	double*			buf;
	ulint			n_recs;
	rtr_split_node_t*	task;
	rtr_split_node_t*	stop;
	rtr_split_node_t*	cur;
	rec_t*			rec;
	buf_block_t*		block;
	page_t*			page;
	ulint			n_uniq;
	ulint			len;
	byte*			source_cur;

	block = btr_cur_get_block(cursor);
	page = buf_block_get_frame(block);
	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);

	/* One split node per record already on the page, plus one for
	the new tuple that is about to be inserted. */
	n_recs = ulint(page_get_n_recs(page)) + 1;

	/*We reserve 2 MBRs memory space for temp result of split
	algrithm. And plus the new mbr that need to insert, we
	need (n_recs + 3)*MBR size for storing all MBRs.*/
	buf = static_cast<double*>(mem_heap_alloc(
		heap, DATA_MBR_LEN * (n_recs + 3)
		+ sizeof(rtr_split_node_t) * (n_recs + 1)));

	/* The split node array lives in the same allocation, directly
	after the (n_recs + 3) MBR coordinate slots. */
	split_node_array = (rtr_split_node_t*)(buf + SPDIMS * 2 * (n_recs + 3));
	task = split_node_array;
	*buf_pos = buf;
	stop = task + n_recs;

	rec = page_rec_get_next(page_get_infimum_rec(page));
	const bool is_leaf = page_is_leaf(page);
	*offsets = rec_get_offsets(rec, cursor->index, *offsets, is_leaf,
				   n_uniq, &heap);

	/* Field 0 of an R-tree record is the stored MBR. */
	source_cur = rec_get_nth_field(rec, *offsets, 0, &len);

	/* Copy each on-page record's MBR into its split node.  The loop
	fills the first n_recs - 1 slots (all existing records); the last
	slot is reserved for the tuple being inserted, handled below. */
	for (cur = task; cur < stop - 1; ++cur) {
		cur->coords = reserve_coords(buf_pos, SPDIMS);
		cur->key = rec;

		memcpy(cur->coords, source_cur, DATA_MBR_LEN);

		rec = page_rec_get_next(rec);
		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
					   is_leaf, n_uniq, &heap);
		source_cur = rec_get_nth_field(rec, *offsets, 0, &len);
	}

	/* Put the insert key to node list */
	source_cur = static_cast<byte*>(dfield_get_data(
		dtuple_get_nth_field(tuple, 0)));
	cur->coords = reserve_coords(buf_pos, SPDIMS);
	/* Convert the tuple into physical record format so the last
	split node can reference it just like the copied records. */
	rec = (byte*) mem_heap_alloc(
		heap, rec_get_converted_size(cursor->index, tuple, 0));

	rec = rec_convert_dtuple_to_rec(rec, cursor->index, tuple, 0);
	cur->key = rec;

	memcpy(cur->coords, source_cur, DATA_MBR_LEN);

	return split_node_array;
}
121
122/**********************************************************************//**
123Builds a Rtree node pointer out of a physical record and a page number.
124Note: For Rtree, we just keep the mbr and page no field in non-leaf level
125page. It's different with Btree, Btree still keeps PK fields so far.
126@return own: node pointer */
127dtuple_t*
128rtr_index_build_node_ptr(
129/*=====================*/
130 const dict_index_t* index, /*!< in: index */
131 const rtr_mbr_t* mbr, /*!< in: mbr of lower page */
132 const rec_t* rec, /*!< in: record for which to build node
133 pointer */
134 ulint page_no,/*!< in: page number to put in node
135 pointer */
136 mem_heap_t* heap) /*!< in: memory heap where pointer
137 created */
138{
139 dtuple_t* tuple;
140 dfield_t* field;
141 byte* buf;
142 ulint n_unique;
143 ulint info_bits;
144
145 ut_ad(dict_index_is_spatial(index));
146
147 n_unique = DICT_INDEX_SPATIAL_NODEPTR_SIZE;
148
149 tuple = dtuple_create(heap, n_unique + 1);
150
151 /* For rtree internal node, we need to compare page number
152 fields. */
153 dtuple_set_n_fields_cmp(tuple, n_unique + 1);
154
155 dict_index_copy_types(tuple, index, n_unique);
156
157 /* Write page no field */
158 buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
159
160 mach_write_to_4(buf, page_no);
161
162 field = dtuple_get_nth_field(tuple, n_unique);
163 dfield_set_data(field, buf, 4);
164
165 dtype_set(dfield_get_type(field), DATA_SYS_CHILD, DATA_NOT_NULL, 4);
166
167 /* Set info bits. */
168 info_bits = rec_get_info_bits(rec, dict_table_is_comp(index->table));
169 dtuple_set_info_bits(tuple, info_bits | REC_STATUS_NODE_PTR);
170
171 /* Set mbr as index entry data */
172 field = dtuple_get_nth_field(tuple, 0);
173
174 buf = static_cast<byte*>(mem_heap_alloc(heap, DATA_MBR_LEN));
175
176 rtr_write_mbr(buf, mbr);
177
178 dfield_set_data(field, buf, DATA_MBR_LEN);
179
180 ut_ad(dtuple_check_typed(tuple));
181
182 return(tuple);
183}
184
/**************************************************************//**
In-place update the mbr field of a spatial index row.
Overwrites field 0 of the record with the new MBR and writes a redo log
record for the change, formatted as a regular in-place update.
@return true if update is successful */
static
bool
rtr_update_mbr_field_in_place(
/*==========================*/
	dict_index_t*	index,		/*!< in: spatial index. */
	rec_t*		rec,		/*!< in/out: rec to be modified.*/
	ulint*		offsets,	/*!< in/out: offsets on rec. */
	rtr_mbr_t*	mbr,		/*!< in: the new mbr. */
	mtr_t*		mtr)		/*!< in: mtr */
{
	void*		new_mbr_ptr;
	double		new_mbr[SPDIMS * 2];
	byte*		log_ptr;
	page_t*		page = page_align(rec);
	ulint		len = DATA_MBR_LEN;
	/* Node-pointer maintenance: no undo logging, no locking, and
	system fields are left untouched. */
	ulint		flags = BTR_NO_UNDO_LOG_FLAG
				| BTR_NO_LOCKING_FLAG
				| BTR_KEEP_SYS_FLAG;
	ulint		rec_info;

	/* Serialize the MBR into record format, then overwrite field 0. */
	rtr_write_mbr(reinterpret_cast<byte*>(&new_mbr), mbr);
	new_mbr_ptr = static_cast<void*>(new_mbr);
	/* Otherwise, set the mbr to the new_mbr. */
	rec_set_nth_field(rec, offsets, 0, new_mbr_ptr, len);

	rec_info = rec_get_info_bits(rec, rec_offs_comp(offsets));

	/* Write redo log. */
	/* For now, we use LOG_REC_UPDATE_IN_PLACE to log this enlarge.
	In the future, we may need to add a new log type for this. */
	log_ptr = mlog_open_and_write_index(mtr, rec, index, page_is_comp(page)
					    ? MLOG_COMP_REC_UPDATE_IN_PLACE
					    : MLOG_REC_UPDATE_IN_PLACE,
					    1 + DATA_ROLL_PTR_LEN + 14 + 2
					    + MLOG_BUF_MARGIN);

	if (!log_ptr) {
		/* Logging in mtr is switched off during
		crash recovery */
		return(false);
	}

	/* NOTE(review): the byte layout below presumably must stay in
	sync with the recovery-time parser of MLOG_REC_UPDATE_IN_PLACE;
	verify against the update-in-place log writer before changing. */
	/* Flags */
	mach_write_to_1(log_ptr, flags);
	log_ptr++;
	/* TRX_ID Position */
	log_ptr += mach_write_compressed(log_ptr, 0);
	/* ROLL_PTR */
	trx_write_roll_ptr(log_ptr, 0);
	log_ptr += DATA_ROLL_PTR_LEN;
	/* TRX_ID */
	log_ptr += mach_u64_write_compressed(log_ptr, 0);

	/* Offset */
	mach_write_to_2(log_ptr, page_offset(rec));
	log_ptr += 2;
	/* Info bits */
	mach_write_to_1(log_ptr, rec_info);
	log_ptr++;
	/* N fields */
	log_ptr += mach_write_compressed(log_ptr, 1);
	/* Field no, len */
	log_ptr += mach_write_compressed(log_ptr, 0);
	log_ptr += mach_write_compressed(log_ptr, len);
	/* Data */
	memcpy(log_ptr, new_mbr_ptr, len);
	log_ptr += len;

	mlog_close(mtr, log_ptr);

	return(true);
}
260
261/**************************************************************//**
262Update the mbr field of a spatial index row.
263@return true if update is successful */
264bool
265rtr_update_mbr_field(
266/*=================*/
267 btr_cur_t* cursor, /*!< in/out: cursor pointed to rec.*/
268 ulint* offsets, /*!< in/out: offsets on rec. */
269 btr_cur_t* cursor2, /*!< in/out: cursor pointed to rec
270 that should be deleted.
271 this cursor is for btr_compress to
272 delete the merged page's father rec.*/
273 page_t* child_page, /*!< in: child page. */
274 rtr_mbr_t* mbr, /*!< in: the new mbr. */
275 rec_t* new_rec, /*!< in: rec to use */
276 mtr_t* mtr) /*!< in: mtr */
277{
278 dict_index_t* index = cursor->index;
279 mem_heap_t* heap;
280 page_t* page;
281 rec_t* rec;
282 ulint flags = BTR_NO_UNDO_LOG_FLAG
283 | BTR_NO_LOCKING_FLAG
284 | BTR_KEEP_SYS_FLAG;
285 dberr_t err;
286 big_rec_t* dummy_big_rec;
287 buf_block_t* block;
288 rec_t* child_rec;
289 ulint up_match = 0;
290 ulint low_match = 0;
291 ulint child;
292 ulint rec_info;
293 page_zip_des_t* page_zip;
294 bool ins_suc = true;
295 ulint cur2_pos = 0;
296 ulint del_page_no = 0;
297 ulint* offsets2;
298
299 rec = btr_cur_get_rec(cursor);
300 page = page_align(rec);
301
302 rec_info = rec_get_info_bits(rec, rec_offs_comp(offsets));
303
304 heap = mem_heap_create(100);
305 block = btr_cur_get_block(cursor);
306 ut_ad(page == buf_block_get_frame(block));
307 page_zip = buf_block_get_page_zip(block);
308
309 child = btr_node_ptr_get_child_page_no(rec, offsets);
310 const bool is_leaf = page_is_leaf(block->frame);
311
312 if (new_rec) {
313 child_rec = new_rec;
314 } else {
315 child_rec = page_rec_get_next(page_get_infimum_rec(child_page));
316 }
317
318 dtuple_t* node_ptr = rtr_index_build_node_ptr(
319 index, mbr, child_rec, child, heap);
320
321 /* We need to remember the child page no of cursor2, since page could be
322 reorganized or insert a new rec before it. */
323 if (cursor2) {
324 rec_t* del_rec = btr_cur_get_rec(cursor2);
325 offsets2 = rec_get_offsets(btr_cur_get_rec(cursor2),
326 index, NULL, false,
327 ULINT_UNDEFINED, &heap);
328 del_page_no = btr_node_ptr_get_child_page_no(del_rec, offsets2);
329 cur2_pos = page_rec_get_n_recs_before(btr_cur_get_rec(cursor2));
330 }
331
332 if (rec_info & REC_INFO_MIN_REC_FLAG) {
333 /* When the rec is minimal rec in this level, we do
334 in-place update for avoiding it move to other place. */
335
336 if (page_zip) {
337 /* Check if there's enough space for in-place
338 update the zip page. */
339 if (!btr_cur_update_alloc_zip(
340 page_zip,
341 btr_cur_get_page_cur(cursor),
342 index, offsets,
343 rec_offs_size(offsets),
344 false, mtr)) {
345
346 /* If there's not enought space for
347 inplace update zip page, we do delete
348 insert. */
349 ins_suc = false;
350
351 /* Since btr_cur_update_alloc_zip could
352 reorganize the page, we need to repositon
353 cursor2. */
354 if (cursor2) {
355 cursor2->page_cur.rec =
356 page_rec_get_nth(page,
357 cur2_pos);
358 }
359
360 goto update_mbr;
361 }
362
363 /* Record could be repositioned */
364 rec = btr_cur_get_rec(cursor);
365
366#ifdef UNIV_DEBUG
367 /* Make sure it is still the first record */
368 rec_info = rec_get_info_bits(
369 rec, rec_offs_comp(offsets));
370 ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
371#endif /* UNIV_DEBUG */
372 }
373
374 if (!rtr_update_mbr_field_in_place(index, rec,
375 offsets, mbr, mtr)) {
376 return(false);
377 }
378
379 if (page_zip) {
380 page_zip_write_rec(page_zip, rec, index, offsets, 0);
381 }
382
383 if (cursor2) {
384 ulint* offsets2;
385
386 if (page_zip) {
387 cursor2->page_cur.rec
388 = page_rec_get_nth(page, cur2_pos);
389 }
390 offsets2 = rec_get_offsets(btr_cur_get_rec(cursor2),
391 index, NULL, false,
392 ULINT_UNDEFINED, &heap);
393 ut_ad(del_page_no == btr_node_ptr_get_child_page_no(
394 cursor2->page_cur.rec,
395 offsets2));
396
397 page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
398 index, offsets2, mtr);
399 }
400 } else if (page_get_n_recs(page) == 1) {
401 /* When there's only one rec in the page, we do insert/delete to
402 avoid page merge. */
403
404 page_cur_t page_cur;
405 rec_t* insert_rec;
406 ulint* insert_offsets = NULL;
407 ulint old_pos;
408 rec_t* old_rec;
409
410 ut_ad(cursor2 == NULL);
411
412 /* Insert the new mbr rec. */
413 old_pos = page_rec_get_n_recs_before(rec);
414
415 err = btr_cur_optimistic_insert(
416 flags,
417 cursor, &insert_offsets, &heap,
418 node_ptr, &insert_rec, &dummy_big_rec, 0, NULL, mtr);
419
420 ut_ad(err == DB_SUCCESS);
421
422 btr_cur_position(index, insert_rec, block, cursor);
423
424 /* Delete the old mbr rec. */
425 old_rec = page_rec_get_nth(page, old_pos);
426 ut_ad(old_rec != insert_rec);
427
428 page_cur_position(old_rec, block, &page_cur);
429 offsets2 = rec_get_offsets(old_rec, index, NULL, is_leaf,
430 ULINT_UNDEFINED, &heap);
431 page_cur_delete_rec(&page_cur, index, offsets2, mtr);
432
433 } else {
434update_mbr:
435 /* When there're not only 1 rec in the page, we do delete/insert
436 to avoid page split. */
437 rec_t* insert_rec;
438 ulint* insert_offsets = NULL;
439 rec_t* next_rec;
440
441 /* Delete the rec which cursor point to. */
442 next_rec = page_rec_get_next(rec);
443 page_cur_delete_rec(btr_cur_get_page_cur(cursor),
444 index, offsets, mtr);
445 if (!ins_suc) {
446 ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
447
448 btr_set_min_rec_mark(next_rec, mtr);
449 }
450
451 /* If there's more than 1 rec left in the page, delete
452 the rec which cursor2 point to. Otherwise, delete it later.*/
453 if (cursor2 && page_get_n_recs(page) > 1) {
454 ulint cur2_rec_info;
455 rec_t* cur2_rec;
456
457 cur2_rec = cursor2->page_cur.rec;
458 offsets2 = rec_get_offsets(cur2_rec, index, NULL,
459 is_leaf,
460 ULINT_UNDEFINED, &heap);
461
462 cur2_rec_info = rec_get_info_bits(cur2_rec,
463 rec_offs_comp(offsets2));
464 if (cur2_rec_info & REC_INFO_MIN_REC_FLAG) {
465 /* If we delete the leftmost node
466 pointer on a non-leaf level, we must
467 mark the new leftmost node pointer as
468 the predefined minimum record */
469 rec_t* next_rec = page_rec_get_next(cur2_rec);
470 btr_set_min_rec_mark(next_rec, mtr);
471 }
472
473 ut_ad(del_page_no
474 == btr_node_ptr_get_child_page_no(cur2_rec,
475 offsets2));
476 page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
477 index, offsets2, mtr);
478 cursor2 = NULL;
479 }
480
481 /* Insert the new rec. */
482 page_cur_search_with_match(block, index, node_ptr,
483 PAGE_CUR_LE , &up_match, &low_match,
484 btr_cur_get_page_cur(cursor), NULL);
485
486 err = btr_cur_optimistic_insert(flags, cursor, &insert_offsets,
487 &heap, node_ptr, &insert_rec,
488 &dummy_big_rec, 0, NULL, mtr);
489
490 if (!ins_suc && err == DB_SUCCESS) {
491 ins_suc = true;
492 }
493
494 /* If optimistic insert fail, try reorganize the page
495 and insert again. */
496 if (err != DB_SUCCESS && ins_suc) {
497 btr_page_reorganize(btr_cur_get_page_cur(cursor),
498 index, mtr);
499
500 err = btr_cur_optimistic_insert(flags,
501 cursor,
502 &insert_offsets,
503 &heap,
504 node_ptr,
505 &insert_rec,
506 &dummy_big_rec,
507 0, NULL, mtr);
508
509 /* Will do pessimistic insert */
510 if (err != DB_SUCCESS) {
511 ins_suc = false;
512 }
513 }
514
515 /* Insert succeed, position cursor the inserted rec.*/
516 if (ins_suc) {
517 btr_cur_position(index, insert_rec, block, cursor);
518 offsets = rec_get_offsets(insert_rec,
519 index, offsets, is_leaf,
520 ULINT_UNDEFINED, &heap);
521 }
522
523 /* Delete the rec which cursor2 point to. */
524 if (cursor2) {
525 ulint cur2_pno;
526 rec_t* cur2_rec;
527
528 cursor2->page_cur.rec = page_rec_get_nth(page,
529 cur2_pos);
530
531 cur2_rec = btr_cur_get_rec(cursor2);
532
533 offsets2 = rec_get_offsets(cur2_rec, index, NULL,
534 is_leaf,
535 ULINT_UNDEFINED, &heap);
536
537 /* If the cursor2 position is on a wrong rec, we
538 need to reposition it. */
539 cur2_pno = btr_node_ptr_get_child_page_no(cur2_rec, offsets2);
540 if ((del_page_no != cur2_pno)
541 || (cur2_rec == insert_rec)) {
542 cur2_rec = page_rec_get_next(
543 page_get_infimum_rec(page));
544
545 while (!page_rec_is_supremum(cur2_rec)) {
546 offsets2 = rec_get_offsets(cur2_rec, index,
547 NULL,
548 is_leaf,
549 ULINT_UNDEFINED,
550 &heap);
551 cur2_pno = btr_node_ptr_get_child_page_no(
552 cur2_rec, offsets2);
553 if (cur2_pno == del_page_no) {
554 if (insert_rec != cur2_rec) {
555 cursor2->page_cur.rec =
556 cur2_rec;
557 break;
558 }
559 }
560 cur2_rec = page_rec_get_next(cur2_rec);
561 }
562
563 ut_ad(!page_rec_is_supremum(cur2_rec));
564 }
565
566 rec_info = rec_get_info_bits(cur2_rec,
567 rec_offs_comp(offsets2));
568 if (rec_info & REC_INFO_MIN_REC_FLAG) {
569 /* If we delete the leftmost node
570 pointer on a non-leaf level, we must
571 mark the new leftmost node pointer as
572 the predefined minimum record */
573 rec_t* next_rec = page_rec_get_next(cur2_rec);
574 btr_set_min_rec_mark(next_rec, mtr);
575 }
576
577 ut_ad(cur2_pno == del_page_no && cur2_rec != insert_rec);
578
579 page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
580 index, offsets2, mtr);
581 }
582
583 if (!ins_suc) {
584 mem_heap_t* new_heap = NULL;
585
586 err = btr_cur_pessimistic_insert(
587 flags,
588 cursor, &insert_offsets, &new_heap,
589 node_ptr, &insert_rec, &dummy_big_rec,
590 0, NULL, mtr);
591
592 ut_ad(err == DB_SUCCESS);
593
594 if (new_heap) {
595 mem_heap_free(new_heap);
596 }
597
598 }
599
600 if (cursor2) {
601 btr_cur_compress_if_useful(cursor, FALSE, mtr);
602 }
603 }
604
605#ifdef UNIV_DEBUG
606 ulint left_page_no = btr_page_get_prev(page, mtr);
607
608 if (left_page_no == FIL_NULL) {
609
610 ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
611 page_rec_get_next(page_get_infimum_rec(page)),
612 page_is_comp(page)));
613 }
614#endif /* UNIV_DEBUG */
615
616 mem_heap_free(heap);
617
618 return(true);
619}
620
/**************************************************************//**
Update parent page's MBR and Predicate lock information during a split.
The father record of the split page gets the shrunken MBR of the old
page; a new node pointer for the new half page is inserted next to it,
predicate locks are propagated, and the sibling links on this level are
rewired to include the new page. */
static MY_ATTRIBUTE((nonnull))
void
rtr_adjust_upper_level(
/*===================*/
	btr_cur_t*	sea_cur,	/*!< in: search cursor */
	ulint		flags,		/*!< in: undo logging and
					locking flags */
	buf_block_t*	block,		/*!< in/out: page to be split */
	buf_block_t*	new_block,	/*!< in/out: the new half page */
	rtr_mbr_t*	mbr,		/*!< in: MBR on the old page */
	rtr_mbr_t*	new_mbr,	/*!< in: MBR on the new page */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		page;
	page_t*		new_page;
	ulint		page_no;
	ulint		new_page_no;
	page_zip_des_t*	page_zip;
	page_zip_des_t*	new_page_zip;
	dict_index_t*	index = sea_cur->index;
	btr_cur_t	cursor;
	ulint*		offsets;
	mem_heap_t*	heap;
	ulint		level;
	dtuple_t*	node_ptr_upper;
	ulint		prev_page_no;
	ulint		next_page_no;
	ulint		space;
	page_cur_t*	page_cursor;
	lock_prdt_t	prdt;
	lock_prdt_t	new_prdt;
	dberr_t		err;
	big_rec_t*	dummy_big_rec;
	rec_t*		rec;

	/* Create a memory heap where the data tuple is stored */
	heap = mem_heap_create(1024);
	memset(&cursor, 0, sizeof(cursor));

	cursor.thr = sea_cur->thr;

	/* Get the level of the split pages */
	level = btr_page_get_level(buf_block_get_frame(block));
	ut_ad(level == btr_page_get_level(buf_block_get_frame(new_block)));

	page = buf_block_get_frame(block);
	page_no = block->page.id.page_no();
	page_zip = buf_block_get_page_zip(block);

	new_page = buf_block_get_frame(new_block);
	new_page_no = new_block->page.id.page_no();
	new_page_zip = buf_block_get_page_zip(new_block);

	/* Set new mbr for the old page on the upper level. */
	/* Look up the index for the node pointer to page */
	offsets = rtr_page_get_father_block(
		NULL, heap, index, block, mtr, sea_cur, &cursor);

	page_cursor = btr_cur_get_page_cur(&cursor);

	rtr_update_mbr_field(&cursor, offsets, NULL, page, mbr, NULL, mtr);

	/* Already updated parent MBR, reset in our path */
	if (sea_cur->rtr_info) {
		node_visit_t*	node_visit = rtr_get_parent_node(
						sea_cur, level + 1, true);
		if (node_visit) {
			node_visit->mbr_inc = 0;
		}
	}

	/* Insert the node for the new page. */
	node_ptr_upper = rtr_index_build_node_ptr(
		index, new_mbr,
		page_rec_get_next(page_get_infimum_rec(new_page)),
		new_page_no, heap);

	ulint	up_match = 0;
	ulint	low_match = 0;

	buf_block_t*	father_block = btr_cur_get_block(&cursor);

	/* Position the cursor in the father page for the insert. */
	page_cur_search_with_match(
		father_block, index, node_ptr_upper,
		PAGE_CUR_LE , &up_match, &low_match,
		btr_cur_get_page_cur(&cursor), NULL);

	err = btr_cur_optimistic_insert(
		flags
		| BTR_NO_LOCKING_FLAG
		| BTR_KEEP_SYS_FLAG
		| BTR_NO_UNDO_LOG_FLAG,
		&cursor, &offsets, &heap,
		node_ptr_upper, &rec, &dummy_big_rec, 0, NULL, mtr);

	if (err == DB_FAIL) {
		/* Not enough space in the father page: fall back to a
		pessimistic insert, borrowing the search cursor's rtr
		state for the duration of the call. */
		cursor.rtr_info = sea_cur->rtr_info;
		cursor.tree_height = sea_cur->tree_height;

		err = btr_cur_pessimistic_insert(flags
						 | BTR_NO_LOCKING_FLAG
						 | BTR_KEEP_SYS_FLAG
						 | BTR_NO_UNDO_LOG_FLAG,
						 &cursor, &offsets, &heap,
						 node_ptr_upper, &rec,
						 &dummy_big_rec, 0, NULL, mtr);
		cursor.rtr_info = NULL;
		ut_a(err == DB_SUCCESS);
	}

	/* Propagate predicate locks from the old page to the new one. */
	prdt.data = static_cast<void*>(mbr);
	prdt.op = 0;
	new_prdt.data = static_cast<void*>(new_mbr);
	new_prdt.op = 0;

	lock_prdt_update_parent(block, new_block, &prdt, &new_prdt,
				index->table->space->id,
				page_cursor->block->page.id.page_no());

	mem_heap_free(heap);

	/* Get the previous and next pages of page */
	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);
	space = block->page.id.space();
	const page_size_t&	page_size = dict_table_page_size(index->table);

	/* Update page links of the level: the new page is chained in
	between the old page and its former successor. */
	if (prev_page_no != FIL_NULL) {
		page_id_t	prev_page_id(space, prev_page_no);

		buf_block_t*	prev_block = btr_block_get(
			prev_page_id, page_size, RW_X_LATCH, index, mtr);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_block->frame, mtr)
		     == block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_next(buf_block_get_frame(prev_block),
				  buf_block_get_page_zip(prev_block),
				  page_no, mtr);
	}

	if (next_page_no != FIL_NULL) {
		page_id_t	next_page_id(space, next_page_no);

		buf_block_t*	next_block = btr_block_get(
			next_page_id, page_size, RW_X_LATCH, index, mtr);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_block->frame, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_prev(buf_block_get_frame(next_block),
				  buf_block_get_page_zip(next_block),
				  new_page_no, mtr);
	}

	btr_page_set_prev(page, page_zip, prev_page_no, mtr);
	btr_page_set_next(page, page_zip, new_page_no, mtr);

	btr_page_set_prev(new_page, new_page_zip, page_no, mtr);
	btr_page_set_next(new_page, new_page_zip, next_page_no, mtr);
}
789
/*************************************************************//**
Moves record list to another page for rtree splitting.

Copies every record whose split-node group differs from
first_rec_group into new_block, migrates the associated locks,
recompresses new_block if it is a compressed page, and finally deletes
the moved records from the old page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return TRUE on success; FALSE on compression failure */
static
ibool
rtr_split_page_move_rec_list(
/*=========================*/
	rtr_split_node_t*	node_array,	/*!< in: split node array. */
	int			first_rec_group,/*!< in: group number of the
						first rec. */
	buf_block_t*		new_block,	/*!< in/out: index page
						where to move */
	buf_block_t*		block,		/*!< in/out: page containing
						split_rec */
	rec_t*			first_rec,	/*!< in: first record not to
						move */
	dict_index_t*		index,		/*!< in: record descriptor */
	mem_heap_t*		heap,		/*!< in: pointer to memory
						heap, or NULL */
	mtr_t*			mtr)		/*!< in: mtr */
{
	rtr_split_node_t*	cur_split_node;
	rtr_split_node_t*	end_split_node;
	page_cur_t		page_cursor;
	page_cur_t		new_page_cursor;
	page_t*			page;
	page_t*			new_page;
	ulint			offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*			offsets		= offsets_;
	page_zip_des_t*		new_page_zip
		= buf_block_get_page_zip(new_block);
	rec_t*			rec;
	rec_t*			ret;
	ulint			moved		= 0;
	ulint			max_to_move	= 0;
	rtr_rec_move_t*		rec_move	= NULL;

	ut_ad(!dict_index_is_ibuf(index));
	ut_ad(dict_index_is_spatial(index));

	rec_offs_init(offsets_);

	page_cur_set_before_first(block, &page_cursor);
	page_cur_set_before_first(new_block, &new_page_cursor);

	page = buf_block_get_frame(block);
	new_page = buf_block_get_frame(new_block);
	/* Remember the insertion point on the new page; needed below to
	re-seek after a possible page reorganization. */
	ret = page_rec_get_prev(page_get_supremum_rec(new_page));

	end_split_node = node_array + page_get_n_recs(page);

	mtr_log_t	log_mode = MTR_LOG_NONE;

	if (new_page_zip) {
		/* For a compressed page the copying is done without
		logging; the compression/reorganization below will be
		logged instead. */
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}

	max_to_move = page_get_n_recs(
				buf_block_get_frame(block));
	/* Bookkeeping array for migrating the lock information of the
	moved records. */
	rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
			heap,
			sizeof (*rec_move) * max_to_move));
	const bool is_leaf = page_is_leaf(page);

	/* Insert the recs in group 2 to new page. */
	for (cur_split_node = node_array;
	     cur_split_node < end_split_node; ++cur_split_node) {
		if (cur_split_node->n_node != first_rec_group) {
			lock_rec_store_on_page_infimum(
				block, cur_split_node->key);

			offsets = rec_get_offsets(cur_split_node->key,
						  index, offsets, is_leaf,
						  ULINT_UNDEFINED, &heap);

			ut_ad(!is_leaf || cur_split_node->key != first_rec);

			rec = page_cur_insert_rec_low(
					page_cur_get_rec(&new_page_cursor),
					index,
					cur_split_node->key,
					offsets,
					mtr);

			ut_a(rec);

			lock_rec_restore_from_page_infimum(
				new_block, rec, block);

			page_cur_move_to_next(&new_page_cursor);

			/* Record the old/new record pair for the lock
			migration done after all copies succeed. */
			rec_move[moved].new_rec = rec;
			rec_move[moved].old_rec = cur_split_node->key;
			rec_move[moved].moved = false;
			moved++;

			if (moved > max_to_move) {
				ut_ad(0);
				break;
			}
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it not required
	for MVCC. */
	if (is_leaf && !index->table->is_temporary()) {
		page_update_max_trx_id(new_block, NULL,
				       page_get_max_trx_id(page),
				       mtr);
	}

	if (new_page_zip) {
		mtr_set_log_mode(mtr, log_mode);

		if (!page_zip_compress(new_page_zip, new_page, index,
				       page_zip_level, NULL, mtr)) {
			ulint	ret_pos;

			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ret_pos = page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the predecessor
			of the predefined supremum record.  If it was
			the predefined infimum record, then it would
			still be the infimum, and we would have
			ret_pos == 0. */

			if (UNIV_UNLIKELY
			    (!page_zip_reorganize(new_block, index, mtr))) {

				/* Reorganization failed too: restore the
				uncompressed frame and report failure. */
				if (UNIV_UNLIKELY
				    (!page_zip_decompress(new_page_zip,
							  new_page, FALSE))) {
					ut_error;
				}
#ifdef UNIV_GIS_DEBUG
				ut_ad(page_validate(new_page, index));
#endif

				return(false);
			}

			/* The page was reorganized: Seek to ret_pos. */
			ret = page_rec_get_nth(new_page, ret_pos);
		}
	}

	/* Update the lock table */
	lock_rtr_move_rec_list(new_block, block, rec_move, moved);

	/* Delete recs in second group from the old page. */
	for (cur_split_node = node_array;
	     cur_split_node < end_split_node; ++cur_split_node) {
		if (cur_split_node->n_node != first_rec_group) {
			page_cur_position(cur_split_node->key,
					  block, &page_cursor);
			offsets = rec_get_offsets(
				page_cur_get_rec(&page_cursor), index,
				offsets, is_leaf, ULINT_UNDEFINED,
				&heap);
			page_cur_delete_rec(&page_cursor,
				index, offsets, mtr);
		}
	}

	return(true);
}
968
969/*************************************************************//**
970Splits an R-tree index page to halves and inserts the tuple. It is assumed
971that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
972released within this function! NOTE that the operation of this
973function must always succeed, we cannot reverse it: therefore enough
974free disk space (2 pages) must be guaranteed to be available before
975this function is called.
976@return inserted record */
977rec_t*
978rtr_page_split_and_insert(
979/*======================*/
980 ulint flags, /*!< in: undo logging and locking flags */
981 btr_cur_t* cursor, /*!< in/out: cursor at which to insert; when the
982 function returns, the cursor is positioned
983 on the predecessor of the inserted record */
984 ulint** offsets,/*!< out: offsets on inserted record */
985 mem_heap_t** heap, /*!< in/out: pointer to memory heap, or NULL */
986 const dtuple_t* tuple, /*!< in: tuple to insert */
987 ulint n_ext, /*!< in: number of externally stored columns */
988 mtr_t* mtr) /*!< in: mtr */
989{
990 buf_block_t* block;
991 page_t* page;
992 page_t* new_page;
993 ulint page_no;
994 ulint hint_page_no;
995 buf_block_t* new_block;
996 page_zip_des_t* page_zip;
997 page_zip_des_t* new_page_zip;
998 buf_block_t* insert_block;
999 page_cur_t* page_cursor;
1000 rec_t* rec = 0;
1001 ulint n_recs;
1002 ulint total_data;
1003 ulint insert_size;
1004 rtr_split_node_t* rtr_split_node_array;
1005 rtr_split_node_t* cur_split_node;
1006 rtr_split_node_t* end_split_node;
1007 double* buf_pos;
1008 ulint page_level;
1009 node_seq_t current_ssn;
1010 node_seq_t next_ssn;
1011 buf_block_t* root_block;
1012 rtr_mbr_t mbr;
1013 rtr_mbr_t new_mbr;
1014 lock_prdt_t prdt;
1015 lock_prdt_t new_prdt;
1016 rec_t* first_rec = NULL;
1017 int first_rec_group = 1;
1018 ulint n_iterations = 0;
1019
1020 if (!*heap) {
1021 *heap = mem_heap_create(1024);
1022 }
1023
1024func_start:
1025 mem_heap_empty(*heap);
1026 *offsets = NULL;
1027
1028 ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(cursor->index),
1029 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
1030 ut_ad(!dict_index_is_online_ddl(cursor->index)
1031 || (flags & BTR_CREATE_FLAG)
1032 || dict_index_is_clust(cursor->index));
1033 ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
1034 RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
1035
1036 block = btr_cur_get_block(cursor);
1037 page = buf_block_get_frame(block);
1038 page_zip = buf_block_get_page_zip(block);
1039 page_level = btr_page_get_level(page);
1040 current_ssn = page_get_ssn_id(page);
1041
1042 ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1043 ut_ad(page_get_n_recs(page) >= 1);
1044
1045 page_no = block->page.id.page_no();
1046
1047 if (!page_has_prev(page) && !page_is_leaf(page)) {
1048 first_rec = page_rec_get_next(
1049 page_get_infimum_rec(buf_block_get_frame(block)));
1050 }
1051
1052 /* Initial split nodes array. */
1053 rtr_split_node_array = rtr_page_split_initialize_nodes(
1054 *heap, cursor, offsets, tuple, &buf_pos);
1055
1056 /* Divide all mbrs to two groups. */
1057 n_recs = ulint(page_get_n_recs(page)) + 1;
1058
1059 end_split_node = rtr_split_node_array + n_recs;
1060
1061#ifdef UNIV_GIS_DEBUG
1062 fprintf(stderr, "Before split a page:\n");
1063 for (cur_split_node = rtr_split_node_array;
1064 cur_split_node < end_split_node; ++cur_split_node) {
1065 for (int i = 0; i < SPDIMS * 2; i++) {
1066 fprintf(stderr, "%.2lf ",
1067 *(cur_split_node->coords + i));
1068 }
1069 fprintf(stderr, "\n");
1070 }
1071#endif
1072
1073 insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
1074 total_data = page_get_data_size(page) + insert_size;
1075 first_rec_group = split_rtree_node(rtr_split_node_array,
1076 static_cast<int>(n_recs),
1077 static_cast<int>(total_data),
1078 static_cast<int>(insert_size),
1079 0, 2, 2, &buf_pos, SPDIMS,
1080 static_cast<uchar*>(first_rec));
1081
1082 /* Allocate a new page to the index */
1083 hint_page_no = page_no + 1;
1084 new_block = btr_page_alloc(cursor->index, hint_page_no, FSP_UP,
1085 page_level, mtr, mtr);
1086 new_page_zip = buf_block_get_page_zip(new_block);
1087 btr_page_create(new_block, new_page_zip, cursor->index,
1088 page_level, mtr);
1089
1090 new_page = buf_block_get_frame(new_block);
1091 ut_ad(page_get_ssn_id(new_page) == 0);
1092
1093 /* Set new ssn to the new page and page. */
1094 page_set_ssn_id(new_block, new_page_zip, current_ssn, mtr);
1095 next_ssn = rtr_get_new_ssn_id(cursor->index);
1096
1097 page_set_ssn_id(block, page_zip, next_ssn, mtr);
1098
1099 /* Keep recs in first group to the old page, move recs in second
1100 groups to the new page. */
1101 if (0
1102#ifdef UNIV_ZIP_COPY
1103 || page_zip
1104#endif
1105 || !rtr_split_page_move_rec_list(rtr_split_node_array,
1106 first_rec_group,
1107 new_block, block, first_rec,
1108 cursor->index, *heap, mtr)) {
1109 ulint n = 0;
1110 rec_t* rec;
1111 ulint moved = 0;
1112 ulint max_to_move = 0;
1113 rtr_rec_move_t* rec_move = NULL;
1114 ulint pos;
1115
1116 /* For some reason, compressing new_page failed,
1117 even though it should contain fewer records than
1118 the original page. Copy the page byte for byte
1119 and then delete the records from both pages
1120 as appropriate. Deleting will always succeed. */
1121 ut_a(new_page_zip);
1122
1123 page_zip_copy_recs(new_page_zip, new_page,
1124 page_zip, page, cursor->index, mtr);
1125
1126 page_cursor = btr_cur_get_page_cur(cursor);
1127
1128 /* Move locks on recs. */
1129 max_to_move = page_get_n_recs(page);
1130 rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
1131 *heap,
1132 sizeof (*rec_move) * max_to_move));
1133
1134 /* Init the rec_move array for moving lock on recs. */
1135 for (cur_split_node = rtr_split_node_array;
1136 cur_split_node < end_split_node - 1; ++cur_split_node) {
1137 if (cur_split_node->n_node != first_rec_group) {
1138 pos = page_rec_get_n_recs_before(
1139 cur_split_node->key);
1140 rec = page_rec_get_nth(new_page, pos);
1141 ut_a(rec);
1142
1143 rec_move[moved].new_rec = rec;
1144 rec_move[moved].old_rec = cur_split_node->key;
1145 rec_move[moved].moved = false;
1146 moved++;
1147
1148 if (moved > max_to_move) {
1149 ut_ad(0);
1150 break;
1151 }
1152 }
1153 }
1154
1155 /* Update the lock table */
1156 lock_rtr_move_rec_list(new_block, block, rec_move, moved);
1157
1158 /* Delete recs in first group from the new page. */
1159 for (cur_split_node = rtr_split_node_array;
1160 cur_split_node < end_split_node - 1; ++cur_split_node) {
1161 if (cur_split_node->n_node == first_rec_group) {
1162 ulint pos;
1163
1164 pos = page_rec_get_n_recs_before(
1165 cur_split_node->key);
1166 ut_a(pos > 0);
1167 rec_t* new_rec = page_rec_get_nth(new_page,
1168 pos - n);
1169
1170 ut_a(new_rec && page_rec_is_user_rec(new_rec));
1171 page_cur_position(new_rec, new_block,
1172 page_cursor);
1173
1174 *offsets = rec_get_offsets(
1175 page_cur_get_rec(page_cursor),
1176 cursor->index, *offsets, !page_level,
1177 ULINT_UNDEFINED, heap);
1178
1179 page_cur_delete_rec(page_cursor,
1180 cursor->index, *offsets, mtr);
1181 n++;
1182 }
1183 }
1184
1185 /* Delete recs in second group from the old page. */
1186 for (cur_split_node = rtr_split_node_array;
1187 cur_split_node < end_split_node - 1; ++cur_split_node) {
1188 if (cur_split_node->n_node != first_rec_group) {
1189 page_cur_position(cur_split_node->key,
1190 block, page_cursor);
1191 *offsets = rec_get_offsets(
1192 page_cur_get_rec(page_cursor),
1193 cursor->index, *offsets, !page_level,
1194 ULINT_UNDEFINED, heap);
1195 page_cur_delete_rec(page_cursor,
1196 cursor->index, *offsets, mtr);
1197 }
1198 }
1199
1200#ifdef UNIV_GIS_DEBUG
1201 ut_ad(page_validate(new_page, cursor->index));
1202 ut_ad(page_validate(page, cursor->index));
1203#endif
1204 }
1205
1206 /* Insert the new rec to the proper page. */
1207 cur_split_node = end_split_node - 1;
1208 if (cur_split_node->n_node != first_rec_group) {
1209 insert_block = new_block;
1210 } else {
1211 insert_block = block;
1212 }
1213
1214 /* Reposition the cursor for insert and try insertion */
1215 page_cursor = btr_cur_get_page_cur(cursor);
1216
1217 page_cur_search(insert_block, cursor->index, tuple,
1218 PAGE_CUR_LE, page_cursor);
1219
1220 /* It's possible that the new record is too big to be inserted into
1221 the page, and it'll need the second round split in this case.
1222 We test this scenario here*/
1223 DBUG_EXECUTE_IF("rtr_page_need_second_split",
1224 if (n_iterations == 0) {
1225 rec = NULL;
1226 goto after_insert; }
1227 );
1228
1229 rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
1230 offsets, heap, n_ext, mtr);
1231
1232 /* If insert did not fit, try page reorganization.
1233 For compressed pages, page_cur_tuple_insert() will have
1234 attempted this already. */
1235 if (rec == NULL) {
1236 if (!page_cur_get_page_zip(page_cursor)
1237 && btr_page_reorganize(page_cursor, cursor->index, mtr)) {
1238 rec = page_cur_tuple_insert(page_cursor, tuple,
1239 cursor->index, offsets,
1240 heap, n_ext, mtr);
1241
1242 }
1243 /* If insert fail, we will try to split the insert_block
1244 again. */
1245 }
1246
1247#ifdef UNIV_DEBUG
1248after_insert:
1249#endif
1250 /* Calculate the mbr on the upper half-page, and the mbr on
1251 original page. */
1252 rtr_page_cal_mbr(cursor->index, block, &mbr, *heap);
1253 rtr_page_cal_mbr(cursor->index, new_block, &new_mbr, *heap);
1254 prdt.data = &mbr;
1255 new_prdt.data = &new_mbr;
1256
1257 /* Check any predicate locks need to be moved/copied to the
1258 new page */
1259 lock_prdt_update_split(new_block, &prdt, &new_prdt,
1260 cursor->index->table->space->id, page_no);
1261
1262 /* Adjust the upper level. */
1263 rtr_adjust_upper_level(cursor, flags, block, new_block,
1264 &mbr, &new_mbr, mtr);
1265
1266 /* Save the new ssn to the root page, since we need to reinit
1267 the first ssn value from it after restart server. */
1268
1269 root_block = btr_root_block_get(cursor->index, RW_SX_LATCH, mtr);
1270
1271 page_zip = buf_block_get_page_zip(root_block);
1272 page_set_ssn_id(root_block, page_zip, next_ssn, mtr);
1273
1274 /* Insert fit on the page: update the free bits for the
1275 left and right pages in the same mtr */
1276
1277 if (page_is_leaf(page)) {
1278 ibuf_update_free_bits_for_two_pages_low(
1279 block, new_block, mtr);
1280 }
1281
1282
1283 /* If the new res insert fail, we need to do another split
1284 again. */
1285 if (!rec) {
1286 /* We play safe and reset the free bits for new_page */
1287 if (!dict_index_is_clust(cursor->index)
1288 && !cursor->index->table->is_temporary()) {
1289 ibuf_reset_free_bits(new_block);
1290 ibuf_reset_free_bits(block);
1291 }
1292
1293 /* We need to clean the parent path here and search father
1294 node later, otherwise, it's possible that find a wrong
1295 parent. */
1296 rtr_clean_rtr_info(cursor->rtr_info, true);
1297 cursor->rtr_info = NULL;
1298 n_iterations++;
1299
1300 rec_t* i_rec = page_rec_get_next(page_get_infimum_rec(
1301 buf_block_get_frame(block)));
1302 btr_cur_position(cursor->index, i_rec, block, cursor);
1303
1304 goto func_start;
1305 }
1306
1307#ifdef UNIV_GIS_DEBUG
1308 ut_ad(page_validate(buf_block_get_frame(block), cursor->index));
1309 ut_ad(page_validate(buf_block_get_frame(new_block), cursor->index));
1310
1311 ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
1312#endif
1313 MONITOR_INC(MONITOR_INDEX_SPLIT);
1314
1315 return(rec);
1316}
1317
1318/****************************************************************//**
1319Following the right link to find the proper block for insert.
1320@return the proper block.*/
1321dberr_t
1322rtr_ins_enlarge_mbr(
1323/*================*/
1324 btr_cur_t* btr_cur, /*!< in: btr cursor */
1325 mtr_t* mtr) /*!< in: mtr */
1326{
1327 dberr_t err = DB_SUCCESS;
1328 rtr_mbr_t new_mbr;
1329 buf_block_t* block;
1330 mem_heap_t* heap;
1331 dict_index_t* index = btr_cur->index;
1332 page_cur_t* page_cursor;
1333 ulint* offsets;
1334 node_visit_t* node_visit;
1335 btr_cur_t cursor;
1336 page_t* page;
1337
1338 ut_ad(dict_index_is_spatial(index));
1339
1340 /* If no rtr_info or rtree is one level tree, return. */
1341 if (!btr_cur->rtr_info || btr_cur->tree_height == 1) {
1342 return(err);
1343 }
1344
1345 /* Check path info is not empty. */
1346 ut_ad(!btr_cur->rtr_info->parent_path->empty());
1347
1348 /* Create a memory heap. */
1349 heap = mem_heap_create(1024);
1350
1351 /* Leaf level page is stored in cursor */
1352 page_cursor = btr_cur_get_page_cur(btr_cur);
1353 block = page_cur_get_block(page_cursor);
1354
1355 for (ulint i = 1; i < btr_cur->tree_height; i++) {
1356 node_visit = rtr_get_parent_node(btr_cur, i, true);
1357 ut_ad(node_visit != NULL);
1358
1359 /* If there's no mbr enlarge, return.*/
1360 if (node_visit->mbr_inc == 0) {
1361 block = btr_pcur_get_block(node_visit->cursor);
1362 continue;
1363 }
1364
1365 /* Calculate the mbr of the child page. */
1366 rtr_page_cal_mbr(index, block, &new_mbr, heap);
1367
1368 /* Get father block. */
1369 memset(&cursor, 0, sizeof(cursor));
1370 offsets = rtr_page_get_father_block(
1371 NULL, heap, index, block, mtr, btr_cur, &cursor);
1372
1373 page = buf_block_get_frame(block);
1374
1375 /* Update the mbr field of the rec. */
1376 if (!rtr_update_mbr_field(&cursor, offsets, NULL, page,
1377 &new_mbr, NULL, mtr)) {
1378 err = DB_ERROR;
1379 break;
1380 }
1381
1382 page_cursor = btr_cur_get_page_cur(&cursor);
1383 block = page_cur_get_block(page_cursor);
1384 }
1385
1386 mem_heap_free(heap);
1387
1388 return(err);
1389}
1390
1391/*************************************************************//**
1392Copy recs from a page to new_block of rtree.
1393Differs from page_copy_rec_list_end, because this function does not
1394touch the lock table and max trx id on page or compress the page.
1395
1396IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1397if new_block is a compressed leaf page in a secondary index.
1398This has to be done either within the same mini-transaction,
1399or by invoking ibuf_reset_free_bits() before mtr_commit(). */
1400void
1401rtr_page_copy_rec_list_end_no_locks(
1402/*================================*/
1403 buf_block_t* new_block, /*!< in: index page to copy to */
1404 buf_block_t* block, /*!< in: index page of rec */
1405 rec_t* rec, /*!< in: record on page */
1406 dict_index_t* index, /*!< in: record descriptor */
1407 mem_heap_t* heap, /*!< in/out: heap memory */
1408 rtr_rec_move_t* rec_move, /*!< in: recording records moved */
1409 ulint max_move, /*!< in: num of rec to move */
1410 ulint* num_moved, /*!< out: num of rec to move */
1411 mtr_t* mtr) /*!< in: mtr */
1412{
1413 page_t* new_page = buf_block_get_frame(new_block);
1414 page_cur_t page_cur;
1415 page_cur_t cur1;
1416 rec_t* cur_rec;
1417 ulint offsets_1[REC_OFFS_NORMAL_SIZE];
1418 ulint* offsets1 = offsets_1;
1419 ulint offsets_2[REC_OFFS_NORMAL_SIZE];
1420 ulint* offsets2 = offsets_2;
1421 ulint moved = 0;
1422 bool is_leaf = page_is_leaf(new_page);
1423
1424 rec_offs_init(offsets_1);
1425 rec_offs_init(offsets_2);
1426
1427 page_cur_position(rec, block, &cur1);
1428
1429 if (page_cur_is_before_first(&cur1)) {
1430 page_cur_move_to_next(&cur1);
1431 }
1432
1433 btr_assert_not_corrupted(new_block, index);
1434 ut_a(page_is_comp(new_page) == page_rec_is_comp(rec));
1435 ut_a(mach_read_from_2(new_page + srv_page_size - 10) == (ulint)
1436 (page_is_comp(new_page) ? PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM));
1437
1438 cur_rec = page_rec_get_next(
1439 page_get_infimum_rec(buf_block_get_frame(new_block)));
1440 page_cur_position(cur_rec, new_block, &page_cur);
1441
1442 /* Copy records from the original page to the new page */
1443 while (!page_cur_is_after_last(&cur1)) {
1444 rec_t* cur1_rec = page_cur_get_rec(&cur1);
1445 rec_t* ins_rec;
1446
1447 if (page_rec_is_infimum(cur_rec)) {
1448 cur_rec = page_rec_get_next(cur_rec);
1449 }
1450
1451 offsets1 = rec_get_offsets(cur1_rec, index, offsets1, is_leaf,
1452 ULINT_UNDEFINED, &heap);
1453 while (!page_rec_is_supremum(cur_rec)) {
1454 ulint cur_matched_fields = 0;
1455 int cmp;
1456
1457 offsets2 = rec_get_offsets(cur_rec, index, offsets2,
1458 is_leaf,
1459 ULINT_UNDEFINED, &heap);
1460 cmp = cmp_rec_rec_with_match(cur1_rec, cur_rec,
1461 offsets1, offsets2,
1462 index, FALSE,
1463 &cur_matched_fields);
1464 if (cmp < 0) {
1465 page_cur_move_to_prev(&page_cur);
1466 break;
1467 } else if (cmp > 0) {
1468 /* Skip small recs. */
1469 page_cur_move_to_next(&page_cur);
1470 cur_rec = page_cur_get_rec(&page_cur);
1471 } else if (is_leaf) {
1472 if (rec_get_deleted_flag(cur1_rec,
1473 dict_table_is_comp(index->table))) {
1474 goto next;
1475 } else {
1476 /* We have two identical leaf records,
1477 skip copying the undeleted one, and
1478 unmark deleted on the current page */
1479 btr_rec_set_deleted_flag(
1480 cur_rec, NULL, FALSE);
1481 goto next;
1482 }
1483 }
1484 }
1485
1486 /* If position is on suprenum rec, need to move to
1487 previous rec. */
1488 if (page_rec_is_supremum(cur_rec)) {
1489 page_cur_move_to_prev(&page_cur);
1490 }
1491
1492 cur_rec = page_cur_get_rec(&page_cur);
1493
1494 offsets1 = rec_get_offsets(cur1_rec, index, offsets1, is_leaf,
1495 ULINT_UNDEFINED, &heap);
1496
1497 ins_rec = page_cur_insert_rec_low(cur_rec, index,
1498 cur1_rec, offsets1, mtr);
1499 if (UNIV_UNLIKELY(!ins_rec)) {
1500 fprintf(stderr, "page number %ld and %ld\n",
1501 (long)new_block->page.id.page_no(),
1502 (long)block->page.id.page_no());
1503
1504 ib::fatal() << "rec offset " << page_offset(rec)
1505 << ", cur1 offset "
1506 << page_offset(page_cur_get_rec(&cur1))
1507 << ", cur_rec offset "
1508 << page_offset(cur_rec);
1509 }
1510
1511 rec_move[moved].new_rec = ins_rec;
1512 rec_move[moved].old_rec = cur1_rec;
1513 rec_move[moved].moved = false;
1514 moved++;
1515next:
1516 if (moved > max_move) {
1517 ut_ad(0);
1518 break;
1519 }
1520
1521 page_cur_move_to_next(&cur1);
1522 }
1523
1524 *num_moved = moved;
1525}
1526
1527/*************************************************************//**
1528Copy recs till a specified rec from a page to new_block of rtree. */
1529void
1530rtr_page_copy_rec_list_start_no_locks(
1531/*==================================*/
1532 buf_block_t* new_block, /*!< in: index page to copy to */
1533 buf_block_t* block, /*!< in: index page of rec */
1534 rec_t* rec, /*!< in: record on page */
1535 dict_index_t* index, /*!< in: record descriptor */
1536 mem_heap_t* heap, /*!< in/out: heap memory */
1537 rtr_rec_move_t* rec_move, /*!< in: recording records moved */
1538 ulint max_move, /*!< in: num of rec to move */
1539 ulint* num_moved, /*!< out: num of rec to move */
1540 mtr_t* mtr) /*!< in: mtr */
1541{
1542 page_cur_t cur1;
1543 rec_t* cur_rec;
1544 ulint offsets_1[REC_OFFS_NORMAL_SIZE];
1545 ulint* offsets1 = offsets_1;
1546 ulint offsets_2[REC_OFFS_NORMAL_SIZE];
1547 ulint* offsets2 = offsets_2;
1548 page_cur_t page_cur;
1549 ulint moved = 0;
1550 bool is_leaf = page_is_leaf(buf_block_get_frame(block));
1551
1552 rec_offs_init(offsets_1);
1553 rec_offs_init(offsets_2);
1554
1555 page_cur_set_before_first(block, &cur1);
1556 page_cur_move_to_next(&cur1);
1557
1558 cur_rec = page_rec_get_next(
1559 page_get_infimum_rec(buf_block_get_frame(new_block)));
1560 page_cur_position(cur_rec, new_block, &page_cur);
1561
1562 while (page_cur_get_rec(&cur1) != rec) {
1563 rec_t* cur1_rec = page_cur_get_rec(&cur1);
1564 rec_t* ins_rec;
1565
1566 if (page_rec_is_infimum(cur_rec)) {
1567 cur_rec = page_rec_get_next(cur_rec);
1568 }
1569
1570 offsets1 = rec_get_offsets(cur1_rec, index, offsets1, is_leaf,
1571 ULINT_UNDEFINED, &heap);
1572
1573 while (!page_rec_is_supremum(cur_rec)) {
1574 ulint cur_matched_fields = 0;
1575 int cmp;
1576
1577 offsets2 = rec_get_offsets(cur_rec, index, offsets2,
1578 is_leaf,
1579 ULINT_UNDEFINED, &heap);
1580 cmp = cmp_rec_rec_with_match(cur1_rec, cur_rec,
1581 offsets1, offsets2,
1582 index, FALSE,
1583 &cur_matched_fields);
1584 if (cmp < 0) {
1585 page_cur_move_to_prev(&page_cur);
1586 cur_rec = page_cur_get_rec(&page_cur);
1587 break;
1588 } else if (cmp > 0) {
1589 /* Skip small recs. */
1590 page_cur_move_to_next(&page_cur);
1591 cur_rec = page_cur_get_rec(&page_cur);
1592 } else if (is_leaf) {
1593 if (rec_get_deleted_flag(
1594 cur1_rec,
1595 dict_table_is_comp(index->table))) {
1596 goto next;
1597 } else {
1598 /* We have two identical leaf records,
1599 skip copying the undeleted one, and
1600 unmark deleted on the current page */
1601 btr_rec_set_deleted_flag(
1602 cur_rec, NULL, FALSE);
1603 goto next;
1604 }
1605 }
1606 }
1607
1608 /* If position is on suprenum rec, need to move to
1609 previous rec. */
1610 if (page_rec_is_supremum(cur_rec)) {
1611 page_cur_move_to_prev(&page_cur);
1612 }
1613
1614 cur_rec = page_cur_get_rec(&page_cur);
1615
1616 offsets1 = rec_get_offsets(cur1_rec, index, offsets1, is_leaf,
1617 ULINT_UNDEFINED, &heap);
1618
1619 ins_rec = page_cur_insert_rec_low(cur_rec, index,
1620 cur1_rec, offsets1, mtr);
1621 if (UNIV_UNLIKELY(!ins_rec)) {
1622 fprintf(stderr, "page number %ld and %ld\n",
1623 (long)new_block->page.id.page_no(),
1624 (long)block->page.id.page_no());
1625
1626 ib::fatal() << "rec offset " << page_offset(rec)
1627 << ", cur1 offset "
1628 << page_offset(page_cur_get_rec(&cur1))
1629 << ", cur_rec offset "
1630 << page_offset(cur_rec);
1631 }
1632
1633 rec_move[moved].new_rec = ins_rec;
1634 rec_move[moved].old_rec = cur1_rec;
1635 rec_move[moved].moved = false;
1636 moved++;
1637next:
1638 if (moved > max_move) {
1639 ut_ad(0);
1640 break;
1641 }
1642
1643 page_cur_move_to_next(&cur1);
1644 }
1645
1646 *num_moved = moved;
1647}
1648
1649/****************************************************************//**
1650Check two MBRs are identical or need to be merged */
1651bool
1652rtr_merge_mbr_changed(
1653/*==================*/
1654 btr_cur_t* cursor, /*!< in/out: cursor */
1655 btr_cur_t* cursor2, /*!< in: the other cursor */
1656 ulint* offsets, /*!< in: rec offsets */
1657 ulint* offsets2, /*!< in: rec offsets */
1658 rtr_mbr_t* new_mbr) /*!< out: MBR to update */
1659{
1660 double* mbr;
1661 double mbr1[SPDIMS * 2];
1662 double mbr2[SPDIMS * 2];
1663 rec_t* rec;
1664 ulint len;
1665 bool changed = false;
1666
1667 ut_ad(dict_index_is_spatial(cursor->index));
1668
1669 rec = btr_cur_get_rec(cursor);
1670
1671 rtr_read_mbr(rec_get_nth_field(rec, offsets, 0, &len),
1672 reinterpret_cast<rtr_mbr_t*>(mbr1));
1673
1674 rec = btr_cur_get_rec(cursor2);
1675
1676 rtr_read_mbr(rec_get_nth_field(rec, offsets2, 0, &len),
1677 reinterpret_cast<rtr_mbr_t*>(mbr2));
1678
1679 mbr = reinterpret_cast<double*>(new_mbr);
1680
1681 for (int i = 0; i < SPDIMS * 2; i += 2) {
1682 changed = (changed || mbr1[i] != mbr2[i]);
1683 *mbr = mbr1[i] < mbr2[i] ? mbr1[i] : mbr2[i];
1684 mbr++;
1685 changed = (changed || mbr1[i + 1] != mbr2 [i + 1]);
1686 *mbr = mbr1[i + 1] > mbr2[i + 1] ? mbr1[i + 1] : mbr2[i + 1];
1687 mbr++;
1688 }
1689
1690 return(changed);
1691}
1692
1693/****************************************************************//**
1694Merge 2 mbrs and update the the mbr that cursor is on. */
1695dberr_t
1696rtr_merge_and_update_mbr(
1697/*=====================*/
1698 btr_cur_t* cursor, /*!< in/out: cursor */
1699 btr_cur_t* cursor2, /*!< in: the other cursor */
1700 ulint* offsets, /*!< in: rec offsets */
1701 ulint* offsets2, /*!< in: rec offsets */
1702 page_t* child_page, /*!< in: the page. */
1703 mtr_t* mtr) /*!< in: mtr */
1704{
1705 dberr_t err = DB_SUCCESS;
1706 rtr_mbr_t new_mbr;
1707 bool changed = false;
1708
1709 ut_ad(dict_index_is_spatial(cursor->index));
1710
1711 changed = rtr_merge_mbr_changed(cursor, cursor2, offsets, offsets2,
1712 &new_mbr);
1713
1714 /* Update the mbr field of the rec. And will delete the record
1715 pointed by cursor2 */
1716 if (changed) {
1717 if (!rtr_update_mbr_field(cursor, offsets, cursor2, child_page,
1718 &new_mbr, NULL, mtr)) {
1719 err = DB_ERROR;
1720 }
1721 } else {
1722 rtr_node_ptr_delete(cursor2, mtr);
1723 }
1724
1725 return(err);
1726}
1727
1728/*************************************************************//**
1729Deletes on the upper level the node pointer to a page. */
1730void
1731rtr_node_ptr_delete(
1732/*================*/
1733 btr_cur_t* cursor, /*!< in: search cursor, contains information
1734 about parent nodes in search */
1735 mtr_t* mtr) /*!< in: mtr */
1736{
1737 ibool compressed;
1738 dberr_t err;
1739
1740 compressed = btr_cur_pessimistic_delete(&err, TRUE, cursor,
1741 BTR_CREATE_FLAG, false, mtr);
1742 ut_a(err == DB_SUCCESS);
1743
1744 if (!compressed) {
1745 btr_cur_compress_if_useful(cursor, FALSE, mtr);
1746 }
1747}
1748
1749/**************************************************************//**
1750Check whether a Rtree page is child of a parent page
1751@return true if there is child/parent relationship */
1752bool
1753rtr_check_same_block(
1754/*================*/
1755 dict_index_t* index, /*!< in: index tree */
1756 btr_cur_t* cursor, /*!< in/out: position at the parent entry
1757 pointing to the child if successful */
1758 buf_block_t* parentb,/*!< in: parent page to check */
1759 buf_block_t* childb, /*!< in: child Page */
1760 mem_heap_t* heap) /*!< in: memory heap */
1761
1762{
1763 ulint page_no = childb->page.id.page_no();
1764 ulint* offsets;
1765 rec_t* rec = page_rec_get_next(page_get_infimum_rec(
1766 buf_block_get_frame(parentb)));
1767
1768 while (!page_rec_is_supremum(rec)) {
1769 offsets = rec_get_offsets(
1770 rec, index, NULL, false, ULINT_UNDEFINED, &heap);
1771
1772 if (btr_node_ptr_get_child_page_no(rec, offsets) == page_no) {
1773 btr_cur_position(index, rec, parentb, cursor);
1774 return(true);
1775 }
1776
1777 rec = page_rec_get_next(rec);
1778 }
1779
1780 return(false);
1781}
1782
1783/****************************************************************//**
1784Calculate the area increased for a new record
1785@return area increased */
1786double
1787rtr_rec_cal_increase(
1788/*=================*/
1789 const dtuple_t* dtuple, /*!< in: data tuple to insert, which
1790 cause area increase */
1791 const rec_t* rec, /*!< in: physical record which differs from
1792 dtuple in some of the common fields, or which
1793 has an equal number or more fields than
1794 dtuple */
1795 const ulint* offsets,/*!< in: array returned by rec_get_offsets() */
1796 double* area) /*!< out: increased area */
1797{
1798 const dfield_t* dtuple_field;
1799 ulint dtuple_f_len;
1800 ulint rec_f_len;
1801 const byte* rec_b_ptr;
1802 double ret = 0;
1803
1804 ut_ad(!page_rec_is_supremum(rec));
1805 ut_ad(!page_rec_is_infimum(rec));
1806
1807 dtuple_field = dtuple_get_nth_field(dtuple, 0);
1808 dtuple_f_len = dfield_get_len(dtuple_field);
1809
1810 rec_b_ptr = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
1811 ret = rtree_area_increase(
1812 rec_b_ptr,
1813 static_cast<const byte*>(dfield_get_data(dtuple_field)),
1814 static_cast<int>(dtuple_f_len), area);
1815
1816 return(ret);
1817}
1818
/** Estimates the number of rows in a given area.
Scans the node pointer records on the R-tree root page; each entry's
overlap with the search MBR is turned into an estimated fraction of
that subtree's rows, and the average fraction is scaled by the
table's row count.
@param[in]	index	index
@param[in]	tuple	range tuple containing mbr, may also be empty tuple
@param[in]	mode	search mode
@return estimated number of rows, or HA_POS_ERROR if no estimate
can be produced */
ha_rows
rtr_estimate_n_rows_in_range(
	dict_index_t*	index,
	const dtuple_t*	tuple,
	page_cur_mode_t	mode)
{
	ut_ad(dict_index_is_spatial(index));

	/* Check tuple & mode */
	if (tuple->n_fields == 0) {
		return(HA_POS_ERROR);
	}

	switch (mode) {
	case PAGE_CUR_DISJOINT:
	case PAGE_CUR_CONTAIN:
	case PAGE_CUR_INTERSECT:
	case PAGE_CUR_WITHIN:
	case PAGE_CUR_MBR_EQUAL:
		break;
	default:
		/* Only spatial search modes are estimated here. */
		return(HA_POS_ERROR);
	}

	DBUG_EXECUTE_IF("rtr_pcur_move_to_next_return",
		return(2);
	);

	/* Read mbr from tuple. */
	rtr_mbr_t	range_mbr;
	double		range_area;
	const byte*	range_mbr_ptr;

	const dfield_t* dtuple_field = dtuple_get_nth_field(tuple, 0);
	ut_ad(dfield_get_len(dtuple_field) >= DATA_MBR_LEN);
	range_mbr_ptr = reinterpret_cast<const byte*>(
		dfield_get_data(dtuple_field));

	rtr_read_mbr(range_mbr_ptr, &range_mbr);
	range_area = (range_mbr.xmax - range_mbr.xmin)
		* (range_mbr.ymax - range_mbr.ymin);

	/* Get index root page. */
	mtr_t	mtr;

	mtr.start();
	index->set_modified(mtr);
	mtr_s_lock(&index->lock, &mtr);

	buf_block_t*	block = btr_block_get(
		page_id_t(index->table->space->id, index->page),
		page_size_t(index->table->space->flags),
		RW_S_LATCH, index, &mtr);
	const page_t*	page = buf_block_get_frame(block);
	const unsigned	n_recs = page_header_get_field(page, PAGE_N_RECS);

	if (n_recs == 0) {
		/* Empty root page: no basis for an estimate. The mtr
		commit releases the latches taken above. */
		mtr.commit();
		return(HA_POS_ERROR);
	}

	/* Scan records in root page and calculate area. */
	double	area = 0;
	for (const rec_t* rec = page_rec_get_next(
		     page_get_infimum_rec(block->frame));
	     !page_rec_is_supremum(rec);
	     rec = page_rec_get_next_const(rec)) {
		rtr_mbr_t	mbr;
		double		rec_area;

		rtr_read_mbr(rec, &mbr);

		rec_area = (mbr.xmax - mbr.xmin) * (mbr.ymax - mbr.ymin);

		if (rec_area == 0) {
			/* Degenerate (zero-area) entry: the ratio-based
			estimate below would divide by zero, so count
			whole subtrees instead. */
			switch (mode) {
			case PAGE_CUR_CONTAIN:
			case PAGE_CUR_INTERSECT:
				area += 1;
				break;

			case PAGE_CUR_DISJOINT:
				break;

			case PAGE_CUR_WITHIN:
			case PAGE_CUR_MBR_EQUAL:
				if (rtr_key_cmp_check(rec)
				    == 0) {
					area += 1;
				}

				break;

			default:
				ut_error;
			}
		} else {
			/* Estimate the fraction of the subtree's rows
			that qualify as the ratio of the overlapping
			area to the entry's own area. */
			switch (mode) {
			case PAGE_CUR_CONTAIN:
			case PAGE_CUR_INTERSECT:
				area += rtree_area_overlapping(
					range_mbr_ptr, rec, DATA_MBR_LEN)
					/ rec_area;
				break;

			case PAGE_CUR_DISJOINT:
				area += 1;
				area -= rtree_area_overlapping(
					range_mbr_ptr, rec, DATA_MBR_LEN)
					/ rec_area;
				break;

			case PAGE_CUR_WITHIN:
			case PAGE_CUR_MBR_EQUAL:
				if (!rtree_key_cmp(
					    PAGE_CUR_WITHIN, range_mbr_ptr,
					    DATA_MBR_LEN, rec, DATA_MBR_LEN)) {
					area += range_area / rec_area;
				}

				break;
			default:
				ut_error;
			}
		}
	}

	mtr.commit();

	if (!std::isfinite(area)) {
		return(HA_POS_ERROR);
	}

	/* Average the per-entry fractions over the root entries and
	scale by the table's row count. */
	area /= n_recs;
	return ha_rows(dict_table_get_n_rows(index->table) * area);
}
1962