1/*****************************************************************************
2
3Copyright (c) 2016, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2017, 2018, MariaDB Corporation.
5
6This program is free software; you can redistribute it and/or modify it under
7the terms of the GNU General Public License as published by the Free Software
8Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful, but WITHOUT
11ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License along with
15this program; if not, write to the Free Software Foundation, Inc.,
1651 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
17
18*****************************************************************************/
19
20/**************************************************//**
21@file gis/gis0sea.cc
22InnoDB R-tree search interfaces
23
24Created 2014/01/16 Jimmy Yang
25***********************************************************************/
26
27#include "fsp0fsp.h"
28#include "page0page.h"
29#include "page0cur.h"
30#include "page0zip.h"
31#include "gis0rtree.h"
32#include "btr0cur.h"
33#include "btr0sea.h"
34#include "btr0pcur.h"
35#include "rem0cmp.h"
36#include "lock0lock.h"
37#include "ibuf0ibuf.h"
38#include "trx0trx.h"
39#include "srv0mon.h"
40#include "que0que.h"
41#include "gis0geo.h"
42
/** Restore the stored position of a persistent cursor, buffer-fixing the page */
44static
45bool
46rtr_cur_restore_position(
47 ulint latch_mode, /*!< in: BTR_SEARCH_LEAF, ... */
48 btr_cur_t* cursor, /*!< in: detached persistent cursor */
49 ulint level, /*!< in: index level */
50 mtr_t* mtr); /*!< in: mtr */
51
52/*************************************************************//**
Pop out used parent path entries until we find the parent with a matching
page number */
55static
56void
57rtr_adjust_parent_path(
58/*===================*/
59 rtr_info_t* rtr_info, /* R-Tree info struct */
60 ulint page_no) /* page number to look for */
61{
62 while (!rtr_info->parent_path->empty()) {
63 if (rtr_info->parent_path->back().child_no == page_no) {
64 break;
65 } else {
66 if (rtr_info->parent_path->back().cursor) {
67 btr_pcur_close(
68 rtr_info->parent_path->back().cursor);
69 ut_free(rtr_info->parent_path->back().cursor);
70 }
71
72 rtr_info->parent_path->pop_back();
73 }
74 }
75}
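
/* Worked example (illustrative): if the entries on parent_path have child
page numbers 7, 12 and 15, with 15 on top of the stack, and page_no == 12,
then the entry for 15 has its cursor closed and is popped, leaving the
entry for 12 on top. */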
76
77/*************************************************************//**
Find the next matching record. This function is used by search
or by record locating during index delete/update.
@return true if a suitable record is found, otherwise false */
81static
82bool
83rtr_pcur_getnext_from_path(
84/*=======================*/
85 const dtuple_t* tuple, /*!< in: data tuple */
86 page_cur_mode_t mode, /*!< in: cursor search mode */
87 btr_cur_t* btr_cur,/*!< in: persistent cursor; NOTE that the
88 function may release the page latch */
89 ulint target_level,
90 /*!< in: target level */
91 ulint latch_mode,
92 /*!< in: latch_mode */
93 bool index_locked,
94 /*!< in: index tree locked */
95 mtr_t* mtr) /*!< in: mtr */
96{
97 dict_index_t* index = btr_cur->index;
98 bool found = false;
99 page_cur_t* page_cursor;
100 ulint level = 0;
101 node_visit_t next_rec;
102 rtr_info_t* rtr_info = btr_cur->rtr_info;
103 node_seq_t page_ssn;
104 ulint my_latch_mode;
	bool		skip_parent = false;
106 bool new_split = false;
107 bool need_parent;
108 bool for_delete = false;
109 bool for_undo_ins = false;
110
111 /* exhausted all the pages to be searched */
112 if (rtr_info->path->empty()) {
113 return(false);
114 }
115
116 ut_ad(dtuple_get_n_fields_cmp(tuple));
117
118 my_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
119
120 for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
121 for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
122
	/* No insert should reach this function; the only
	BTR_MODIFY_* callers should be deletes */
125 ut_ad(mode != PAGE_CUR_RTREE_INSERT);
126 ut_ad(my_latch_mode == BTR_SEARCH_LEAF
127 || my_latch_mode == BTR_MODIFY_LEAF
128 || my_latch_mode == BTR_MODIFY_TREE
129 || my_latch_mode == BTR_CONT_MODIFY_TREE);
130
	/* Decide whether we need to track parent information. This is
	only needed when we do tree-altering operations (such as an
	index page merge) */
133 need_parent = ((my_latch_mode == BTR_MODIFY_TREE
134 || my_latch_mode == BTR_CONT_MODIFY_TREE)
135 && mode == PAGE_CUR_RTREE_LOCATE);
136
137 if (!index_locked) {
138 ut_ad(latch_mode & BTR_SEARCH_LEAF
139 || latch_mode & BTR_MODIFY_LEAF);
140 mtr_s_lock(dict_index_get_lock(index), mtr);
141 } else {
142 ut_ad(mtr_memo_contains_flagged(mtr, &index->lock,
143 MTR_MEMO_SX_LOCK
144 | MTR_MEMO_S_LOCK
145 | MTR_MEMO_X_LOCK));
146 }
147
148 const page_size_t page_size(index->table->space->flags);
149
	/* Pop each node/page to be searched from the "path" structure
	and do a search on it. Note that any pages in the "path"
	structure are protected by page locks, so they cannot be
	shrunk away */
154 do {
155 buf_block_t* block;
156 node_seq_t path_ssn;
157 const page_t* page;
158 ulint rw_latch = RW_X_LATCH;
159 ulint tree_idx;
160
161 mutex_enter(&rtr_info->rtr_path_mutex);
162 next_rec = rtr_info->path->back();
163 rtr_info->path->pop_back();
164 level = next_rec.level;
165 path_ssn = next_rec.seq_no;
166 tree_idx = btr_cur->tree_height - level - 1;
167
168 /* Maintain the parent path info as well, if needed */
169 if (need_parent && !skip_parent && !new_split) {
170 ulint old_level;
171 ulint new_level;
172
173 ut_ad(!rtr_info->parent_path->empty());
174
175 /* Cleanup unused parent info */
176 if (rtr_info->parent_path->back().cursor) {
177 btr_pcur_close(
178 rtr_info->parent_path->back().cursor);
179 ut_free(rtr_info->parent_path->back().cursor);
180 }
181
182 old_level = rtr_info->parent_path->back().level;
183
184 rtr_info->parent_path->pop_back();
185
186 ut_ad(!rtr_info->parent_path->empty());
187
188 /* check whether there is a level change. If so,
189 the current parent path needs to pop enough
190 nodes to adjust to the new search page */
191 new_level = rtr_info->parent_path->back().level;
192
193 if (old_level < new_level) {
194 rtr_adjust_parent_path(
195 rtr_info, next_rec.page_no);
196 }
197
198 ut_ad(!rtr_info->parent_path->empty());
199
200 ut_ad(next_rec.page_no
201 == rtr_info->parent_path->back().child_no);
202 }
203
204 mutex_exit(&rtr_info->rtr_path_mutex);
205
206 skip_parent = false;
207 new_split = false;
208
		/* Once we have pages in "path", these pages are
		predicate page locked, so they cannot be shrunk away.
		They also carry an SSN (split sequence number) to detect
		splits, so we can directly latch a single page while
		getting it. Pages can be unlatched if they do not
		qualify. One reason for pre-latching is that we might
		need to position a parent page cursor (which requires a
		latch) during the search */
216 if (level == 0) {
217 /* S latched for SEARCH_LEAF, and X latched
218 for MODIFY_LEAF */
219 if (my_latch_mode <= BTR_MODIFY_LEAF) {
220 rw_latch = my_latch_mode;
221 }
222
223 if (my_latch_mode == BTR_CONT_MODIFY_TREE
224 || my_latch_mode == BTR_MODIFY_TREE) {
225 rw_latch = RW_NO_LATCH;
226 }
227
228 } else if (level == target_level) {
229 rw_latch = RW_X_LATCH;
230 }
231
232 /* Release previous locked blocks */
233 if (my_latch_mode != BTR_SEARCH_LEAF) {
234 for (ulint idx = 0; idx < btr_cur->tree_height;
235 idx++) {
236 if (rtr_info->tree_blocks[idx]) {
237 mtr_release_block_at_savepoint(
238 mtr,
239 rtr_info->tree_savepoints[idx],
240 rtr_info->tree_blocks[idx]);
241 rtr_info->tree_blocks[idx] = NULL;
242 }
243 }
244 for (ulint idx = RTR_MAX_LEVELS; idx < RTR_MAX_LEVELS + 3;
245 idx++) {
246 if (rtr_info->tree_blocks[idx]) {
247 mtr_release_block_at_savepoint(
248 mtr,
249 rtr_info->tree_savepoints[idx],
250 rtr_info->tree_blocks[idx]);
251 rtr_info->tree_blocks[idx] = NULL;
252 }
253 }
254 }
255
256 /* set up savepoint to record any locks to be taken */
257 rtr_info->tree_savepoints[tree_idx] = mtr_set_savepoint(mtr);
258
259#ifdef UNIV_RTR_DEBUG
260 ut_ad(!(rw_lock_own(&btr_cur->page_cur.block->lock, RW_LOCK_X)
261 ||
262 rw_lock_own(&btr_cur->page_cur.block->lock, RW_LOCK_S))
263 || my_latch_mode == BTR_MODIFY_TREE
264 || my_latch_mode == BTR_CONT_MODIFY_TREE
265 || !page_is_leaf(buf_block_get_frame(
266 btr_cur->page_cur.block)));
267#endif /* UNIV_RTR_DEBUG */
268
269 dberr_t err = DB_SUCCESS;
270
271 block = buf_page_get_gen(
272 page_id_t(index->table->space->id,
273 next_rec.page_no), page_size,
274 rw_latch, NULL, BUF_GET, __FILE__, __LINE__, mtr, &err);
275
276 if (block == NULL) {
277 continue;
278 } else if (rw_latch != RW_NO_LATCH) {
279 ut_ad(!dict_index_is_ibuf(index));
280 buf_block_dbg_add_level(block, SYNC_TREE_NODE);
281 }
282
283 rtr_info->tree_blocks[tree_idx] = block;
284
285 page = buf_block_get_frame(block);
286 page_ssn = page_get_ssn_id(page);
287
		/* If there are splits, push the split page.
		Note that we hold an SX lock on index->lock, so
		no split/shrink should be happening here */
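		/* For example, if a page was pushed to "path" when its
		SSN was 5 and its page header now reports SSN 7, the page
		has been split since it was recorded, so the right sibling
		is pushed here as well (with the saved SSN) and will be
		visited too. */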
291 if (page_ssn > path_ssn) {
292 ulint next_page_no = btr_page_get_next(page, mtr);
293 rtr_non_leaf_stack_push(
294 rtr_info->path, next_page_no, path_ssn,
295 level, 0, NULL, 0);
296
297 if (!srv_read_only_mode
298 && mode != PAGE_CUR_RTREE_INSERT
299 && mode != PAGE_CUR_RTREE_LOCATE) {
300 ut_ad(rtr_info->thr);
301 lock_place_prdt_page_lock(
302 index->table->space->id,
303 next_page_no, index,
304 rtr_info->thr);
305 }
306 new_split = true;
307#if defined(UNIV_GIS_DEBUG)
308 fprintf(stderr,
				"GIS_DIAG: Split page found: %d, %ld\n",
310 static_cast<int>(need_parent), next_page_no);
311#endif
312 }
313
314 page_cursor = btr_cur_get_page_cur(btr_cur);
315 page_cursor->rec = NULL;
316
317 if (mode == PAGE_CUR_RTREE_LOCATE) {
318 if (level == target_level && level == 0) {
319 ulint low_match;
320
321 found = false;
322
323 low_match = page_cur_search(
324 block, index, tuple,
325 PAGE_CUR_LE,
326 btr_cur_get_page_cur(btr_cur));
327
328 if (low_match == dtuple_get_n_fields_cmp(
329 tuple)) {
330 rec_t* rec = btr_cur_get_rec(btr_cur);
331
332 if (!rec_get_deleted_flag(rec,
333 dict_table_is_comp(index->table))
334 || (!for_delete && !for_undo_ins)) {
335 found = true;
336 btr_cur->low_match = low_match;
337 } else {
338 /* mark we found deleted row */
339 btr_cur->rtr_info->fd_del
340 = true;
341 }
342 }
343 } else {
344 page_cur_mode_t page_mode = mode;
345
346 if (level == target_level
347 && target_level != 0) {
348 page_mode = PAGE_CUR_RTREE_GET_FATHER;
349 }
350 found = rtr_cur_search_with_match(
351 block, index, tuple, page_mode,
352 page_cursor, btr_cur->rtr_info);
353
354 /* Save the position of parent if needed */
355 if (found && need_parent) {
356 btr_pcur_t* r_cursor =
357 rtr_get_parent_cursor(
358 btr_cur, level, false);
359
360 rec_t* rec = page_cur_get_rec(
361 page_cursor);
362 page_cur_position(
363 rec, block,
364 btr_pcur_get_page_cur(r_cursor));
365 r_cursor->pos_state =
366 BTR_PCUR_IS_POSITIONED;
367 r_cursor->latch_mode = my_latch_mode;
368 btr_pcur_store_position(r_cursor, mtr);
369#ifdef UNIV_DEBUG
370 ulint num_stored =
371 rtr_store_parent_path(
372 block, btr_cur,
373 rw_latch, level, mtr);
374 ut_ad(num_stored > 0);
375#else
376 rtr_store_parent_path(
377 block, btr_cur, rw_latch,
378 level, mtr);
379#endif /* UNIV_DEBUG */
380 }
381 }
382 } else {
383 found = rtr_cur_search_with_match(
384 block, index, tuple, mode, page_cursor,
385 btr_cur->rtr_info);
386 }
387
		/* If a matching record was found, attach a predicate
		lock on the page if needed */
390 if (mode != PAGE_CUR_RTREE_INSERT
391 && mode != PAGE_CUR_RTREE_LOCATE
392 && mode >= PAGE_CUR_CONTAIN
393 && btr_cur->rtr_info->need_prdt_lock
394 && found) {
395 lock_prdt_t prdt;
396
397 trx_t* trx = thr_get_trx(
398 btr_cur->rtr_info->thr);
399 lock_mutex_enter();
400 lock_init_prdt_from_mbr(
401 &prdt, &btr_cur->rtr_info->mbr,
402 mode, trx->lock.lock_heap);
403 lock_mutex_exit();
404
405 if (rw_latch == RW_NO_LATCH) {
406 rw_lock_s_lock(&(block->lock));
407 }
408
409 lock_prdt_lock(block, &prdt, index, LOCK_S,
410 LOCK_PREDICATE, btr_cur->rtr_info->thr);
411
412 if (rw_latch == RW_NO_LATCH) {
413 rw_lock_s_unlock(&(block->lock));
414 }
415 }
416
417 if (found) {
418 if (level == target_level) {
				page_cur_t*	r_cur;
420
421 if (my_latch_mode == BTR_MODIFY_TREE
422 && level == 0) {
423 ut_ad(rw_latch == RW_NO_LATCH);
424
425 btr_cur_latch_leaves(
426 block,
427 page_id_t(index->table->space->id,
428 block->page.id.page_no()),
429 page_size, BTR_MODIFY_TREE,
430 btr_cur, mtr);
431 }
432
433 r_cur = btr_cur_get_page_cur(btr_cur);
434
435 page_cur_position(
436 page_cur_get_rec(page_cursor),
437 page_cur_get_block(page_cursor),
438 r_cur);
439
440 btr_cur->low_match = level != 0 ?
441 DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1
442 : btr_cur->low_match;
443 break;
444 }
445
			/* Keep the parent path node, which points to
			the node just located */
448 skip_parent = true;
449 } else {
450 /* Release latch on the current page */
451 ut_ad(rtr_info->tree_blocks[tree_idx]);
452
453 mtr_release_block_at_savepoint(
454 mtr, rtr_info->tree_savepoints[tree_idx],
455 rtr_info->tree_blocks[tree_idx]);
456 rtr_info->tree_blocks[tree_idx] = NULL;
457 }
458
459 } while (!rtr_info->path->empty());
460
461 const rec_t* rec = btr_cur_get_rec(btr_cur);
462
463 if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
464 mtr_commit(mtr);
465 mtr_start(mtr);
466 } else if (!index_locked) {
467 mtr_memo_release(mtr, dict_index_get_lock(index),
468 MTR_MEMO_X_LOCK);
469 }
470
471 return(found);
472}
473
474/*************************************************************//**
Find the next matching record. This function will first exhaust the
copied records in the rtr_info->matches vector before moving on to
the next page.
@return true if a suitable record is found, otherwise false */
479bool
480rtr_pcur_move_to_next(
481/*==================*/
482 const dtuple_t* tuple, /*!< in: data tuple; NOTE: n_fields_cmp in
483 tuple must be set so that it cannot get
484 compared to the node ptr page number field! */
485 page_cur_mode_t mode, /*!< in: cursor search mode */
486 btr_pcur_t* cursor, /*!< in: persistent cursor; NOTE that the
487 function may release the page latch */
488 ulint level, /*!< in: target level */
489 mtr_t* mtr) /*!< in: mtr */
490{
491 rtr_info_t* rtr_info = cursor->btr_cur.rtr_info;
492
493 ut_a(cursor->pos_state == BTR_PCUR_IS_POSITIONED);
494
495 mutex_enter(&rtr_info->matches->rtr_match_mutex);
496 /* First retrieve the next record on the current page */
497 if (!rtr_info->matches->matched_recs->empty()) {
498 rtr_rec_t rec;
499 rec = rtr_info->matches->matched_recs->back();
500 rtr_info->matches->matched_recs->pop_back();
501 mutex_exit(&rtr_info->matches->rtr_match_mutex);
502
503 cursor->btr_cur.page_cur.rec = rec.r_rec;
504 cursor->btr_cur.page_cur.block = &rtr_info->matches->block;
505
506 DEBUG_SYNC_C("rtr_pcur_move_to_next_return");
507 return(true);
508 }
509
510 mutex_exit(&rtr_info->matches->rtr_match_mutex);
511
512 /* Fetch the next page */
513 return(rtr_pcur_getnext_from_path(tuple, mode, &cursor->btr_cur,
514 level, cursor->latch_mode,
515 false, mtr));
516}
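
/* Illustrative usage sketch only; the real caller lives in the row search
code and the names below ("pcur", "search_tuple", "mtr") are placeholders.
Assuming "pcur" has already been positioned on a leaf page by an R-tree
search with mode PAGE_CUR_WITHIN, and that its rtr_info was created with
the "matches" structure, results are drained page by page like this:

	while (rtr_pcur_move_to_next(search_tuple, PAGE_CUR_WITHIN,
				     &pcur, 0, &mtr)) {
		const rec_t*	rec = btr_pcur_get_rec(&pcur);
		// When served from the matches vector, rec is a copy
		// stored in rtr_info->matches->block, not a record on
		// the buffer pool page.
	}
*/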
517
518/*************************************************************//**
Check if the cursor holds a record pointing to the specified child page.
@return true if it points to the child page, false otherwise */
521static
522bool
523rtr_compare_cursor_rec(
524/*===================*/
525 dict_index_t* index, /*!< in: index */
526 btr_cur_t* cursor, /*!< in: Cursor to check */
527 ulint page_no, /*!< in: desired child page number */
528 mem_heap_t** heap) /*!< in: memory heap */
529{
530 const rec_t* rec;
531 ulint* offsets;
532
533 rec = btr_cur_get_rec(cursor);
534
535 offsets = rec_get_offsets(
536 rec, index, NULL, false, ULINT_UNDEFINED, heap);
537
538 return(btr_node_ptr_get_child_page_no(rec, offsets) == page_no);
539}
540
541/**************************************************************//**
542Initializes and opens a persistent cursor to an index tree. It should be
543closed with btr_pcur_close. Mainly called by row_search_index_entry() */
544void
545rtr_pcur_open_low(
546/*==============*/
547 dict_index_t* index, /*!< in: index */
548 ulint level, /*!< in: level in the rtree */
549 const dtuple_t* tuple, /*!< in: tuple on which search done */
550 page_cur_mode_t mode, /*!< in: PAGE_CUR_RTREE_LOCATE, ... */
551 ulint latch_mode,/*!< in: BTR_SEARCH_LEAF, ... */
552 btr_pcur_t* cursor, /*!< in: memory buffer for persistent cursor */
553 const char* file, /*!< in: file name */
554 unsigned line, /*!< in: line where called */
555 mtr_t* mtr) /*!< in: mtr */
556{
557 btr_cur_t* btr_cursor;
558 ulint n_fields;
559 ulint low_match;
560 rec_t* rec;
561 bool tree_latched = false;
562 bool for_delete = false;
563 bool for_undo_ins = false;
564
565 ut_ad(level == 0);
566
567 ut_ad(latch_mode & BTR_MODIFY_LEAF || latch_mode & BTR_MODIFY_TREE);
568 ut_ad(mode == PAGE_CUR_RTREE_LOCATE);
569
570 /* Initialize the cursor */
571
572 btr_pcur_init(cursor);
573
574 for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
575 for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
576
577 cursor->latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
578 cursor->search_mode = mode;
579
580 /* Search with the tree cursor */
581
582 btr_cursor = btr_pcur_get_btr_cur(cursor);
583
584 btr_cursor->rtr_info = rtr_create_rtr_info(false, false,
585 btr_cursor, index);
586
	/* Purge will SX-lock the tree instead of taking page locks */
588 if (btr_cursor->thr) {
589 btr_cursor->rtr_info->need_page_lock = true;
590 btr_cursor->rtr_info->thr = btr_cursor->thr;
591 }
592
593 btr_cur_search_to_nth_level(index, level, tuple, mode, latch_mode,
594 btr_cursor, 0, file, line, mtr);
595 cursor->pos_state = BTR_PCUR_IS_POSITIONED;
596
597 cursor->trx_if_known = NULL;
598
599 low_match = btr_pcur_get_low_match(cursor);
600
601 rec = btr_pcur_get_rec(cursor);
602
603 n_fields = dtuple_get_n_fields(tuple);
604
605 if (latch_mode & BTR_ALREADY_S_LATCHED) {
606 ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
607 MTR_MEMO_S_LOCK));
608 tree_latched = true;
609 }
610
611 if (latch_mode & BTR_MODIFY_TREE) {
612 ut_ad(mtr_memo_contains_flagged(mtr, &index->lock,
613 MTR_MEMO_X_LOCK
614 | MTR_MEMO_SX_LOCK));
615 tree_latched = true;
616 }
617
618 if (page_rec_is_infimum(rec) || low_match != n_fields
619 || (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
620 && (for_delete || for_undo_ins))) {
621
622 if (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
623 && for_delete) {
624 btr_cursor->rtr_info->fd_del = true;
625 btr_cursor->low_match = 0;
626 }
		/* Did not find a matching row in the first dive. Release
		any latched block before searching more pages */
629 if (latch_mode & BTR_MODIFY_LEAF) {
630 ulint tree_idx = btr_cursor->tree_height - 1;
631 rtr_info_t* rtr_info = btr_cursor->rtr_info;
632
633 ut_ad(level == 0);
634
635 if (rtr_info->tree_blocks[tree_idx]) {
636 mtr_release_block_at_savepoint(
637 mtr,
638 rtr_info->tree_savepoints[tree_idx],
639 rtr_info->tree_blocks[tree_idx]);
640 rtr_info->tree_blocks[tree_idx] = NULL;
641 }
642 }
643
644 bool ret = rtr_pcur_getnext_from_path(
645 tuple, mode, btr_cursor, level, latch_mode,
646 tree_latched, mtr);
647
648 if (ret) {
649 low_match = btr_pcur_get_low_match(cursor);
650 ut_ad(low_match == n_fields);
651 }
652 }
653}
654
/** Get the R-tree page father.
656@param[in] index rtree index
657@param[in] block child page in the index
658@param[in] mtr mtr
659@param[in] sea_cur search cursor, contains information
660 about parent nodes in search
661@param[in] cursor cursor on node pointer record,
662 its page x-latched */
663void
664rtr_page_get_father(
665 dict_index_t* index,
666 buf_block_t* block,
667 mtr_t* mtr,
668 btr_cur_t* sea_cur,
669 btr_cur_t* cursor)
670{
671 mem_heap_t* heap = mem_heap_create(100);
672#ifdef UNIV_DEBUG
673 ulint* offsets;
674
675 offsets = rtr_page_get_father_block(
676 NULL, heap, index, block, mtr, sea_cur, cursor);
677
678 ulint page_no = btr_node_ptr_get_child_page_no(cursor->page_cur.rec,
679 offsets);
680
681 ut_ad(page_no == block->page.id.page_no());
682#else
683 rtr_page_get_father_block(
684 NULL, heap, index, block, mtr, sea_cur, cursor);
685#endif
686
687 mem_heap_free(heap);
688}
689
/** Returns the upper level node pointer to an R-tree page. It is assumed
691that mtr holds an SX-latch or X-latch on the tree.
692@return rec_get_offsets() of the node pointer record */
693static
694ulint*
695rtr_page_get_father_node_ptr(
696 ulint* offsets,/*!< in: work area for the return value */
697 mem_heap_t* heap, /*!< in: memory heap to use */
698 btr_cur_t* sea_cur,/*!< in: search cursor */
699 btr_cur_t* cursor, /*!< in: cursor pointing to user record,
700 out: cursor on node pointer record,
701 its page x-latched */
702 mtr_t* mtr) /*!< in: mtr */
703{
704 dtuple_t* tuple;
705 rec_t* user_rec;
706 rec_t* node_ptr;
707 ulint level;
708 ulint page_no;
709 dict_index_t* index;
710 rtr_mbr_t mbr;
711
712 page_no = btr_cur_get_block(cursor)->page.id.page_no();
713 index = btr_cur_get_index(cursor);
714
715 ut_ad(srv_read_only_mode
716 || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
717 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
718
719 ut_ad(dict_index_get_page(index) != page_no);
720
721 level = btr_page_get_level(btr_cur_get_page(cursor));
722
723 user_rec = btr_cur_get_rec(cursor);
724 ut_a(page_rec_is_user_rec(user_rec));
725
726 offsets = rec_get_offsets(user_rec, index, offsets, !level,
727 ULINT_UNDEFINED, &heap);
728 rtr_get_mbr_from_rec(user_rec, offsets, &mbr);
729
730 tuple = rtr_index_build_node_ptr(
731 index, &mbr, user_rec, page_no, heap);
732
733 if (sea_cur && !sea_cur->rtr_info) {
734 sea_cur = NULL;
735 }
736
737 rtr_get_father_node(index, level + 1, tuple, sea_cur, cursor,
738 page_no, mtr);
739
740 node_ptr = btr_cur_get_rec(cursor);
741 ut_ad(!page_rec_is_comp(node_ptr)
742 || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
743 offsets = rec_get_offsets(node_ptr, index, offsets, false,
744 ULINT_UNDEFINED, &heap);
745
746 ulint child_page = btr_node_ptr_get_child_page_no(node_ptr, offsets);
747
748 if (child_page != page_no) {
749 const rec_t* print_rec;
750
751 ib::fatal error;
752
753 error << "Corruption of index " << index->name
754 << " of table " << index->table->name
755 << " parent page " << page_no
756 << " child page " << child_page;
757
758 print_rec = page_rec_get_next(
759 page_get_infimum_rec(page_align(user_rec)));
760 offsets = rec_get_offsets(print_rec, index, offsets,
761 page_rec_is_leaf(user_rec),
762 ULINT_UNDEFINED, &heap);
763 error << "; child ";
764 rec_print(error.m_oss, print_rec,
765 rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
766 offsets);
767 offsets = rec_get_offsets(node_ptr, index, offsets, false,
768 ULINT_UNDEFINED, &heap);
769 error << "; parent ";
770 rec_print(error.m_oss, print_rec,
771 rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
772 offsets);
773
774 error << ". You should dump + drop + reimport the table to"
775 " fix the corruption. If the crash happens at"
776 " database startup, see "
777 "https://mariadb.com/kb/en/library/xtradbinnodb-recovery-modes/"
778 " about forcing"
779 " recovery. Then dump + drop + reimport.";
780 }
781
782 return(offsets);
783}
784
785/************************************************************//**
786Returns the father block to a page. It is assumed that mtr holds
787an X or SX latch on the tree.
788@return rec_get_offsets() of the node pointer record */
789ulint*
790rtr_page_get_father_block(
791/*======================*/
792 ulint* offsets,/*!< in: work area for the return value */
793 mem_heap_t* heap, /*!< in: memory heap to use */
794 dict_index_t* index, /*!< in: b-tree index */
795 buf_block_t* block, /*!< in: child page in the index */
796 mtr_t* mtr, /*!< in: mtr */
797 btr_cur_t* sea_cur,/*!< in: search cursor, contains information
798 about parent nodes in search */
799 btr_cur_t* cursor) /*!< out: cursor on node pointer record,
800 its page x-latched */
801{
802 rec_t* rec = page_rec_get_next(
803 page_get_infimum_rec(buf_block_get_frame(block)));
804 btr_cur_position(index, rec, block, cursor);
805
806 return(rtr_page_get_father_node_ptr(offsets, heap, sea_cur,
807 cursor, mtr));
808}
809
810/********************************************************************//**
Returns the upper level node pointer to an R-tree page. It is assumed
that mtr holds an X-latch or an SX-latch on the tree. */
813void
814rtr_get_father_node(
815/*================*/
816 dict_index_t* index, /*!< in: index */
817 ulint level, /*!< in: the tree level of search */
818 const dtuple_t* tuple, /*!< in: data tuple; NOTE: n_fields_cmp in
819 tuple must be set so that it cannot get
820 compared to the node ptr page number field! */
821 btr_cur_t* sea_cur,/*!< in: search cursor */
822 btr_cur_t* btr_cur,/*!< in/out: tree cursor; the cursor page is
823 s- or x-latched, but see also above! */
824 ulint page_no,/*!< Current page no */
825 mtr_t* mtr) /*!< in: mtr */
826{
827 mem_heap_t* heap = NULL;
828 bool ret = false;
829 const rec_t* rec;
830 ulint n_fields;
831 bool new_rtr = false;
832
	/* Try to optimally locate the parent node. Level should always
	be less than sea_cur->tree_height unless the root is splitting */
835 if (sea_cur && sea_cur->tree_height > level) {
836
837 ut_ad(mtr_memo_contains_flagged(mtr,
838 dict_index_get_lock(index),
839 MTR_MEMO_X_LOCK
840 | MTR_MEMO_SX_LOCK));
841 ret = rtr_cur_restore_position(
842 BTR_CONT_MODIFY_TREE, sea_cur, level, mtr);
843
		/* Since we block shrinking of tree nodes while there are
		active searches on them, this optimal locating should
		always succeed */
847 ut_ad(ret);
848
849 if (ret) {
850 btr_pcur_t* r_cursor = rtr_get_parent_cursor(
851 sea_cur, level, false);
852
853 rec = btr_pcur_get_rec(r_cursor);
854
855 ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
856 page_cur_position(rec,
857 btr_pcur_get_block(r_cursor),
858 btr_cur_get_page_cur(btr_cur));
859 btr_cur->rtr_info = sea_cur->rtr_info;
860 btr_cur->tree_height = sea_cur->tree_height;
861 ut_ad(rtr_compare_cursor_rec(
862 index, btr_cur, page_no, &heap));
863 goto func_exit;
864 }
865 }
866
	/* We arrive here in one of two scenarios:
	1) CHECK TABLE and btr_validate
	2) the index root page is being raised */
870 ut_ad(!sea_cur || sea_cur->tree_height == level);
871
872 if (btr_cur->rtr_info) {
873 rtr_clean_rtr_info(btr_cur->rtr_info, true);
874 } else {
875 new_rtr = true;
876 }
877
878 btr_cur->rtr_info = rtr_create_rtr_info(false, false, btr_cur, index);
879
880 if (sea_cur && sea_cur->tree_height == level) {
881 /* root split, and search the new root */
882 btr_cur_search_to_nth_level(
883 index, level, tuple, PAGE_CUR_RTREE_LOCATE,
884 BTR_CONT_MODIFY_TREE, btr_cur, 0,
885 __FILE__, __LINE__, mtr);
886
887 } else {
888 /* btr_validate */
889 ut_ad(level >= 1);
890 ut_ad(!sea_cur);
891
892 btr_cur_search_to_nth_level(
893 index, level, tuple, PAGE_CUR_RTREE_LOCATE,
894 BTR_CONT_MODIFY_TREE, btr_cur, 0,
895 __FILE__, __LINE__, mtr);
896
897 rec = btr_cur_get_rec(btr_cur);
898 n_fields = dtuple_get_n_fields_cmp(tuple);
899
900 if (page_rec_is_infimum(rec)
901 || (btr_cur->low_match != n_fields)) {
902 ret = rtr_pcur_getnext_from_path(
903 tuple, PAGE_CUR_RTREE_LOCATE, btr_cur,
904 level, BTR_CONT_MODIFY_TREE,
905 true, mtr);
906
907 ut_ad(ret && btr_cur->low_match == n_fields);
908 }
909 }
910
911 ret = rtr_compare_cursor_rec(
912 index, btr_cur, page_no, &heap);
913
914 ut_ad(ret);
915
916func_exit:
917 if (heap) {
918 mem_heap_free(heap);
919 }
920
921 if (new_rtr && btr_cur->rtr_info) {
922 rtr_clean_rtr_info(btr_cur->rtr_info, true);
923 btr_cur->rtr_info = NULL;
924 }
925}
926
927/*******************************************************************//**
Create an R-tree search info structure */
929rtr_info_t*
930rtr_create_rtr_info(
931/******************/
932 bool need_prdt, /*!< in: Whether predicate lock
933 is needed */
	bool		init_matches,	/*!< in: Whether to initialize the
935 "matches" structure for collecting
936 matched leaf records */
937 btr_cur_t* cursor, /*!< in: tree search cursor */
938 dict_index_t* index) /*!< in: index struct */
939{
940 rtr_info_t* rtr_info;
941
942 index = index ? index : cursor->index;
943 ut_ad(index);
944
945 rtr_info = static_cast<rtr_info_t*>(ut_zalloc_nokey(sizeof(*rtr_info)));
946
947 rtr_info->allocated = true;
948 rtr_info->cursor = cursor;
949 rtr_info->index = index;
950
951 if (init_matches) {
952 rtr_info->heap = mem_heap_create(sizeof(*(rtr_info->matches)));
953 rtr_info->matches = static_cast<matched_rec_t*>(
954 mem_heap_zalloc(
955 rtr_info->heap,
956 sizeof(*rtr_info->matches)));
957
958 rtr_info->matches->matched_recs
959 = UT_NEW_NOKEY(rtr_rec_vector());
960
961 rtr_info->matches->bufp = page_align(rtr_info->matches->rec_buf
962 + UNIV_PAGE_SIZE_MAX + 1);
963 mutex_create(LATCH_ID_RTR_MATCH_MUTEX,
964 &rtr_info->matches->rtr_match_mutex);
965 rw_lock_create(PFS_NOT_INSTRUMENTED,
966 &(rtr_info->matches->block.lock),
967 SYNC_LEVEL_VARYING);
968 }
969
970 rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
971 rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
972 rtr_info->need_prdt_lock = need_prdt;
973 mutex_create(LATCH_ID_RTR_PATH_MUTEX,
974 &rtr_info->rtr_path_mutex);
975
976 mutex_enter(&index->rtr_track->rtr_active_mutex);
977 index->rtr_track->rtr_active->push_back(rtr_info);
978 mutex_exit(&index->rtr_track->rtr_active_mutex);
979 return(rtr_info);
980}
981
982/*******************************************************************//**
983Update a btr_cur_t with rtr_info */
984void
985rtr_info_update_btr(
986/******************/
987 btr_cur_t* cursor, /*!< in/out: tree cursor */
988 rtr_info_t* rtr_info) /*!< in: rtr_info to set to the
989 cursor */
990{
991 ut_ad(rtr_info);
992
993 cursor->rtr_info = rtr_info;
994}
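
/* Illustrative sketch of a minimal standalone lifecycle ("cursor" and
"index" are placeholders; real callers may instead reuse a preallocated
rtr_info via rtr_init_rtr_info()):

	btr_cur_t	cursor;
	rtr_info_t*	rtr_info = rtr_create_rtr_info(
		false, false, &cursor, index);
	rtr_info_update_btr(&cursor, rtr_info);
	// ... position "cursor" and run the search ...
	rtr_clean_rtr_info(rtr_info, true);
*/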
995
996/*******************************************************************//**
Initialize an R-tree search structure */
998void
999rtr_init_rtr_info(
1000/****************/
1001 rtr_info_t* rtr_info, /*!< in: rtr_info to set to the
1002 cursor */
1003 bool need_prdt, /*!< in: Whether predicate lock is
1004 needed */
1005 btr_cur_t* cursor, /*!< in: tree search cursor */
1006 dict_index_t* index, /*!< in: index structure */
1007 bool reinit) /*!< in: Whether this is a reinit */
1008{
1009 ut_ad(rtr_info);
1010
1011 if (!reinit) {
1012 /* Reset all members. */
1013 rtr_info->path = NULL;
1014 rtr_info->parent_path = NULL;
1015 rtr_info->matches = NULL;
1016
1017 mutex_create(LATCH_ID_RTR_PATH_MUTEX,
1018 &rtr_info->rtr_path_mutex);
1019
1020 memset(rtr_info->tree_blocks, 0x0,
1021 sizeof(rtr_info->tree_blocks));
1022 memset(rtr_info->tree_savepoints, 0x0,
1023 sizeof(rtr_info->tree_savepoints));
1024 rtr_info->mbr.xmin = 0.0;
1025 rtr_info->mbr.xmax = 0.0;
1026 rtr_info->mbr.ymin = 0.0;
1027 rtr_info->mbr.ymax = 0.0;
1028 rtr_info->thr = NULL;
1029 rtr_info->heap = NULL;
1030 rtr_info->cursor = NULL;
1031 rtr_info->index = NULL;
1032 rtr_info->need_prdt_lock = false;
1033 rtr_info->need_page_lock = false;
1034 rtr_info->allocated = false;
1035 rtr_info->mbr_adj = false;
1036 rtr_info->fd_del = false;
1037 rtr_info->search_tuple = NULL;
1038 rtr_info->search_mode = PAGE_CUR_UNSUPP;
1039 }
1040
1041 ut_ad(!rtr_info->matches || rtr_info->matches->matched_recs->empty());
1042
1043 rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
1044 rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
1045 rtr_info->need_prdt_lock = need_prdt;
1046 rtr_info->cursor = cursor;
1047 rtr_info->index = index;
1048
1049 mutex_enter(&index->rtr_track->rtr_active_mutex);
1050 index->rtr_track->rtr_active->push_back(rtr_info);
1051 mutex_exit(&index->rtr_track->rtr_active_mutex);
1052}
1053
1054/**************************************************************//**
Clean up an R-tree search structure */
1056void
1057rtr_clean_rtr_info(
1058/*===============*/
1059 rtr_info_t* rtr_info, /*!< in: RTree search info */
1060 bool free_all) /*!< in: need to free rtr_info itself */
1061{
1062 dict_index_t* index;
1063 bool initialized = false;
1064
1065 if (!rtr_info) {
1066 return;
1067 }
1068
1069 index = rtr_info->index;
1070
1071 if (index) {
1072 mutex_enter(&index->rtr_track->rtr_active_mutex);
1073 }
1074
1075 while (rtr_info->parent_path && !rtr_info->parent_path->empty()) {
1076 btr_pcur_t* cur = rtr_info->parent_path->back().cursor;
1077 rtr_info->parent_path->pop_back();
1078
1079 if (cur) {
1080 btr_pcur_close(cur);
1081 ut_free(cur);
1082 }
1083 }
1084
1085 UT_DELETE(rtr_info->parent_path);
1086 rtr_info->parent_path = NULL;
1087
1088 if (rtr_info->path != NULL) {
1089 UT_DELETE(rtr_info->path);
1090 rtr_info->path = NULL;
1091 initialized = true;
1092 }
1093
1094 if (rtr_info->matches) {
1095 rtr_info->matches->used = false;
1096 rtr_info->matches->locked = false;
1097 rtr_info->matches->valid = false;
1098 rtr_info->matches->matched_recs->clear();
1099 }
1100
1101 if (index) {
1102 index->rtr_track->rtr_active->remove(rtr_info);
1103 mutex_exit(&index->rtr_track->rtr_active_mutex);
1104 }
1105
1106 if (free_all) {
1107 if (rtr_info->matches) {
1108 if (rtr_info->matches->matched_recs != NULL) {
1109 UT_DELETE(rtr_info->matches->matched_recs);
1110 }
1111
1112 rw_lock_free(&(rtr_info->matches->block.lock));
1113
1114 mutex_destroy(&rtr_info->matches->rtr_match_mutex);
1115 }
1116
1117 if (rtr_info->heap) {
1118 mem_heap_free(rtr_info->heap);
1119 }
1120
1121 if (initialized) {
1122 mutex_destroy(&rtr_info->rtr_path_mutex);
1123 }
1124
1125 if (rtr_info->allocated) {
1126 ut_free(rtr_info);
1127 }
1128 }
1129}
1130
1131/**************************************************************//**
Rebuild the "path" to exclude the page number being removed */
1133static
1134void
1135rtr_rebuild_path(
1136/*=============*/
1137 rtr_info_t* rtr_info, /*!< in: RTree search info */
	ulint		page_no)	/*!< in: page number to exclude */
1139{
1140 rtr_node_path_t* new_path
1141 = UT_NEW_NOKEY(rtr_node_path_t());
1142
1143 rtr_node_path_t::iterator rit;
1144#ifdef UNIV_DEBUG
1145 ulint before_size = rtr_info->path->size();
1146#endif /* UNIV_DEBUG */
1147
1148 for (rit = rtr_info->path->begin();
1149 rit != rtr_info->path->end(); ++rit) {
1150 node_visit_t next_rec = *rit;
1151
1152 if (next_rec.page_no == page_no) {
1153 continue;
1154 }
1155
1156 new_path->push_back(next_rec);
1157#ifdef UNIV_DEBUG
1158 node_visit_t rec = new_path->back();
1159 ut_ad(rec.level < rtr_info->cursor->tree_height
1160 && rec.page_no > 0);
1161#endif /* UNIV_DEBUG */
1162 }
1163
1164 UT_DELETE(rtr_info->path);
1165
1166 ut_ad(new_path->size() == before_size - 1);
1167
1168 rtr_info->path = new_path;
1169
1170 if (!rtr_info->parent_path->empty()) {
1171 rtr_node_path_t* new_parent_path = UT_NEW_NOKEY(
1172 rtr_node_path_t());
1173
1174 for (rit = rtr_info->parent_path->begin();
1175 rit != rtr_info->parent_path->end(); ++rit) {
1176 node_visit_t next_rec = *rit;
1177
1178 if (next_rec.child_no == page_no) {
1179 btr_pcur_t* cur = next_rec.cursor;
1180
1181 if (cur) {
1182 btr_pcur_close(cur);
1183 ut_free(cur);
1184 }
1185
1186 continue;
1187 }
1188
1189 new_parent_path->push_back(next_rec);
1190 }
1191 UT_DELETE(rtr_info->parent_path);
1192 rtr_info->parent_path = new_parent_path;
1193 }
1194
1195}
1196
1197/**************************************************************//**
Check whether a page being discarded is in anyone's search path */
1199void
1200rtr_check_discard_page(
1201/*===================*/
1202 dict_index_t* index, /*!< in: index */
1203 btr_cur_t* cursor, /*!< in: cursor on the page to discard: not on
1204 the root page */
1205 buf_block_t* block) /*!< in: block of page to be discarded */
1206{
1207 ulint pageno = block->page.id.page_no();
1208 rtr_info_t* rtr_info;
1209 rtr_info_active::iterator it;
1210
1211 mutex_enter(&index->rtr_track->rtr_active_mutex);
1212
1213 for (it = index->rtr_track->rtr_active->begin();
1214 it != index->rtr_track->rtr_active->end(); ++it) {
1215 rtr_info = *it;
1216 rtr_node_path_t::iterator rit;
1217 bool found = false;
1218
1219 if (cursor && rtr_info == cursor->rtr_info) {
1220 continue;
1221 }
1222
1223 mutex_enter(&rtr_info->rtr_path_mutex);
1224 for (rit = rtr_info->path->begin();
1225 rit != rtr_info->path->end(); ++rit) {
1226 node_visit_t node = *rit;
1227
1228 if (node.page_no == pageno) {
1229 found = true;
1230 break;
1231 }
1232 }
1233
1234 if (found) {
1235 rtr_rebuild_path(rtr_info, pageno);
1236 }
1237 mutex_exit(&rtr_info->rtr_path_mutex);
1238
1239 if (rtr_info->matches) {
1240 mutex_enter(&rtr_info->matches->rtr_match_mutex);
1241
1242 if ((&rtr_info->matches->block)->page.id.page_no()
1243 == pageno) {
1244 if (!rtr_info->matches->matched_recs->empty()) {
1245 rtr_info->matches->matched_recs->clear();
1246 }
1247 ut_ad(rtr_info->matches->matched_recs->empty());
1248 rtr_info->matches->valid = false;
1249 }
1250
1251 mutex_exit(&rtr_info->matches->rtr_match_mutex);
1252 }
1253 }
1254
1255 mutex_exit(&index->rtr_track->rtr_active_mutex);
1256
1257 lock_mutex_enter();
1258 lock_prdt_page_free_from_discard(block, lock_sys.prdt_hash);
1259 lock_prdt_page_free_from_discard(block, lock_sys.prdt_page_hash);
1260 lock_mutex_exit();
1261}
1262
/** Restore the stored position of a persistent cursor, buffer-fixing the page */
1264static
1265bool
1266rtr_cur_restore_position(
1267 ulint latch_mode, /*!< in: BTR_SEARCH_LEAF, ... */
1268 btr_cur_t* btr_cur, /*!< in: detached persistent cursor */
1269 ulint level, /*!< in: index level */
1270 mtr_t* mtr) /*!< in: mtr */
1271{
1272 dict_index_t* index;
1273 mem_heap_t* heap;
1274 btr_pcur_t* r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
1275 dtuple_t* tuple;
1276 bool ret = false;
1277
1278 ut_ad(mtr);
1279 ut_ad(r_cursor);
1280 ut_ad(mtr->is_active());
1281
1282 index = btr_cur_get_index(btr_cur);
1283
1284 if (r_cursor->rel_pos == BTR_PCUR_AFTER_LAST_IN_TREE
1285 || r_cursor->rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE) {
1286 return(false);
1287 }
1288
1289 DBUG_EXECUTE_IF(
1290 "rtr_pessimistic_position",
1291 r_cursor->modify_clock = 100;
1292 );
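	/* The "rtr_pessimistic_position" debug injection above overwrites
	modify_clock, so that the optimistic buf_page_optimistic_get()
	below fails and the pessimistic search-again path is exercised. */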
1293
1294 ut_ad(latch_mode == BTR_CONT_MODIFY_TREE);
1295
1296 if (!buf_pool_is_obsolete(r_cursor->withdraw_clock)
1297 && buf_page_optimistic_get(RW_X_LATCH,
1298 r_cursor->block_when_stored,
1299 r_cursor->modify_clock,
1300 __FILE__, __LINE__, mtr)) {
1301 ut_ad(r_cursor->pos_state == BTR_PCUR_IS_POSITIONED);
1302
1303 ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
1304#ifdef UNIV_DEBUG
1305 do {
1306 const rec_t* rec;
1307 const ulint* offsets1;
1308 const ulint* offsets2;
1309 ulint comp;
1310
1311 rec = btr_pcur_get_rec(r_cursor);
1312
1313 heap = mem_heap_create(256);
1314 offsets1 = rec_get_offsets(
1315 r_cursor->old_rec, index, NULL, !level,
1316 r_cursor->old_n_fields, &heap);
1317 offsets2 = rec_get_offsets(
1318 rec, index, NULL, !level,
1319 r_cursor->old_n_fields, &heap);
1320
1321 comp = rec_offs_comp(offsets1);
1322
1323 if (rec_get_info_bits(r_cursor->old_rec, comp)
1324 & REC_INFO_MIN_REC_FLAG) {
1325 ut_ad(rec_get_info_bits(rec, comp)
1326 & REC_INFO_MIN_REC_FLAG);
1327 } else {
1328
1329 ut_ad(!cmp_rec_rec(r_cursor->old_rec,
1330 rec, offsets1, offsets2,
1331 index));
1332 }
1333
1334 mem_heap_free(heap);
1335 } while (0);
1336#endif /* UNIV_DEBUG */
1337
1338 return(true);
1339 }
1340
	/* The page has changed. For an R-tree, the page cannot be shrunk
	away, so we search the page and its right siblings */
1343 buf_block_t* block;
1344 node_seq_t page_ssn;
1345 const page_t* page;
1346 page_cur_t* page_cursor;
1347 node_visit_t* node = rtr_get_parent_node(btr_cur, level, false);
1348 node_seq_t path_ssn = node->seq_no;
1349 const page_size_t page_size(index->table->space->flags);
1350
1351 ulint page_no = node->page_no;
1352
1353 heap = mem_heap_create(256);
1354
1355 tuple = dict_index_build_data_tuple(r_cursor->old_rec, index, !level,
1356 r_cursor->old_n_fields, heap);
1357
1358 page_cursor = btr_pcur_get_page_cur(r_cursor);
1359 ut_ad(r_cursor == node->cursor);
1360
1361search_again:
1362 dberr_t err = DB_SUCCESS;
1363
1364 block = buf_page_get_gen(
1365 page_id_t(index->table->space->id, page_no),
1366 page_size, RW_X_LATCH, NULL,
1367 BUF_GET, __FILE__, __LINE__, mtr, &err);
1368
1369 ut_ad(block);
1370
1371 /* Get the page SSN */
1372 page = buf_block_get_frame(block);
1373 page_ssn = page_get_ssn_id(page);
1374
1375 ulint low_match = page_cur_search(
1376 block, index, tuple, PAGE_CUR_LE, page_cursor);
1377
1378 if (low_match == r_cursor->old_n_fields) {
1379 const rec_t* rec;
1380 const ulint* offsets1;
1381 const ulint* offsets2;
1382 ulint comp;
1383
1384 rec = btr_pcur_get_rec(r_cursor);
1385
1386 offsets1 = rec_get_offsets(
1387 r_cursor->old_rec, index, NULL, !level,
1388 r_cursor->old_n_fields, &heap);
1389 offsets2 = rec_get_offsets(
1390 rec, index, NULL, !level,
1391 r_cursor->old_n_fields, &heap);
1392
1393 comp = rec_offs_comp(offsets1);
1394
1395 if ((rec_get_info_bits(r_cursor->old_rec, comp)
1396 & REC_INFO_MIN_REC_FLAG)
1397 && (rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG)) {
1398 r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1399 ret = true;
1400 } else if (!cmp_rec_rec(r_cursor->old_rec, rec, offsets1, offsets2,
1401 index)) {
1402 r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1403 ret = true;
1404 }
1405 }
1406
	/* Check the page SSN to see whether the page has been split;
	if so, search the page to the right */
1409 if (!ret && page_ssn > path_ssn) {
1410 page_no = btr_page_get_next(page, mtr);
1411 goto search_again;
1412 }
1413
1414 mem_heap_free(heap);
1415
1416 return(ret);
1417}
1418
1419/****************************************************************//**
Copy the leaf-level R-tree record and push it onto matched_recs in rtr_info */
1421static
1422void
1423rtr_leaf_push_match_rec(
1424/*====================*/
1425 const rec_t* rec, /*!< in: record to copy */
1426 rtr_info_t* rtr_info, /*!< in/out: search stack */
1427 ulint* offsets, /*!< in: offsets */
1428 bool is_comp) /*!< in: is compact format */
1429{
1430 byte* buf;
1431 matched_rec_t* match_rec = rtr_info->matches;
1432 rec_t* copy;
1433 ulint data_len;
1434 rtr_rec_t rtr_rec;
1435
1436 buf = match_rec->block.frame + match_rec->used;
1437 ut_ad(page_rec_is_leaf(rec));
1438
1439 copy = rec_copy(buf, rec, offsets);
1440
1441 if (is_comp) {
1442 rec_set_next_offs_new(copy, PAGE_NEW_SUPREMUM);
1443 } else {
1444 rec_set_next_offs_old(copy, PAGE_OLD_SUPREMUM);
1445 }
1446
1447 rtr_rec.r_rec = copy;
1448 rtr_rec.locked = false;
1449
1450 match_rec->matched_recs->push_back(rtr_rec);
1451 match_rec->valid = true;
1452
1453 data_len = rec_offs_data_size(offsets) + rec_offs_extra_size(offsets);
1454 match_rec->used += data_len;
1455
1456 ut_ad(match_rec->used < srv_page_size);
1457}
1458
1459/**************************************************************//**
Store the positions of the parent path cursors
@return number of cursors stored */
1462ulint
1463rtr_store_parent_path(
1464/*==================*/
1465 const buf_block_t* block, /*!< in: block of the page */
1466 btr_cur_t* btr_cur,/*!< in/out: persistent cursor */
1467 ulint latch_mode,
1468 /*!< in: latch_mode */
1469 ulint level, /*!< in: index level */
1470 mtr_t* mtr) /*!< in: mtr */
1471{
1472 ulint num = btr_cur->rtr_info->parent_path->size();
1473 ulint num_stored = 0;
1474
1475 while (num >= 1) {
1476 node_visit_t* node = &(*btr_cur->rtr_info->parent_path)[
1477 num - 1];
1478 btr_pcur_t* r_cursor = node->cursor;
1479 buf_block_t* cur_block;
1480
1481 if (node->level > level) {
1482 break;
1483 }
1484
1485 r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1486 r_cursor->latch_mode = latch_mode;
1487
1488 cur_block = btr_pcur_get_block(r_cursor);
1489
1490 if (cur_block == block) {
1491 btr_pcur_store_position(r_cursor, mtr);
1492 num_stored++;
1493 } else {
1494 break;
1495 }
1496
1497 num--;
1498 }
1499
1500 return(num_stored);
1501}
1502/**************************************************************//**
Push a non-leaf index node onto the search path for insertion */
1504static
1505void
1506rtr_non_leaf_insert_stack_push(
1507/*===========================*/
1508 dict_index_t* index, /*!< in: index descriptor */
1509 rtr_node_path_t* path, /*!< in/out: search path */
1510 ulint level, /*!< in: index page level */
1511 ulint child_no,/*!< in: child page no */
1512 const buf_block_t* block, /*!< in: block of the page */
1513 const rec_t* rec, /*!< in: positioned record */
1514 double mbr_inc)/*!< in: MBR needs to be enlarged */
1515{
1516 node_seq_t new_seq;
1517 btr_pcur_t* my_cursor;
1518 ulint page_no = block->page.id.page_no();
1519
1520 my_cursor = static_cast<btr_pcur_t*>(
1521 ut_malloc_nokey(sizeof(*my_cursor)));
1522
1523 btr_pcur_init(my_cursor);
1524
1525 page_cur_position(rec, block, btr_pcur_get_page_cur(my_cursor));
1526
1527 (btr_pcur_get_btr_cur(my_cursor))->index = index;
1528
1529 new_seq = rtr_get_current_ssn_id(index);
1530 rtr_non_leaf_stack_push(path, page_no, new_seq, level, child_no,
1531 my_cursor, mbr_inc);
1532}
1533
/** Copy a buf_block_t structure, except "block->lock" and "block->mutex".
1535@param[in,out] matches copy to match->block
1536@param[in] block block to copy */
1537static
1538void
1539rtr_copy_buf(
1540 matched_rec_t* matches,
1541 const buf_block_t* block)
1542{
1543 /* Copy all members of "block" to "matches->block" except "mutex"
1544 and "lock". We skip "mutex" and "lock" because they are not used
1545 from the dummy buf_block_t we create here and because memcpy()ing
1546 them generates (valid) compiler warnings that the vtable pointer
1547 will be copied. It is also undefined what will happen with the
1548 newly memcpy()ed mutex if the source mutex was acquired by
1549 (another) thread while it was copied. */
1550 memcpy(&matches->block.page, &block->page, sizeof(buf_page_t));
1551 matches->block.frame = block->frame;
1552 matches->block.unzip_LRU = block->unzip_LRU;
1553
1554 ut_d(matches->block.in_unzip_LRU_list = block->in_unzip_LRU_list);
1555 ut_d(matches->block.in_withdraw_list = block->in_withdraw_list);
1556
1557 /* Skip buf_block_t::mutex */
1558 /* Skip buf_block_t::lock */
1559 matches->block.lock_hash_val = block->lock_hash_val;
1560 matches->block.modify_clock = block->modify_clock;
1561#ifdef BTR_CUR_HASH_ADAPT
1562 matches->block.n_hash_helps = block->n_hash_helps;
1563 matches->block.n_fields = block->n_fields;
1564 matches->block.left_side = block->left_side;
1565#if defined UNIV_AHI_DEBUG || defined UNIV_DEBUG
1566 matches->block.n_pointers = block->n_pointers;
1567#endif /* UNIV_AHI_DEBUG || UNIV_DEBUG */
1568 matches->block.curr_n_fields = block->curr_n_fields;
1569 matches->block.curr_left_side = block->curr_left_side;
1570 matches->block.index = block->index;
1571#endif /* BTR_CUR_HASH_ADAPT */
1572 ut_d(matches->block.debug_latch = block->debug_latch);
1573
1574}
1575
1576/****************************************************************//**
1577Generate a shadow copy of the page block header to save the
1578matched records */
1579static
1580void
1581rtr_init_match(
1582/*===========*/
1583 matched_rec_t* matches,/*!< in/out: match to initialize */
1584 const buf_block_t* block, /*!< in: buffer block */
1585 const page_t* page) /*!< in: buffer page */
1586{
1587 ut_ad(matches->matched_recs->empty());
1588 matches->locked = false;
1589 rtr_copy_buf(matches, block);
1590 matches->block.frame = matches->bufp;
1591 matches->valid = false;
	/* We have to copy PAGE_NEW/OLD_SUPREMUM_END bytes so that we can
	use the infimum/supremum of this page as a normal btr page
	for search. */
1594 memcpy(matches->block.frame, page, page_is_comp(page)
1595 ? PAGE_NEW_SUPREMUM_END
1596 : PAGE_OLD_SUPREMUM_END);
1597 matches->used = page_is_comp(page)
1598 ? PAGE_NEW_SUPREMUM_END
1599 : PAGE_OLD_SUPREMUM_END;
1600#ifdef RTR_SEARCH_DIAGNOSTIC
1601 ulint pageno = page_get_page_no(page);
1602 fprintf(stderr, "INNODB_RTR: Searching leaf page %d\n",
1603 static_cast<int>(pageno));
1604#endif /* RTR_SEARCH_DIAGNOSTIC */
1605}
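
/* Note on the shadow block set up above: matches->block reuses the metadata
of the real block, but its frame points to the private page-aligned buffer
matches->bufp, which starts with a copy of the page header up to the
infimum/supremum records. Leaf records pushed by rtr_leaf_push_match_rec()
are appended into that buffer, so a page cursor can later be positioned on
them as if they were on a regular index page. */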
1606
1607/****************************************************************//**
1608Get the bounding box content from an index record */
1609void
1610rtr_get_mbr_from_rec(
1611/*=================*/
	const rec_t*	rec,	/*!< in: index record */
	const ulint*	offsets,/*!< in: offsets array */
	rtr_mbr_t*	mbr)	/*!< out: MBR */
1615{
1616 ulint rec_f_len;
1617 const byte* data;
1618
1619 data = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
1620
1621 rtr_read_mbr(data, mbr);
1622}
1623
1624/****************************************************************//**
1625Get the bounding box content from a MBR data record */
1626void
1627rtr_get_mbr_from_tuple(
1628/*===================*/
1629 const dtuple_t* dtuple, /*!< in: data tuple */
1630 rtr_mbr* mbr) /*!< out: mbr to fill */
1631{
1632 const dfield_t* dtuple_field;
1633 ulint dtuple_f_len;
1634 byte* data;
1635
1636 dtuple_field = dtuple_get_nth_field(dtuple, 0);
1637 dtuple_f_len = dfield_get_len(dtuple_field);
1638 ut_a(dtuple_f_len >= 4 * sizeof(double));
1639
1640 data = static_cast<byte*>(dfield_get_data(dtuple_field));
1641
1642 rtr_read_mbr(data, mbr);
1643}
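
/* Illustrative sketch ("rec" and "offsets" are placeholders belonging to a
spatial index): the first field of a spatial record or tuple stores the MBR
as at least 4 * sizeof(double) bytes (as asserted above), so a bounding box
can be extracted like this:

	rtr_mbr_t	mbr;
	rtr_get_mbr_from_rec(rec, offsets, &mbr);
	// mbr.xmin, mbr.xmax, mbr.ymin and mbr.ymax now hold the box
*/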
1644
1645/****************************************************************//**
Searches for the right position in an R-tree for a page cursor. */
1647bool
1648rtr_cur_search_with_match(
1649/*======================*/
1650 const buf_block_t* block, /*!< in: buffer block */
1651 dict_index_t* index, /*!< in: index descriptor */
1652 const dtuple_t* tuple, /*!< in: data tuple */
1653 page_cur_mode_t mode, /*!< in: PAGE_CUR_RTREE_INSERT,
1654 PAGE_CUR_RTREE_LOCATE etc. */
1655 page_cur_t* cursor, /*!< in/out: page cursor */
1656 rtr_info_t* rtr_info)/*!< in/out: search stack */
1657{
1658 bool found = false;
1659 const page_t* page;
1660 const rec_t* rec;
1661 const rec_t* last_rec;
1662 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1663 ulint* offsets = offsets_;
1664 mem_heap_t* heap = NULL;
1665 int cmp = 1;
1666 double least_inc = DBL_MAX;
1667 const rec_t* best_rec;
1668 const rec_t* last_match_rec = NULL;
1669 bool match_init = false;
1670 ulint space = block->page.id.space();
1671 page_cur_mode_t orig_mode = mode;
1672 const rec_t* first_rec = NULL;
1673
1674 rec_offs_init(offsets_);
1675
1676 ut_ad(RTREE_SEARCH_MODE(mode));
1677
1678 ut_ad(dict_index_is_spatial(index));
1679
1680 page = buf_block_get_frame(block);
1681
1682 const ulint level = btr_page_get_level(page);
1683 const bool is_leaf = !level;
1684
1685 if (mode == PAGE_CUR_RTREE_LOCATE) {
1686 ut_ad(level != 0);
1687 mode = PAGE_CUR_WITHIN;
1688 }
1689
1690 rec = page_dir_slot_get_rec(page_dir_get_nth_slot(page, 0));
1691
1692 last_rec = rec;
1693 best_rec = rec;
1694
1695 if (page_rec_is_infimum(rec)) {
1696 rec = page_rec_get_next_const(rec);
1697 }
1698
	/* Check whether the tuple to insert is larger than the first rec;
	if so, remember that rec so that we can try to avoid it */
1701 if (mode == PAGE_CUR_RTREE_INSERT && !page_rec_is_supremum(rec)) {
1702
1703 ulint new_rec_size = rec_get_converted_size(index, tuple, 0);
1704
1705 offsets = rec_get_offsets(rec, index, offsets, is_leaf,
1706 dtuple_get_n_fields_cmp(tuple),
1707 &heap);
1708
1709 if (rec_offs_size(offsets) < new_rec_size) {
1710 first_rec = rec;
1711 }
1712
		/* If this is the left-most page of this index level
		and the table is compressed, try to avoid the first
		page as much as possible, as there will be problems
		when updating the MIN_REC record in a compressed table */
1717 if (buf_block_get_page_zip(block)
1718 && !page_has_prev(page)
1719 && page_get_n_recs(page) >= 2) {
1720
1721 rec = page_rec_get_next_const(rec);
1722 }
1723 }
1724
1725 while (!page_rec_is_supremum(rec)) {
1726 offsets = rec_get_offsets(rec, index, offsets, is_leaf,
1727 dtuple_get_n_fields_cmp(tuple),
1728 &heap);
1729 if (!is_leaf) {
1730 switch (mode) {
1731 case PAGE_CUR_CONTAIN:
1732 case PAGE_CUR_INTERSECT:
1733 case PAGE_CUR_MBR_EQUAL:
				/* At non-leaf level, we will need to check
				both CONTAIN and INTERSECT for either of
				these search modes */
1737 cmp = cmp_dtuple_rec_with_gis(
1738 tuple, rec, offsets, PAGE_CUR_CONTAIN);
1739
1740 if (cmp != 0) {
1741 cmp = cmp_dtuple_rec_with_gis(
1742 tuple, rec, offsets,
1743 PAGE_CUR_INTERSECT);
1744 }
1745 break;
1746 case PAGE_CUR_DISJOINT:
1747 cmp = cmp_dtuple_rec_with_gis(
1748 tuple, rec, offsets, mode);
1749
1750 if (cmp != 0) {
1751 cmp = cmp_dtuple_rec_with_gis(
1752 tuple, rec, offsets,
1753 PAGE_CUR_INTERSECT);
1754 }
1755 break;
1756 case PAGE_CUR_RTREE_INSERT:
1757 double increase;
1758 double area;
1759
1760 cmp = cmp_dtuple_rec_with_gis(
1761 tuple, rec, offsets, PAGE_CUR_WITHIN);
1762
1763 if (cmp != 0) {
1764 increase = rtr_rec_cal_increase(
1765 tuple, rec, offsets, &area);
					/* Once it reaches DBL_MAX, it
					does not make sense to record
					such a value; just cap it at
					DBL_MAX / 2 */
1770 if (increase >= DBL_MAX) {
1771 increase = DBL_MAX / 2;
1772 }
1773
1774 if (increase < least_inc) {
1775 least_inc = increase;
1776 best_rec = rec;
1777 } else if (best_rec
1778 && best_rec == first_rec) {
1779 /* if first_rec is set,
1780 we will try to avoid it */
1781 least_inc = increase;
1782 best_rec = rec;
1783 }
1784 }
1785 break;
1786 case PAGE_CUR_RTREE_GET_FATHER:
1787 cmp = cmp_dtuple_rec_with_gis_internal(
1788 tuple, rec, offsets);
1789 break;
1790 default:
1791 /* WITHIN etc. */
1792 cmp = cmp_dtuple_rec_with_gis(
1793 tuple, rec, offsets, mode);
1794 }
1795 } else {
1796 /* At leaf level, INSERT should translate to LE */
1797 ut_ad(mode != PAGE_CUR_RTREE_INSERT);
1798
1799 cmp = cmp_dtuple_rec_with_gis(
1800 tuple, rec, offsets, mode);
1801 }
1802
1803 if (cmp == 0) {
1804 found = true;
1805
1806 /* If located, the matching node/rec will be pushed
1807 to rtr_info->path for non-leaf nodes, or
1808 rtr_info->matches for leaf nodes */
1809 if (rtr_info && mode != PAGE_CUR_RTREE_INSERT) {
1810 if (!is_leaf) {
1811 ulint page_no;
1812 node_seq_t new_seq;
1813 bool is_loc;
1814
1815 is_loc = (orig_mode
1816 == PAGE_CUR_RTREE_LOCATE
1817 || orig_mode
1818 == PAGE_CUR_RTREE_GET_FATHER);
1819
1820 offsets = rec_get_offsets(
1821 rec, index, offsets, false,
1822 ULINT_UNDEFINED, &heap);
1823
1824 page_no = btr_node_ptr_get_child_page_no(
1825 rec, offsets);
1826
1827 ut_ad(level >= 1);
1828
1829 /* Get current SSN, before we insert
1830 it into the path stack */
1831 new_seq = rtr_get_current_ssn_id(index);
1832
1833 rtr_non_leaf_stack_push(
1834 rtr_info->path,
1835 page_no,
1836 new_seq, level - 1, 0,
1837 NULL, 0);
1838
1839 if (is_loc) {
1840 rtr_non_leaf_insert_stack_push(
1841 index,
1842 rtr_info->parent_path,
1843 level, page_no, block,
1844 rec, 0);
1845 }
1846
1847 if (!srv_read_only_mode
1848 && (rtr_info->need_page_lock
1849 || !is_loc)) {
1850
1851 /* Lock the page, preventing it
1852 from being shrunk */
1853 lock_place_prdt_page_lock(
1854 space, page_no, index,
1855 rtr_info->thr);
1856 }
1857 } else {
1858 ut_ad(orig_mode
1859 != PAGE_CUR_RTREE_LOCATE);
1860
1861 if (!match_init) {
1862 rtr_init_match(
1863 rtr_info->matches,
1864 block, page);
1865 match_init = true;
1866 }
1867
1868 /* Collect matched records on page */
1869 offsets = rec_get_offsets(
1870 rec, index, offsets, true,
1871 ULINT_UNDEFINED, &heap);
1872 rtr_leaf_push_match_rec(
1873 rec, rtr_info, offsets,
1874 page_is_comp(page));
1875 }
1876
1877 last_match_rec = rec;
1878 } else {
				/* This is the insertion case; it will break
				once it finds the first MBR that can
				accommodate the record being inserted */
1882 break;
1883 }
1884 }
1885
1886 last_rec = rec;
1887
1888 rec = page_rec_get_next_const(rec);
1889 }
1890
1891 /* All records on page are searched */
1892 if (page_rec_is_supremum(rec)) {
1893 if (!is_leaf) {
1894 if (!found) {
				/* No match. If this is an insertion,
				select the record that results in the
				least increase in area */
1898 if (mode == PAGE_CUR_RTREE_INSERT) {
1899 ulint child_no;
1900 ut_ad(least_inc < DBL_MAX);
1901 offsets = rec_get_offsets(
1902 best_rec, index, offsets,
1903 false, ULINT_UNDEFINED, &heap);
1904 child_no =
1905 btr_node_ptr_get_child_page_no(
1906 best_rec, offsets);
1907
1908 rtr_non_leaf_insert_stack_push(
1909 index, rtr_info->parent_path,
1910 level, child_no, block,
1911 best_rec, least_inc);
1912
1913 page_cur_position(best_rec, block,
1914 cursor);
1915 rtr_info->mbr_adj = true;
1916 } else {
					/* Position on the last rec of
					the page, since this is not a
					leaf page */
1919 page_cur_position(last_rec, block,
1920 cursor);
1921 }
1922 } else {
				/* There are matching records; position on
				the last matching record */
1925 if (rtr_info) {
1926 rec = last_match_rec;
1927 page_cur_position(
1928 rec, block, cursor);
1929 }
1930 }
1931 } else if (rtr_info) {
1932 /* Leaf level, no match, position at the
1933 last (supremum) rec */
1934 if (!last_match_rec) {
1935 page_cur_position(rec, block, cursor);
1936 goto func_exit;
1937 }
1938
1939 /* There are matched records */
1940 matched_rec_t* match_rec = rtr_info->matches;
1941
1942 rtr_rec_t test_rec;
1943
1944 test_rec = match_rec->matched_recs->back();
1945#ifdef UNIV_DEBUG
1946 ulint offsets_2[REC_OFFS_NORMAL_SIZE];
1947 ulint* offsets2 = offsets_2;
1948 rec_offs_init(offsets_2);
1949
1950 ut_ad(found);
1951
		/* Verify that the record to be positioned on is the same
		as the last record in the matched_recs vector */
1954 offsets2 = rec_get_offsets(test_rec.r_rec, index,
1955 offsets2, true,
1956 ULINT_UNDEFINED, &heap);
1957
1958 offsets = rec_get_offsets(last_match_rec, index,
1959 offsets, true,
1960 ULINT_UNDEFINED, &heap);
1961
1962 ut_ad(cmp_rec_rec(test_rec.r_rec, last_match_rec,
1963 offsets2, offsets, index) == 0);
1964#endif /* UNIV_DEBUG */
1965 /* Pop the last match record and position on it */
1966 match_rec->matched_recs->pop_back();
1967 page_cur_position(test_rec.r_rec, &match_rec->block,
1968 cursor);
1969 }
1970 } else {
1971
1972 if (mode == PAGE_CUR_RTREE_INSERT) {
1973 ulint child_no;
1974 ut_ad(!last_match_rec && rec);
1975
1976 offsets = rec_get_offsets(
1977 rec, index, offsets, false,
1978 ULINT_UNDEFINED, &heap);
1979
1980 child_no = btr_node_ptr_get_child_page_no(rec, offsets);
1981
1982 rtr_non_leaf_insert_stack_push(
1983 index, rtr_info->parent_path, level, child_no,
1984 block, rec, 0);
1985
1986 } else if (rtr_info && found && !is_leaf) {
1987 rec = last_match_rec;
1988 }
1989
1990 page_cur_position(rec, block, cursor);
1991 }
1992
1993#ifdef UNIV_DEBUG
1994 /* Verify that we are positioned at the same child page as pushed in
1995 the path stack */
1996 if (!is_leaf && (!page_rec_is_supremum(rec) || found)
1997 && mode != PAGE_CUR_RTREE_INSERT) {
1998 ulint page_no;
1999
2000 offsets = rec_get_offsets(rec, index, offsets, false,
2001 ULINT_UNDEFINED, &heap);
2002 page_no = btr_node_ptr_get_child_page_no(rec, offsets);
2003
2004 if (rtr_info && found) {
2005 rtr_node_path_t* path = rtr_info->path;
2006 node_visit_t last_visit = path->back();
2007
2008 ut_ad(last_visit.page_no == page_no);
2009 }
2010 }
2011#endif /* UNIV_DEBUG */
2012
2013func_exit:
2014 if (UNIV_LIKELY_NULL(heap)) {
2015 mem_heap_free(heap);
2016 }
2017
2018 return(found);
2019}
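
/* Illustrative sketch ("block", "index", "search_tuple" and "btr_cur" are
placeholders): probing a single latched page of a spatial index could look
like

	page_cur_t	page_cursor;
	bool		found = rtr_cur_search_with_match(
		block, index, search_tuple, PAGE_CUR_CONTAIN,
		&page_cursor, btr_cur->rtr_info);

Matching leaf records are copied into rtr_info->matches, while matching
non-leaf entries push their child pages onto rtr_info->path for the caller
(for example rtr_pcur_getnext_from_path()) to visit next. */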
2020