1/*****************************************************************************
2
3Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2012, Facebook Inc.
5Copyright (c) 2018, MariaDB Corporation.
6
7This program is free software; you can redistribute it and/or modify it under
8the terms of the GNU General Public License as published by the Free Software
9Foundation; version 2 of the License.
10
11This program is distributed in the hope that it will be useful, but WITHOUT
12ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License along with
16this program; if not, write to the Free Software Foundation, Inc.,
1751 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
18
19*****************************************************************************/
20
21/********************************************************************//**
22@file page/page0cur.cc
23The page cursor
24
25Created 10/4/1994 Heikki Tuuri
26*************************************************************************/
27
28#include "ha_prototypes.h"
29
30#include "page0cur.h"
31#include "page0zip.h"
32#include "btr0btr.h"
33#include "mtr0log.h"
34#include "log0recv.h"
35#include "rem0cmp.h"
36#include "gis0rtree.h"
37
38#include <algorithm>
39
40/*******************************************************************//**
41This is a linear congruential generator PRNG. Returns a pseudo random
42number between 0 and 2^64-1 inclusive. The formula and the constants
43being used are:
44X[n+1] = (a * X[n] + c) mod m
45where:
46X[0] = ut_time_us(NULL)
47a = 1103515245 (3^5 * 5 * 7 * 129749)
48c = 12345 (3 * 5 * 823)
49m = 18446744073709551616 (2^64)
50
51@return number between 0 and 2^64-1 */
52static
53ib_uint64_t
54page_cur_lcg_prng(void)
55/*===================*/
56{
57#define LCG_a 1103515245
58#define LCG_c 12345
59 static ib_uint64_t lcg_current = 0;
60 static ibool initialized = FALSE;
61
62 if (!initialized) {
63 lcg_current = (ib_uint64_t) ut_time_us(NULL);
64 initialized = TRUE;
65 }
66
67 /* no need to "% 2^64" explicitly because lcg_current is
68 64 bit and this will be done anyway */
69 lcg_current = LCG_a * lcg_current + LCG_c;
70
71 return(lcg_current);
72}
73
74#ifdef BTR_CUR_HASH_ADAPT
75# ifdef UNIV_SEARCH_PERF_STAT
76static ulint page_cur_short_succ;
77# endif /* UNIV_SEARCH_PERF_STAT */
78
79/** Try a search shortcut based on the last insert.
80@param[in] block index page
81@param[in] index index tree
82@param[in] tuple search key
83@param[in,out] iup_matched_fields already matched fields in the
84upper limit record
85@param[in,out] ilow_matched_fields already matched fields in the
86lower limit record
87@param[out] cursor page cursor
88@return true on success */
89UNIV_INLINE
90bool
91page_cur_try_search_shortcut(
92 const buf_block_t* block,
93 const dict_index_t* index,
94 const dtuple_t* tuple,
95 ulint* iup_matched_fields,
96 ulint* ilow_matched_fields,
97 page_cur_t* cursor)
98{
99 const rec_t* rec;
100 const rec_t* next_rec;
101 ulint low_match;
102 ulint up_match;
103 ibool success = FALSE;
104 const page_t* page = buf_block_get_frame(block);
105 mem_heap_t* heap = NULL;
106 ulint offsets_[REC_OFFS_NORMAL_SIZE];
107 ulint* offsets = offsets_;
108 rec_offs_init(offsets_);
109
110 ut_ad(dtuple_check_typed(tuple));
111 ut_ad(page_is_leaf(page));
112
113 rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
114 offsets = rec_get_offsets(rec, index, offsets, true,
115 dtuple_get_n_fields(tuple), &heap);
116
117 ut_ad(rec);
118 ut_ad(page_rec_is_user_rec(rec));
119
120 low_match = up_match = std::min(*ilow_matched_fields,
121 *iup_matched_fields);
122
123 if (cmp_dtuple_rec_with_match(tuple, rec, offsets, &low_match) < 0) {
124 goto exit_func;
125 }
126
127 next_rec = page_rec_get_next_const(rec);
128 if (!page_rec_is_supremum(next_rec)) {
129 offsets = rec_get_offsets(next_rec, index, offsets, true,
130 dtuple_get_n_fields(tuple), &heap);
131
132 if (cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
133 &up_match) >= 0) {
134 goto exit_func;
135 }
136
137 *iup_matched_fields = up_match;
138 }
139
140 page_cur_position(rec, block, cursor);
141
142 *ilow_matched_fields = low_match;
143
144#ifdef UNIV_SEARCH_PERF_STAT
145 page_cur_short_succ++;
146#endif
147 success = TRUE;
148exit_func:
149 if (UNIV_LIKELY_NULL(heap)) {
150 mem_heap_free(heap);
151 }
152 return(success);
153}
154
155/** Try a search shortcut based on the last insert.
156@param[in] block index page
157@param[in] index index tree
158@param[in] tuple search key
159@param[in,out] iup_matched_fields already matched fields in the
160upper limit record
161@param[in,out] iup_matched_bytes already matched bytes in the
162first partially matched field in the upper limit record
163@param[in,out] ilow_matched_fields already matched fields in the
164lower limit record
165@param[in,out] ilow_matched_bytes already matched bytes in the
166first partially matched field in the lower limit record
167@param[out] cursor page cursor
168@return true on success */
169UNIV_INLINE
170bool
171page_cur_try_search_shortcut_bytes(
172 const buf_block_t* block,
173 const dict_index_t* index,
174 const dtuple_t* tuple,
175 ulint* iup_matched_fields,
176 ulint* iup_matched_bytes,
177 ulint* ilow_matched_fields,
178 ulint* ilow_matched_bytes,
179 page_cur_t* cursor)
180{
181 const rec_t* rec;
182 const rec_t* next_rec;
183 ulint low_match;
184 ulint low_bytes;
185 ulint up_match;
186 ulint up_bytes;
187 ibool success = FALSE;
188 const page_t* page = buf_block_get_frame(block);
189 mem_heap_t* heap = NULL;
190 ulint offsets_[REC_OFFS_NORMAL_SIZE];
191 ulint* offsets = offsets_;
192 rec_offs_init(offsets_);
193
194 ut_ad(dtuple_check_typed(tuple));
195 ut_ad(page_is_leaf(page));
196
197 rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
198 offsets = rec_get_offsets(rec, index, offsets, true,
199 dtuple_get_n_fields(tuple), &heap);
200
201 ut_ad(rec);
202 ut_ad(page_rec_is_user_rec(rec));
203 if (ut_pair_cmp(*ilow_matched_fields, *ilow_matched_bytes,
204 *iup_matched_fields, *iup_matched_bytes) < 0) {
205 up_match = low_match = *ilow_matched_fields;
206 up_bytes = low_bytes = *ilow_matched_bytes;
207 } else {
208 up_match = low_match = *iup_matched_fields;
209 up_bytes = low_bytes = *iup_matched_bytes;
210 }
211
212 if (cmp_dtuple_rec_with_match_bytes(
213 tuple, rec, index, offsets, &low_match, &low_bytes) < 0) {
214 goto exit_func;
215 }
216
217 next_rec = page_rec_get_next_const(rec);
218 if (!page_rec_is_supremum(next_rec)) {
219 offsets = rec_get_offsets(next_rec, index, offsets, true,
220 dtuple_get_n_fields(tuple), &heap);
221
222 if (cmp_dtuple_rec_with_match_bytes(
223 tuple, next_rec, index, offsets,
224 &up_match, &up_bytes)
225 >= 0) {
226 goto exit_func;
227 }
228
229 *iup_matched_fields = up_match;
230 *iup_matched_bytes = up_bytes;
231 }
232
233 page_cur_position(rec, block, cursor);
234
235 *ilow_matched_fields = low_match;
236 *ilow_matched_bytes = low_bytes;
237
238#ifdef UNIV_SEARCH_PERF_STAT
239 page_cur_short_succ++;
240#endif
241 success = TRUE;
242exit_func:
243 if (UNIV_LIKELY_NULL(heap)) {
244 mem_heap_free(heap);
245 }
246 return(success);
247}
248#endif /* BTR_CUR_HASH_ADAPT */
249
250#ifdef PAGE_CUR_LE_OR_EXTENDS
251/****************************************************************//**
252Checks if the nth field in a record is a character type field which extends
253the nth field in tuple, i.e., the field is longer or equal in length and has
254common first characters.
255@return TRUE if rec field extends tuple field */
256static
257ibool
258page_cur_rec_field_extends(
259/*=======================*/
260 const dtuple_t* tuple, /*!< in: data tuple */
261 const rec_t* rec, /*!< in: record */
262 const ulint* offsets,/*!< in: array returned by rec_get_offsets() */
263 ulint n) /*!< in: compare nth field */
264{
265 const dtype_t* type;
266 const dfield_t* dfield;
267 const byte* rec_f;
268 ulint rec_f_len;
269
270 ut_ad(rec_offs_validate(rec, NULL, offsets));
271 dfield = dtuple_get_nth_field(tuple, n);
272
273 type = dfield_get_type(dfield);
274
275 rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);
276
277 if (type->mtype == DATA_VARCHAR
278 || type->mtype == DATA_CHAR
279 || type->mtype == DATA_FIXBINARY
280 || type->mtype == DATA_BINARY
281 || type->mtype == DATA_BLOB
282 || DATA_GEOMETRY_MTYPE(type->mtype)
283 || type->mtype == DATA_VARMYSQL
284 || type->mtype == DATA_MYSQL) {
285
286 if (dfield_get_len(dfield) != UNIV_SQL_NULL
287 && rec_f_len != UNIV_SQL_NULL
288 && rec_f_len >= dfield_get_len(dfield)
289 && !cmp_data_data(type->mtype, type->prtype,
290 dfield_get_data(dfield),
291 dfield_get_len(dfield),
292 rec_f, dfield_get_len(dfield))) {
293
294 return(TRUE);
295 }
296 }
297
298 return(FALSE);
299}
300#endif /* PAGE_CUR_LE_OR_EXTENDS */
301
302/****************************************************************//**
303Searches the right position for a page cursor. */
304void
305page_cur_search_with_match(
306/*=======================*/
307 const buf_block_t* block, /*!< in: buffer block */
308 const dict_index_t* index, /*!< in/out: record descriptor */
309 const dtuple_t* tuple, /*!< in: data tuple */
310 page_cur_mode_t mode, /*!< in: PAGE_CUR_L,
311 PAGE_CUR_LE, PAGE_CUR_G, or
312 PAGE_CUR_GE */
313 ulint* iup_matched_fields,
314 /*!< in/out: already matched
315 fields in upper limit record */
316 ulint* ilow_matched_fields,
317 /*!< in/out: already matched
318 fields in lower limit record */
319 page_cur_t* cursor, /*!< out: page cursor */
320 rtr_info_t* rtr_info)/*!< in/out: rtree search stack */
321{
322 ulint up;
323 ulint low;
324 ulint mid;
325 const page_t* page;
326 const page_dir_slot_t* slot;
327 const rec_t* up_rec;
328 const rec_t* low_rec;
329 const rec_t* mid_rec;
330 ulint up_matched_fields;
331 ulint low_matched_fields;
332 ulint cur_matched_fields;
333 int cmp;
334#ifdef UNIV_ZIP_DEBUG
335 const page_zip_des_t* page_zip = buf_block_get_page_zip(block);
336#endif /* UNIV_ZIP_DEBUG */
337 mem_heap_t* heap = NULL;
338 ulint offsets_[REC_OFFS_NORMAL_SIZE];
339 ulint* offsets = offsets_;
340 rec_offs_init(offsets_);
341
342 ut_ad(dtuple_validate(tuple));
343#ifdef UNIV_DEBUG
344# ifdef PAGE_CUR_DBG
345 if (mode != PAGE_CUR_DBG)
346# endif /* PAGE_CUR_DBG */
347# ifdef PAGE_CUR_LE_OR_EXTENDS
348 if (mode != PAGE_CUR_LE_OR_EXTENDS)
349# endif /* PAGE_CUR_LE_OR_EXTENDS */
350 ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
351 || mode == PAGE_CUR_G || mode == PAGE_CUR_GE
352 || dict_index_is_spatial(index));
353#endif /* UNIV_DEBUG */
354 page = buf_block_get_frame(block);
355#ifdef UNIV_ZIP_DEBUG
356 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
357#endif /* UNIV_ZIP_DEBUG */
358
359 ut_d(page_check_dir(page));
360 const bool is_leaf = page_is_leaf(page);
361
362#ifdef BTR_CUR_HASH_ADAPT
363 if (is_leaf
364 && page_get_direction(page) == PAGE_RIGHT
365 && page_header_get_offs(page, PAGE_LAST_INSERT)
366 && mode == PAGE_CUR_LE
367 && !dict_index_is_spatial(index)
368 && page_header_get_field(page, PAGE_N_DIRECTION) > 3
369 && page_cur_try_search_shortcut(
370 block, index, tuple,
371 iup_matched_fields, ilow_matched_fields, cursor)) {
372 return;
373 }
374# ifdef PAGE_CUR_DBG
375 if (mode == PAGE_CUR_DBG) {
376 mode = PAGE_CUR_LE;
377 }
378# endif
379#endif /* BTR_CUR_HASH_ADAPT */
380
381 /* If the mode is for R-tree indexes, use the special MBR
382 related compare functions */
383 if (dict_index_is_spatial(index) && mode > PAGE_CUR_LE) {
384 /* For leaf level insert, we still use the traditional
385 compare function for now */
386 if (mode == PAGE_CUR_RTREE_INSERT && is_leaf) {
387 mode = PAGE_CUR_LE;
388 } else {
389 rtr_cur_search_with_match(
390 block, (dict_index_t*)index, tuple, mode,
391 cursor, rtr_info);
392 return;
393 }
394 }
395
396 /* The following flag does not work for non-latin1 char sets because
397 cmp_full_field does not tell how many bytes matched */
398#ifdef PAGE_CUR_LE_OR_EXTENDS
399 ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
400#endif /* PAGE_CUR_LE_OR_EXTENDS */
401
402 /* If mode PAGE_CUR_G is specified, we are trying to position the
403 cursor to answer a query of the form "tuple < X", where tuple is
404 the input parameter, and X denotes an arbitrary physical record on
405 the page. We want to position the cursor on the first X which
406 satisfies the condition. */
407
408 up_matched_fields = *iup_matched_fields;
409 low_matched_fields = *ilow_matched_fields;
410
411 /* Perform binary search. First the search is done through the page
412 directory, after that as a linear search in the list of records
413 owned by the upper limit directory slot. */
414
415 low = 0;
416 up = ulint(page_dir_get_n_slots(page)) - 1;
417
418 /* Perform binary search until the lower and upper limit directory
419 slots come to the distance 1 of each other */
420
421 while (up - low > 1) {
422 mid = (low + up) / 2;
423 slot = page_dir_get_nth_slot(page, mid);
424 mid_rec = page_dir_slot_get_rec(slot);
425
426 cur_matched_fields = std::min(low_matched_fields,
427 up_matched_fields);
428
429 offsets = offsets_;
430 offsets = rec_get_offsets(
431 mid_rec, index, offsets, is_leaf,
432 dtuple_get_n_fields_cmp(tuple), &heap);
433
434 cmp = cmp_dtuple_rec_with_match(
435 tuple, mid_rec, offsets, &cur_matched_fields);
436
437 if (cmp > 0) {
438low_slot_match:
439 low = mid;
440 low_matched_fields = cur_matched_fields;
441
442 } else if (cmp) {
443#ifdef PAGE_CUR_LE_OR_EXTENDS
444 if (mode == PAGE_CUR_LE_OR_EXTENDS
445 && page_cur_rec_field_extends(
446 tuple, mid_rec, offsets,
447 cur_matched_fields)) {
448
449 goto low_slot_match;
450 }
451#endif /* PAGE_CUR_LE_OR_EXTENDS */
452up_slot_match:
453 up = mid;
454 up_matched_fields = cur_matched_fields;
455
456 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
457#ifdef PAGE_CUR_LE_OR_EXTENDS
458 || mode == PAGE_CUR_LE_OR_EXTENDS
459#endif /* PAGE_CUR_LE_OR_EXTENDS */
460 ) {
461 goto low_slot_match;
462 } else {
463
464 goto up_slot_match;
465 }
466 }
467
468 slot = page_dir_get_nth_slot(page, low);
469 low_rec = page_dir_slot_get_rec(slot);
470 slot = page_dir_get_nth_slot(page, up);
471 up_rec = page_dir_slot_get_rec(slot);
472
473 /* Perform linear search until the upper and lower records come to
474 distance 1 of each other. */
475
476 while (page_rec_get_next_const(low_rec) != up_rec) {
477
478 mid_rec = page_rec_get_next_const(low_rec);
479
480 cur_matched_fields = std::min(low_matched_fields,
481 up_matched_fields);
482
483 offsets = offsets_;
484 offsets = rec_get_offsets(
485 mid_rec, index, offsets, is_leaf,
486 dtuple_get_n_fields_cmp(tuple), &heap);
487
488 cmp = cmp_dtuple_rec_with_match(
489 tuple, mid_rec, offsets, &cur_matched_fields);
490
491 if (cmp > 0) {
492low_rec_match:
493 low_rec = mid_rec;
494 low_matched_fields = cur_matched_fields;
495
496 } else if (cmp) {
497#ifdef PAGE_CUR_LE_OR_EXTENDS
498 if (mode == PAGE_CUR_LE_OR_EXTENDS
499 && page_cur_rec_field_extends(
500 tuple, mid_rec, offsets,
501 cur_matched_fields)) {
502
503 goto low_rec_match;
504 }
505#endif /* PAGE_CUR_LE_OR_EXTENDS */
506up_rec_match:
507 up_rec = mid_rec;
508 up_matched_fields = cur_matched_fields;
509 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
510#ifdef PAGE_CUR_LE_OR_EXTENDS
511 || mode == PAGE_CUR_LE_OR_EXTENDS
512#endif /* PAGE_CUR_LE_OR_EXTENDS */
513 ) {
514 if (!cmp && !cur_matched_fields) {
515#ifdef UNIV_DEBUG
516 mtr_t mtr;
517 mtr_start(&mtr);
518
519 /* We got a match, but cur_matched_fields is
520 0, it must have REC_INFO_MIN_REC_FLAG */
521 ulint rec_info = rec_get_info_bits(mid_rec,
522 rec_offs_comp(offsets));
523 ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
524 ut_ad(!page_has_prev(page));
525 mtr_commit(&mtr);
526#endif
527
528 cur_matched_fields = dtuple_get_n_fields_cmp(tuple);
529 }
530
531 goto low_rec_match;
532 } else {
533
534 goto up_rec_match;
535 }
536 }
537
538 if (mode <= PAGE_CUR_GE) {
539 page_cur_position(up_rec, block, cursor);
540 } else {
541 page_cur_position(low_rec, block, cursor);
542 }
543
544 *iup_matched_fields = up_matched_fields;
545 *ilow_matched_fields = low_matched_fields;
546 if (UNIV_LIKELY_NULL(heap)) {
547 mem_heap_free(heap);
548 }
549}
550
551#ifdef BTR_CUR_HASH_ADAPT
552/** Search the right position for a page cursor.
553@param[in] block buffer block
554@param[in] index index tree
555@param[in] tuple key to be searched for
556@param[in] mode search mode
557@param[in,out] iup_matched_fields already matched fields in the
558upper limit record
559@param[in,out] iup_matched_bytes already matched bytes in the
560first partially matched field in the upper limit record
561@param[in,out] ilow_matched_fields already matched fields in the
562lower limit record
563@param[in,out] ilow_matched_bytes already matched bytes in the
564first partially matched field in the lower limit record
565@param[out] cursor page cursor */
566void
567page_cur_search_with_match_bytes(
568 const buf_block_t* block,
569 const dict_index_t* index,
570 const dtuple_t* tuple,
571 page_cur_mode_t mode,
572 ulint* iup_matched_fields,
573 ulint* iup_matched_bytes,
574 ulint* ilow_matched_fields,
575 ulint* ilow_matched_bytes,
576 page_cur_t* cursor)
577{
578 ulint up;
579 ulint low;
580 ulint mid;
581 const page_t* page;
582 const page_dir_slot_t* slot;
583 const rec_t* up_rec;
584 const rec_t* low_rec;
585 const rec_t* mid_rec;
586 ulint up_matched_fields;
587 ulint up_matched_bytes;
588 ulint low_matched_fields;
589 ulint low_matched_bytes;
590 ulint cur_matched_fields;
591 ulint cur_matched_bytes;
592 int cmp;
593#ifdef UNIV_ZIP_DEBUG
594 const page_zip_des_t* page_zip = buf_block_get_page_zip(block);
595#endif /* UNIV_ZIP_DEBUG */
596 mem_heap_t* heap = NULL;
597 ulint offsets_[REC_OFFS_NORMAL_SIZE];
598 ulint* offsets = offsets_;
599 rec_offs_init(offsets_);
600
601 ut_ad(dtuple_validate(tuple));
602 ut_ad(!(tuple->info_bits & REC_INFO_MIN_REC_FLAG));
603#ifdef UNIV_DEBUG
604# ifdef PAGE_CUR_DBG
605 if (mode != PAGE_CUR_DBG)
606# endif /* PAGE_CUR_DBG */
607# ifdef PAGE_CUR_LE_OR_EXTENDS
608 if (mode != PAGE_CUR_LE_OR_EXTENDS)
609# endif /* PAGE_CUR_LE_OR_EXTENDS */
610 ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
611 || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
612#endif /* UNIV_DEBUG */
613 page = buf_block_get_frame(block);
614#ifdef UNIV_ZIP_DEBUG
615 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
616#endif /* UNIV_ZIP_DEBUG */
617
618 ut_d(page_check_dir(page));
619
620#ifdef BTR_CUR_HASH_ADAPT
621 if (page_is_leaf(page)
622 && page_get_direction(page) == PAGE_RIGHT
623 && page_header_get_offs(page, PAGE_LAST_INSERT)
624 && mode == PAGE_CUR_LE
625 && page_header_get_field(page, PAGE_N_DIRECTION) > 3
626 && page_cur_try_search_shortcut_bytes(
627 block, index, tuple,
628 iup_matched_fields, iup_matched_bytes,
629 ilow_matched_fields, ilow_matched_bytes,
630 cursor)) {
631 return;
632 }
633# ifdef PAGE_CUR_DBG
634 if (mode == PAGE_CUR_DBG) {
635 mode = PAGE_CUR_LE;
636 }
637# endif
638#endif /* BTR_CUR_HASH_ADAPT */
639
640 /* The following flag does not work for non-latin1 char sets because
641 cmp_full_field does not tell how many bytes matched */
642#ifdef PAGE_CUR_LE_OR_EXTENDS
643 ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
644#endif /* PAGE_CUR_LE_OR_EXTENDS */
645
646 /* If mode PAGE_CUR_G is specified, we are trying to position the
647 cursor to answer a query of the form "tuple < X", where tuple is
648 the input parameter, and X denotes an arbitrary physical record on
649 the page. We want to position the cursor on the first X which
650 satisfies the condition. */
651
652 up_matched_fields = *iup_matched_fields;
653 up_matched_bytes = *iup_matched_bytes;
654 low_matched_fields = *ilow_matched_fields;
655 low_matched_bytes = *ilow_matched_bytes;
656
657 /* Perform binary search. First the search is done through the page
658 directory, after that as a linear search in the list of records
659 owned by the upper limit directory slot. */
660
661 low = 0;
662 up = ulint(page_dir_get_n_slots(page)) - 1;
663
664 /* Perform binary search until the lower and upper limit directory
665 slots come to the distance 1 of each other */
666 const bool is_leaf = page_is_leaf(page);
667
668 while (up - low > 1) {
669 mid = (low + up) / 2;
670 slot = page_dir_get_nth_slot(page, mid);
671 mid_rec = page_dir_slot_get_rec(slot);
672
673 ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
674 low_matched_fields, low_matched_bytes,
675 up_matched_fields, up_matched_bytes);
676
677 offsets = rec_get_offsets(
678 mid_rec, index, offsets_, is_leaf,
679 dtuple_get_n_fields_cmp(tuple), &heap);
680
681 cmp = cmp_dtuple_rec_with_match_bytes(
682 tuple, mid_rec, index, offsets,
683 &cur_matched_fields, &cur_matched_bytes);
684
685 if (cmp > 0) {
686low_slot_match:
687 low = mid;
688 low_matched_fields = cur_matched_fields;
689 low_matched_bytes = cur_matched_bytes;
690
691 } else if (cmp) {
692#ifdef PAGE_CUR_LE_OR_EXTENDS
693 if (mode == PAGE_CUR_LE_OR_EXTENDS
694 && page_cur_rec_field_extends(
695 tuple, mid_rec, offsets,
696 cur_matched_fields)) {
697
698 goto low_slot_match;
699 }
700#endif /* PAGE_CUR_LE_OR_EXTENDS */
701up_slot_match:
702 up = mid;
703 up_matched_fields = cur_matched_fields;
704 up_matched_bytes = cur_matched_bytes;
705
706 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
707#ifdef PAGE_CUR_LE_OR_EXTENDS
708 || mode == PAGE_CUR_LE_OR_EXTENDS
709#endif /* PAGE_CUR_LE_OR_EXTENDS */
710 ) {
711 goto low_slot_match;
712 } else {
713
714 goto up_slot_match;
715 }
716 }
717
718 slot = page_dir_get_nth_slot(page, low);
719 low_rec = page_dir_slot_get_rec(slot);
720 slot = page_dir_get_nth_slot(page, up);
721 up_rec = page_dir_slot_get_rec(slot);
722
723 /* Perform linear search until the upper and lower records come to
724 distance 1 of each other. */
725
726 while (page_rec_get_next_const(low_rec) != up_rec) {
727
728 mid_rec = page_rec_get_next_const(low_rec);
729
730 ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
731 low_matched_fields, low_matched_bytes,
732 up_matched_fields, up_matched_bytes);
733
734 if (UNIV_UNLIKELY(rec_get_info_bits(
735 mid_rec,
736 dict_table_is_comp(index->table))
737 & REC_INFO_MIN_REC_FLAG)) {
738 ut_ad(!page_has_prev(page_align(mid_rec)));
739 ut_ad(!page_rec_is_leaf(mid_rec)
740 || rec_is_default_row(mid_rec, index));
741 cmp = 1;
742 goto low_rec_match;
743 }
744
745 offsets = rec_get_offsets(
746 mid_rec, index, offsets_, is_leaf,
747 dtuple_get_n_fields_cmp(tuple), &heap);
748
749 cmp = cmp_dtuple_rec_with_match_bytes(
750 tuple, mid_rec, index, offsets,
751 &cur_matched_fields, &cur_matched_bytes);
752
753 if (cmp > 0) {
754low_rec_match:
755 low_rec = mid_rec;
756 low_matched_fields = cur_matched_fields;
757 low_matched_bytes = cur_matched_bytes;
758
759 } else if (cmp) {
760#ifdef PAGE_CUR_LE_OR_EXTENDS
761 if (mode == PAGE_CUR_LE_OR_EXTENDS
762 && page_cur_rec_field_extends(
763 tuple, mid_rec, offsets,
764 cur_matched_fields)) {
765
766 goto low_rec_match;
767 }
768#endif /* PAGE_CUR_LE_OR_EXTENDS */
769up_rec_match:
770 up_rec = mid_rec;
771 up_matched_fields = cur_matched_fields;
772 up_matched_bytes = cur_matched_bytes;
773 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
774#ifdef PAGE_CUR_LE_OR_EXTENDS
775 || mode == PAGE_CUR_LE_OR_EXTENDS
776#endif /* PAGE_CUR_LE_OR_EXTENDS */
777 ) {
778 goto low_rec_match;
779 } else {
780
781 goto up_rec_match;
782 }
783 }
784
785 if (mode <= PAGE_CUR_GE) {
786 page_cur_position(up_rec, block, cursor);
787 } else {
788 page_cur_position(low_rec, block, cursor);
789 }
790
791 *iup_matched_fields = up_matched_fields;
792 *iup_matched_bytes = up_matched_bytes;
793 *ilow_matched_fields = low_matched_fields;
794 *ilow_matched_bytes = low_matched_bytes;
795 if (UNIV_LIKELY_NULL(heap)) {
796 mem_heap_free(heap);
797 }
798}
799#endif /* BTR_CUR_HASH_ADAPT */
800
801/***********************************************************//**
802Positions a page cursor on a randomly chosen user record on a page. If there
803are no user records, sets the cursor on the infimum record. */
804void
805page_cur_open_on_rnd_user_rec(
806/*==========================*/
807 buf_block_t* block, /*!< in: page */
808 page_cur_t* cursor) /*!< out: page cursor */
809{
810 ulint rnd;
811 ulint n_recs = page_get_n_recs(buf_block_get_frame(block));
812
813 page_cur_set_before_first(block, cursor);
814
815 if (UNIV_UNLIKELY(n_recs == 0)) {
816
817 return;
818 }
819
820 rnd = (ulint) (page_cur_lcg_prng() % n_recs);
821
822 do {
823 page_cur_move_to_next(cursor);
824 } while (rnd--);
825}
826
827/***********************************************************//**
828Writes the log record of a record insert on a page. */
829static
830void
831page_cur_insert_rec_write_log(
832/*==========================*/
833 rec_t* insert_rec, /*!< in: inserted physical record */
834 ulint rec_size, /*!< in: insert_rec size */
835 rec_t* cursor_rec, /*!< in: record the
836 cursor is pointing to */
837 dict_index_t* index, /*!< in: record descriptor */
838 mtr_t* mtr) /*!< in: mini-transaction handle */
839{
840 ulint cur_rec_size;
841 ulint extra_size;
842 ulint cur_extra_size;
843 const byte* ins_ptr;
844 const byte* log_end;
845 ulint i;
846
847 if (index->table->is_temporary()) {
848 ut_ad(!mlog_open(mtr, 0));
849 return;
850 }
851
852 ut_a(rec_size < srv_page_size);
853 ut_ad(mtr->is_named_space(index->table->space));
854 ut_ad(page_align(insert_rec) == page_align(cursor_rec));
855 ut_ad(!page_rec_is_comp(insert_rec)
856 == !dict_table_is_comp(index->table));
857
858 const bool is_leaf = page_rec_is_leaf(cursor_rec);
859
860 {
861 mem_heap_t* heap = NULL;
862 ulint cur_offs_[REC_OFFS_NORMAL_SIZE];
863 ulint ins_offs_[REC_OFFS_NORMAL_SIZE];
864
865 ulint* cur_offs;
866 ulint* ins_offs;
867
868 rec_offs_init(cur_offs_);
869 rec_offs_init(ins_offs_);
870
871 cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
872 is_leaf, ULINT_UNDEFINED, &heap);
873 ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
874 is_leaf, ULINT_UNDEFINED, &heap);
875
876 extra_size = rec_offs_extra_size(ins_offs);
877 cur_extra_size = rec_offs_extra_size(cur_offs);
878 ut_ad(rec_size == rec_offs_size(ins_offs));
879 cur_rec_size = rec_offs_size(cur_offs);
880
881 if (UNIV_LIKELY_NULL(heap)) {
882 mem_heap_free(heap);
883 }
884 }
885
886 ins_ptr = insert_rec - extra_size;
887
888 i = 0;
889
890 if (cur_extra_size == extra_size) {
891 ulint min_rec_size = ut_min(cur_rec_size, rec_size);
892
893 const byte* cur_ptr = cursor_rec - cur_extra_size;
894
895 /* Find out the first byte in insert_rec which differs from
896 cursor_rec; skip the bytes in the record info */
897
898 do {
899 if (*ins_ptr == *cur_ptr) {
900 i++;
901 ins_ptr++;
902 cur_ptr++;
903 } else if ((i < extra_size)
904 && (i >= extra_size
905 - page_rec_get_base_extra_size
906 (insert_rec))) {
907 i = extra_size;
908 ins_ptr = insert_rec;
909 cur_ptr = cursor_rec;
910 } else {
911 break;
912 }
913 } while (i < min_rec_size);
914 }
915
916 byte* log_ptr;
917
918 if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {
919
920 if (page_rec_is_comp(insert_rec)) {
921 log_ptr = mlog_open_and_write_index(
922 mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
923 2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
924 if (UNIV_UNLIKELY(!log_ptr)) {
925 /* Logging in mtr is switched off
926 during crash recovery: in that case
927 mlog_open returns NULL */
928 return;
929 }
930 } else {
931 log_ptr = mlog_open(mtr, 11
932 + 2 + 5 + 1 + 5 + 5
933 + MLOG_BUF_MARGIN);
934 if (UNIV_UNLIKELY(!log_ptr)) {
935 /* Logging in mtr is switched off
936 during crash recovery: in that case
937 mlog_open returns NULL */
938 return;
939 }
940
941 log_ptr = mlog_write_initial_log_record_fast(
942 insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
943 }
944
945 log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
946 /* Write the cursor rec offset as a 2-byte ulint */
947 mach_write_to_2(log_ptr, page_offset(cursor_rec));
948 log_ptr += 2;
949 } else {
950 log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
951 if (!log_ptr) {
952 /* Logging in mtr is switched off during crash
953 recovery: in that case mlog_open returns NULL */
954 return;
955 }
956 log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
957 }
958
959 if (page_rec_is_comp(insert_rec)) {
960 if (UNIV_UNLIKELY
961 (rec_get_info_and_status_bits(insert_rec, TRUE)
962 != rec_get_info_and_status_bits(cursor_rec, TRUE))) {
963
964 goto need_extra_info;
965 }
966 } else {
967 if (UNIV_UNLIKELY
968 (rec_get_info_and_status_bits(insert_rec, FALSE)
969 != rec_get_info_and_status_bits(cursor_rec, FALSE))) {
970
971 goto need_extra_info;
972 }
973 }
974
975 if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
976need_extra_info:
977 /* Write the record end segment length
978 and the extra info storage flag */
979 log_ptr += mach_write_compressed(log_ptr,
980 2 * (rec_size - i) + 1);
981
982 /* Write the info bits */
983 mach_write_to_1(log_ptr,
984 rec_get_info_and_status_bits(
985 insert_rec,
986 page_rec_is_comp(insert_rec)));
987 log_ptr++;
988
989 /* Write the record origin offset */
990 log_ptr += mach_write_compressed(log_ptr, extra_size);
991
992 /* Write the mismatch index */
993 log_ptr += mach_write_compressed(log_ptr, i);
994
995 ut_a(i < srv_page_size);
996 ut_a(extra_size < srv_page_size);
997 } else {
998 /* Write the record end segment length
999 and the extra info storage flag */
1000 log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
1001 }
1002
1003 /* Write to the log the inserted index record end segment which
1004 differs from the cursor record */
1005
1006 rec_size -= i;
1007
1008 if (log_ptr + rec_size <= log_end) {
1009 memcpy(log_ptr, ins_ptr, rec_size);
1010 mlog_close(mtr, log_ptr + rec_size);
1011 } else {
1012 mlog_close(mtr, log_ptr);
1013 ut_a(rec_size < srv_page_size);
1014 mlog_catenate_string(mtr, ins_ptr, rec_size);
1015 }
1016}
1017
1018/***********************************************************//**
1019Parses a log record of a record insert on a page.
1020@return end of log record or NULL */
1021byte*
1022page_cur_parse_insert_rec(
1023/*======================*/
1024 ibool is_short,/*!< in: TRUE if short inserts */
1025 const byte* ptr, /*!< in: buffer */
1026 const byte* end_ptr,/*!< in: buffer end */
1027 buf_block_t* block, /*!< in: page or NULL */
1028 dict_index_t* index, /*!< in: record descriptor */
1029 mtr_t* mtr) /*!< in: mtr or NULL */
1030{
1031 ulint origin_offset = 0; /* remove warning */
1032 ulint end_seg_len;
1033 ulint mismatch_index = 0; /* remove warning */
1034 page_t* page;
1035 rec_t* cursor_rec;
1036 byte buf1[1024];
1037 byte* buf;
1038 const byte* ptr2 = ptr;
1039 ulint info_and_status_bits = 0; /* remove warning */
1040 page_cur_t cursor;
1041 mem_heap_t* heap = NULL;
1042 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1043 ulint* offsets = offsets_;
1044 rec_offs_init(offsets_);
1045
1046 page = block ? buf_block_get_frame(block) : NULL;
1047
1048 if (is_short) {
1049 cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
1050 } else {
1051 ulint offset;
1052
1053 /* Read the cursor rec offset as a 2-byte ulint */
1054
1055 if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {
1056
1057 return(NULL);
1058 }
1059
1060 offset = mach_read_from_2(ptr);
1061 ptr += 2;
1062
1063 cursor_rec = page + offset;
1064
1065 if (offset >= srv_page_size) {
1066
1067 recv_sys->found_corrupt_log = TRUE;
1068
1069 return(NULL);
1070 }
1071 }
1072
1073 end_seg_len = mach_parse_compressed(&ptr, end_ptr);
1074
1075 if (ptr == NULL) {
1076
1077 return(NULL);
1078 }
1079
1080 if (end_seg_len >= srv_page_size << 1) {
1081 recv_sys->found_corrupt_log = TRUE;
1082
1083 return(NULL);
1084 }
1085
1086 if (end_seg_len & 0x1UL) {
1087 /* Read the info bits */
1088
1089 if (end_ptr < ptr + 1) {
1090
1091 return(NULL);
1092 }
1093
1094 info_and_status_bits = mach_read_from_1(ptr);
1095 ptr++;
1096
1097 origin_offset = mach_parse_compressed(&ptr, end_ptr);
1098
1099 if (ptr == NULL) {
1100
1101 return(NULL);
1102 }
1103
1104 ut_a(origin_offset < srv_page_size);
1105
1106 mismatch_index = mach_parse_compressed(&ptr, end_ptr);
1107
1108 if (ptr == NULL) {
1109
1110 return(NULL);
1111 }
1112
1113 ut_a(mismatch_index < srv_page_size);
1114 }
1115
1116 if (end_ptr < ptr + (end_seg_len >> 1)) {
1117
1118 return(NULL);
1119 }
1120
1121 if (!block) {
1122
1123 return(const_cast<byte*>(ptr + (end_seg_len >> 1)));
1124 }
1125
1126 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1127 ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
1128
1129 /* Read from the log the inserted index record end segment which
1130 differs from the cursor record */
1131
1132 const bool is_leaf = page_is_leaf(page);
1133
1134 offsets = rec_get_offsets(cursor_rec, index, offsets, is_leaf,
1135 ULINT_UNDEFINED, &heap);
1136
1137 if (!(end_seg_len & 0x1UL)) {
1138 info_and_status_bits = rec_get_info_and_status_bits(
1139 cursor_rec, page_is_comp(page));
1140 origin_offset = rec_offs_extra_size(offsets);
1141 mismatch_index = rec_offs_size(offsets) - (end_seg_len >> 1);
1142 }
1143
1144 end_seg_len >>= 1;
1145
1146 if (mismatch_index + end_seg_len < sizeof buf1) {
1147 buf = buf1;
1148 } else {
1149 buf = static_cast<byte*>(
1150 ut_malloc_nokey(mismatch_index + end_seg_len));
1151 }
1152
1153 /* Build the inserted record to buf */
1154
1155 if (UNIV_UNLIKELY(mismatch_index >= srv_page_size)) {
1156
1157 ib::fatal() << "is_short " << is_short << ", "
1158 << "info_and_status_bits " << info_and_status_bits
1159 << ", offset " << page_offset(cursor_rec) << ","
1160 " o_offset " << origin_offset << ", mismatch index "
1161 << mismatch_index << ", end_seg_len " << end_seg_len
1162 << " parsed len " << (ptr - ptr2);
1163 }
1164
1165 ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
1166 ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
1167
1168 if (page_is_comp(page)) {
1169 rec_set_heap_no_new(buf + origin_offset,
1170 PAGE_HEAP_NO_USER_LOW);
1171 rec_set_info_and_status_bits(buf + origin_offset,
1172 info_and_status_bits);
1173 } else {
1174 rec_set_heap_no_old(buf + origin_offset,
1175 PAGE_HEAP_NO_USER_LOW);
1176 rec_set_info_bits_old(buf + origin_offset,
1177 info_and_status_bits);
1178 }
1179
1180 page_cur_position(cursor_rec, block, &cursor);
1181
1182 offsets = rec_get_offsets(buf + origin_offset, index, offsets,
1183 is_leaf, ULINT_UNDEFINED, &heap);
1184 if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
1185 buf + origin_offset,
1186 index, offsets, mtr))) {
1187 /* The redo log record should only have been written
1188 after the write was successful. */
1189 ut_error;
1190 }
1191
1192 if (buf != buf1) {
1193
1194 ut_free(buf);
1195 }
1196
1197 if (UNIV_LIKELY_NULL(heap)) {
1198 mem_heap_free(heap);
1199 }
1200
1201 return(const_cast<byte*>(ptr + end_seg_len));
1202}
1203
1204/** Reset PAGE_DIRECTION and PAGE_N_DIRECTION.
1205@param[in,out] ptr the PAGE_DIRECTION_B field
1206@param[in,out] page index tree page frame
1207@param[in] page_zip compressed page descriptor, or NULL */
1208static inline
1209void
1210page_direction_reset(byte* ptr, page_t* page, page_zip_des_t* page_zip)
1211{
1212 ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page);
1213 page_ptr_set_direction(ptr, PAGE_NO_DIRECTION);
1214 if (page_zip) {
1215 page_zip_write_header(page_zip, ptr, 1, NULL);
1216 }
1217 ptr = PAGE_HEADER + PAGE_N_DIRECTION + page;
1218 *reinterpret_cast<uint16_t*>(ptr) = 0;
1219 if (page_zip) {
1220 page_zip_write_header(page_zip, ptr, 2, NULL);
1221 }
1222}
1223
1224/** Increment PAGE_N_DIRECTION.
1225@param[in,out] ptr the PAGE_DIRECTION_B field
1226@param[in,out] page index tree page frame
1227@param[in] page_zip compressed page descriptor, or NULL
1228@param[in] dir PAGE_RIGHT or PAGE_LEFT */
1229static inline
1230void
1231page_direction_increment(
1232 byte* ptr,
1233 page_t* page,
1234 page_zip_des_t* page_zip,
1235 uint dir)
1236{
1237 ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page);
1238 ut_ad(dir == PAGE_RIGHT || dir == PAGE_LEFT);
1239 page_ptr_set_direction(ptr, dir);
1240 if (page_zip) {
1241 page_zip_write_header(page_zip, ptr, 1, NULL);
1242 }
1243 page_header_set_field(
1244 page, page_zip, PAGE_N_DIRECTION,
1245 1U + page_header_get_field(page, PAGE_N_DIRECTION));
1246}
1247
1248/***********************************************************//**
1249Inserts a record next to page cursor on an uncompressed page.
1250Returns pointer to inserted record if succeed, i.e., enough
1251space available, NULL otherwise. The cursor stays at the same position.
1252@return pointer to record if succeed, NULL otherwise */
1253rec_t*
1254page_cur_insert_rec_low(
1255/*====================*/
1256 rec_t* current_rec,/*!< in: pointer to current record after
1257 which the new record is inserted */
1258 dict_index_t* index, /*!< in: record descriptor */
1259 const rec_t* rec, /*!< in: pointer to a physical record */
1260 ulint* offsets,/*!< in/out: rec_get_offsets(rec, index) */
1261 mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
1262{
1263 byte* insert_buf;
1264 ulint rec_size;
1265 page_t* page; /*!< the relevant page */
1266 rec_t* last_insert; /*!< cursor position at previous
1267 insert */
1268 rec_t* free_rec; /*!< a free record that was reused,
1269 or NULL */
1270 rec_t* insert_rec; /*!< inserted record */
1271 ulint heap_no; /*!< heap number of the inserted
1272 record */
1273
1274 ut_ad(rec_offs_validate(rec, index, offsets));
1275
1276 page = page_align(current_rec);
1277 ut_ad(dict_table_is_comp(index->table)
1278 == (ibool) !!page_is_comp(page));
1279 ut_ad(fil_page_index_page_check(page));
1280 ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
1281 || index->is_dummy
1282 || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
1283
1284 ut_ad(!page_rec_is_supremum(current_rec));
1285
1286 /* 1. Get the size of the physical record in the page */
1287 rec_size = rec_offs_size(offsets);
1288
1289#ifdef UNIV_DEBUG_VALGRIND
1290 {
1291 const void* rec_start
1292 = rec - rec_offs_extra_size(offsets);
1293 ulint extra_size
1294 = rec_offs_extra_size(offsets)
1295 - (rec_offs_comp(offsets)
1296 ? REC_N_NEW_EXTRA_BYTES
1297 : REC_N_OLD_EXTRA_BYTES);
1298
1299 /* All data bytes of the record must be valid. */
1300 UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
1301 /* The variable-length header must be valid. */
1302 UNIV_MEM_ASSERT_RW(rec_start, extra_size);
1303 }
1304#endif /* UNIV_DEBUG_VALGRIND */
1305
1306 /* 2. Try to find suitable space from page memory management */
1307
1308 free_rec = page_header_get_ptr(page, PAGE_FREE);
1309 if (UNIV_LIKELY_NULL(free_rec)) {
1310 /* Try to allocate from the head of the free list. */
1311 ulint foffsets_[REC_OFFS_NORMAL_SIZE];
1312 ulint* foffsets = foffsets_;
1313 mem_heap_t* heap = NULL;
1314
1315 rec_offs_init(foffsets_);
1316
1317 foffsets = rec_get_offsets(
1318 free_rec, index, foffsets, page_is_leaf(page),
1319 ULINT_UNDEFINED, &heap);
1320 if (rec_offs_size(foffsets) < rec_size) {
1321 if (UNIV_LIKELY_NULL(heap)) {
1322 mem_heap_free(heap);
1323 }
1324
1325 goto use_heap;
1326 }
1327
1328 insert_buf = free_rec - rec_offs_extra_size(foffsets);
1329
1330 if (page_is_comp(page)) {
1331 heap_no = rec_get_heap_no_new(free_rec);
1332 page_mem_alloc_free(page, NULL,
1333 rec_get_next_ptr(free_rec, TRUE),
1334 rec_size);
1335 } else {
1336 heap_no = rec_get_heap_no_old(free_rec);
1337 page_mem_alloc_free(page, NULL,
1338 rec_get_next_ptr(free_rec, FALSE),
1339 rec_size);
1340 }
1341
1342 if (UNIV_LIKELY_NULL(heap)) {
1343 mem_heap_free(heap);
1344 }
1345 } else {
1346use_heap:
1347 free_rec = NULL;
1348 insert_buf = page_mem_alloc_heap(page, NULL,
1349 rec_size, &heap_no);
1350
1351 if (UNIV_UNLIKELY(insert_buf == NULL)) {
1352 return(NULL);
1353 }
1354 }
1355
1356 /* 3. Create the record */
1357 insert_rec = rec_copy(insert_buf, rec, offsets);
1358 rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets);
1359
1360 /* 4. Insert the record in the linked list of records */
1361 ut_ad(current_rec != insert_rec);
1362
1363 {
1364 /* next record after current before the insertion */
1365 rec_t* next_rec = page_rec_get_next(current_rec);
1366#ifdef UNIV_DEBUG
1367 if (page_is_comp(page)) {
1368 switch (rec_get_status(current_rec)) {
1369 case REC_STATUS_ORDINARY:
1370 case REC_STATUS_NODE_PTR:
1371 case REC_STATUS_COLUMNS_ADDED:
1372 case REC_STATUS_INFIMUM:
1373 break;
1374 case REC_STATUS_SUPREMUM:
1375 ut_ad(!"wrong status on current_rec");
1376 }
1377 switch (rec_get_status(insert_rec)) {
1378 case REC_STATUS_ORDINARY:
1379 case REC_STATUS_NODE_PTR:
1380 case REC_STATUS_COLUMNS_ADDED:
1381 break;
1382 case REC_STATUS_INFIMUM:
1383 case REC_STATUS_SUPREMUM:
1384 ut_ad(!"wrong status on insert_rec");
1385 }
1386 ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1387 }
1388#endif
1389 page_rec_set_next(insert_rec, next_rec);
1390 page_rec_set_next(current_rec, insert_rec);
1391 }
1392
1393 page_header_set_field(page, NULL, PAGE_N_RECS,
1394 1U + page_get_n_recs(page));
1395
1396 /* 5. Set the n_owned field in the inserted record to zero,
1397 and set the heap_no field */
1398 if (page_is_comp(page)) {
1399 rec_set_n_owned_new(insert_rec, NULL, 0);
1400 rec_set_heap_no_new(insert_rec, heap_no);
1401 } else {
1402 rec_set_n_owned_old(insert_rec, 0);
1403 rec_set_heap_no_old(insert_rec, heap_no);
1404 }
1405
1406 UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
1407 rec_offs_size(offsets));
1408 /* 6. Update the last insertion info in page header */
1409
1410 last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
1411 ut_ad(!last_insert || !page_is_comp(page)
1412 || rec_get_node_ptr_flag(last_insert)
1413 == rec_get_node_ptr_flag(insert_rec));
1414
1415 if (!dict_index_is_spatial(index)) {
1416 byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
1417 if (UNIV_UNLIKELY(last_insert == NULL)) {
1418no_direction:
1419 page_direction_reset(ptr, page, NULL);
1420 } else if (last_insert == current_rec
1421 && page_ptr_get_direction(ptr) != PAGE_LEFT) {
1422 page_direction_increment(ptr, page, NULL, PAGE_RIGHT);
1423 } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
1424 && page_rec_get_next(insert_rec) == last_insert) {
1425 page_direction_increment(ptr, page, NULL, PAGE_LEFT);
1426 } else {
1427 goto no_direction;
1428 }
1429 }
1430
1431 page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
1432
1433 /* 7. It remains to update the owner record. */
1434 {
1435 rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
1436 ulint n_owned;
1437 if (page_is_comp(page)) {
1438 n_owned = rec_get_n_owned_new(owner_rec);
1439 rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
1440 } else {
1441 n_owned = rec_get_n_owned_old(owner_rec);
1442 rec_set_n_owned_old(owner_rec, n_owned + 1);
1443 }
1444
1445 /* 8. Now we have incremented the n_owned field of the owner
1446 record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
1447 we have to split the corresponding directory slot in two. */
1448
1449 if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
1450 page_dir_split_slot(
1451 page, NULL,
1452 page_dir_find_owner_slot(owner_rec));
1453 }
1454 }
1455
1456 /* 9. Write log record of the insert */
1457 if (UNIV_LIKELY(mtr != NULL)) {
1458 page_cur_insert_rec_write_log(insert_rec, rec_size,
1459 current_rec, index, mtr);
1460 }
1461
1462 return(insert_rec);
1463}
1464
1465/***********************************************************//**
1466Inserts a record next to page cursor on a compressed and uncompressed
1467page. Returns pointer to inserted record if succeed, i.e.,
1468enough space available, NULL otherwise.
1469The cursor stays at the same position.
1470
1471IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1472if this is a compressed leaf page in a secondary index.
1473This has to be done either within the same mini-transaction,
1474or by invoking ibuf_reset_free_bits() before mtr_commit().
1475
1476@return pointer to record if succeed, NULL otherwise */
1477rec_t*
1478page_cur_insert_rec_zip(
1479/*====================*/
1480 page_cur_t* cursor, /*!< in/out: page cursor */
1481 dict_index_t* index, /*!< in: record descriptor */
1482 const rec_t* rec, /*!< in: pointer to a physical record */
1483 ulint* offsets,/*!< in/out: rec_get_offsets(rec, index) */
1484 mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
1485{
1486 byte* insert_buf;
1487 ulint rec_size;
1488 page_t* page; /*!< the relevant page */
1489 rec_t* last_insert; /*!< cursor position at previous
1490 insert */
1491 rec_t* free_rec; /*!< a free record that was reused,
1492 or NULL */
1493 rec_t* insert_rec; /*!< inserted record */
1494 ulint heap_no; /*!< heap number of the inserted
1495 record */
1496 page_zip_des_t* page_zip;
1497
1498 page_zip = page_cur_get_page_zip(cursor);
1499 ut_ad(page_zip);
1500
1501 ut_ad(rec_offs_validate(rec, index, offsets));
1502
1503 page = page_cur_get_page(cursor);
1504 ut_ad(dict_table_is_comp(index->table));
1505 ut_ad(page_is_comp(page));
1506 ut_ad(fil_page_index_page_check(page));
1507 ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
1508 || index->is_dummy
1509 || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
1510 ut_ad(!page_get_instant(page));
1511 ut_ad(!page_cur_is_after_last(cursor));
1512#ifdef UNIV_ZIP_DEBUG
1513 ut_a(page_zip_validate(page_zip, page, index));
1514#endif /* UNIV_ZIP_DEBUG */
1515
1516 /* 1. Get the size of the physical record in the page */
1517 rec_size = rec_offs_size(offsets);
1518
1519#ifdef UNIV_DEBUG_VALGRIND
1520 {
1521 const void* rec_start
1522 = rec - rec_offs_extra_size(offsets);
1523 ulint extra_size
1524 = rec_offs_extra_size(offsets)
1525 - (rec_offs_comp(offsets)
1526 ? REC_N_NEW_EXTRA_BYTES
1527 : REC_N_OLD_EXTRA_BYTES);
1528
1529 /* All data bytes of the record must be valid. */
1530 UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
1531 /* The variable-length header must be valid. */
1532 UNIV_MEM_ASSERT_RW(rec_start, extra_size);
1533 }
1534#endif /* UNIV_DEBUG_VALGRIND */
1535
1536 const bool reorg_before_insert = page_has_garbage(page)
1537 && rec_size > page_get_max_insert_size(page, 1)
1538 && rec_size <= page_get_max_insert_size_after_reorganize(
1539 page, 1);
1540
1541 /* 2. Try to find suitable space from page memory management */
1542 if (!page_zip_available(page_zip, dict_index_is_clust(index),
1543 rec_size, 1)
1544 || reorg_before_insert) {
1545 /* The values can change dynamically. */
1546 bool log_compressed = page_zip_log_pages;
1547 ulint level = page_zip_level;
1548#ifdef UNIV_DEBUG
1549 rec_t* cursor_rec = page_cur_get_rec(cursor);
1550#endif /* UNIV_DEBUG */
1551
1552 /* If we are not writing compressed page images, we
1553 must reorganize the page before attempting the
1554 insert. */
1555 if (recv_recovery_is_on()) {
1556 /* Insert into the uncompressed page only.
1557 The page reorganization or creation that we
1558 would attempt outside crash recovery would
1559 have been covered by a previous redo log record. */
1560 } else if (page_is_empty(page)) {
1561 ut_ad(page_cur_is_before_first(cursor));
1562
1563 /* This is an empty page. Recreate it to
1564 get rid of the modification log. */
1565 page_create_zip(page_cur_get_block(cursor), index,
1566 page_header_get_field(page, PAGE_LEVEL),
1567 0, NULL, mtr);
1568 ut_ad(!page_header_get_ptr(page, PAGE_FREE));
1569
1570 if (page_zip_available(
1571 page_zip, dict_index_is_clust(index),
1572 rec_size, 1)) {
1573 goto use_heap;
1574 }
1575
1576 /* The cursor should remain on the page infimum. */
1577 return(NULL);
1578 } else if (!page_zip->m_nonempty && !page_has_garbage(page)) {
1579 /* The page has been freshly compressed, so
1580 reorganizing it will not help. */
1581 } else if (log_compressed && !reorg_before_insert) {
1582 /* Insert into uncompressed page only, and
1583 try page_zip_reorganize() afterwards. */
1584 } else if (btr_page_reorganize_low(
1585 recv_recovery_is_on(), level,
1586 cursor, index, mtr)) {
1587 ut_ad(!page_header_get_ptr(page, PAGE_FREE));
1588
1589 if (page_zip_available(
1590 page_zip, dict_index_is_clust(index),
1591 rec_size, 1)) {
1592 /* After reorganizing, there is space
1593 available. */
1594 goto use_heap;
1595 }
1596 } else {
1597 ut_ad(cursor->rec == cursor_rec);
1598 return(NULL);
1599 }
1600
1601 /* Try compressing the whole page afterwards. */
1602 insert_rec = page_cur_insert_rec_low(
1603 cursor->rec, index, rec, offsets, NULL);
1604
1605 /* If recovery is on, this implies that the compression
1606 of the page was successful during runtime. Had that not
1607 been the case or had the redo logging of compressed
1608 pages been enabled during runtime then we'd have seen
1609 a MLOG_ZIP_PAGE_COMPRESS redo record. Therefore, we
1610 know that we don't need to reorganize the page. We,
1611 however, do need to recompress the page. That will
1612 happen when the next redo record is read which must
1613 be of type MLOG_ZIP_PAGE_COMPRESS_NO_DATA and it must
1614 contain a valid compression level value.
1615 This implies that during recovery from this point till
1616 the next redo is applied the uncompressed and
1617 compressed versions are not identical and
1618 page_zip_validate will fail but that is OK because
1619 we call page_zip_validate only after processing
1620 all changes to a page under a single mtr during
1621 recovery. */
1622 if (insert_rec == NULL) {
1623 /* Out of space.
1624 This should never occur during crash recovery,
1625 because the MLOG_COMP_REC_INSERT should only
1626 be logged after a successful operation. */
1627 ut_ad(!recv_recovery_is_on());
1628 ut_ad(!index->is_dummy);
1629 } else if (recv_recovery_is_on()) {
1630 /* This should be followed by
1631 MLOG_ZIP_PAGE_COMPRESS_NO_DATA,
1632 which should succeed. */
1633 rec_offs_make_valid(insert_rec, index,
1634 page_is_leaf(page), offsets);
1635 } else {
1636 ulint pos = page_rec_get_n_recs_before(insert_rec);
1637 ut_ad(pos > 0);
1638
1639 if (!log_compressed) {
1640 if (page_zip_compress(
1641 page_zip, page, index,
1642 level, NULL, NULL)) {
1643 page_cur_insert_rec_write_log(
1644 insert_rec, rec_size,
1645 cursor->rec, index, mtr);
1646 page_zip_compress_write_log_no_data(
1647 level, page, index, mtr);
1648
1649 rec_offs_make_valid(
1650 insert_rec, index,
1651 page_is_leaf(page), offsets);
1652 return(insert_rec);
1653 }
1654
1655 /* Page compress failed. If this happened on a
1656 leaf page, put the data size into the sample
1657 buffer. */
1658 if (page_is_leaf(page)) {
1659 ulint occupied = page_get_data_size(page)
1660 + page_dir_calc_reserved_space(
1661 page_get_n_recs(page));
1662 index->stat_defrag_data_size_sample[
1663 index->stat_defrag_sample_next_slot] =
1664 occupied;
1665 index->stat_defrag_sample_next_slot =
1666 (index->stat_defrag_sample_next_slot
1667 + 1) % STAT_DEFRAG_DATA_SIZE_N_SAMPLE;
1668 }
1669
1670 ut_ad(cursor->rec
1671 == (pos > 1
1672 ? page_rec_get_nth(
1673 page, pos - 1)
1674 : page + PAGE_NEW_INFIMUM));
1675 } else {
1676 /* We are writing entire page images
1677 to the log. Reduce the redo log volume
1678 by reorganizing the page at the same time. */
1679 if (page_zip_reorganize(
1680 cursor->block, index, mtr)) {
1681 /* The page was reorganized:
1682 Seek to pos. */
1683 if (pos > 1) {
1684 cursor->rec = page_rec_get_nth(
1685 page, pos - 1);
1686 } else {
1687 cursor->rec = page
1688 + PAGE_NEW_INFIMUM;
1689 }
1690
1691 insert_rec = page + rec_get_next_offs(
1692 cursor->rec, TRUE);
1693 rec_offs_make_valid(
1694 insert_rec, index,
1695 page_is_leaf(page), offsets);
1696 return(insert_rec);
1697 }
1698
1699 /* Theoretically, we could try one
1700 last resort of btr_page_reorganize_low()
1701 followed by page_zip_available(), but
1702 that would be very unlikely to
1703 succeed. (If the full reorganized page
1704 failed to compress, why would it
1705 succeed to compress the page, plus log
1706 the insert of this record? */
1707 }
1708
1709 /* Out of space: restore the page */
1710 if (!page_zip_decompress(page_zip, page, FALSE)) {
1711 ut_error; /* Memory corrupted? */
1712 }
1713 ut_ad(page_validate(page, index));
1714 insert_rec = NULL;
1715 }
1716
1717 return(insert_rec);
1718 }
1719
1720 free_rec = page_header_get_ptr(page, PAGE_FREE);
1721 if (UNIV_LIKELY_NULL(free_rec)) {
1722 /* Try to allocate from the head of the free list. */
1723 lint extra_size_diff;
1724 ulint foffsets_[REC_OFFS_NORMAL_SIZE];
1725 ulint* foffsets = foffsets_;
1726 mem_heap_t* heap = NULL;
1727
1728 rec_offs_init(foffsets_);
1729
1730 foffsets = rec_get_offsets(free_rec, index, foffsets,
1731 page_rec_is_leaf(free_rec),
1732 ULINT_UNDEFINED, &heap);
1733 if (rec_offs_size(foffsets) < rec_size) {
1734too_small:
1735 if (UNIV_LIKELY_NULL(heap)) {
1736 mem_heap_free(heap);
1737 }
1738
1739 goto use_heap;
1740 }
1741
1742 insert_buf = free_rec - rec_offs_extra_size(foffsets);
1743
1744 /* On compressed pages, do not relocate records from
1745 the free list. If extra_size would grow, use the heap. */
1746 extra_size_diff = lint(rec_offs_extra_size(offsets)
1747 - rec_offs_extra_size(foffsets));
1748
1749 if (UNIV_UNLIKELY(extra_size_diff < 0)) {
1750 /* Add an offset to the extra_size. */
1751 if (rec_offs_size(foffsets)
1752 < rec_size - ulint(extra_size_diff)) {
1753
1754 goto too_small;
1755 }
1756
1757 insert_buf -= extra_size_diff;
1758 } else if (UNIV_UNLIKELY(extra_size_diff)) {
1759 /* Do not allow extra_size to grow */
1760
1761 goto too_small;
1762 }
1763
1764 heap_no = rec_get_heap_no_new(free_rec);
1765 page_mem_alloc_free(page, page_zip,
1766 rec_get_next_ptr(free_rec, TRUE),
1767 rec_size);
1768
1769 if (!page_is_leaf(page)) {
1770 /* Zero out the node pointer of free_rec,
1771 in case it will not be overwritten by
1772 insert_rec. */
1773
1774 ut_ad(rec_size > REC_NODE_PTR_SIZE);
1775
1776 if (rec_offs_extra_size(foffsets)
1777 + rec_offs_data_size(foffsets) > rec_size) {
1778
1779 memset(rec_get_end(free_rec, foffsets)
1780 - REC_NODE_PTR_SIZE, 0,
1781 REC_NODE_PTR_SIZE);
1782 }
1783 } else if (dict_index_is_clust(index)) {
1784 /* Zero out the DB_TRX_ID and DB_ROLL_PTR
1785 columns of free_rec, in case it will not be
1786 overwritten by insert_rec. */
1787
1788 ulint trx_id_col;
1789 ulint trx_id_offs;
1790 ulint len;
1791
1792 trx_id_col = dict_index_get_sys_col_pos(index,
1793 DATA_TRX_ID);
1794 ut_ad(trx_id_col > 0);
1795 ut_ad(trx_id_col != ULINT_UNDEFINED);
1796
1797 trx_id_offs = rec_get_nth_field_offs(foffsets,
1798 trx_id_col, &len);
1799 ut_ad(len == DATA_TRX_ID_LEN);
1800
1801 if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs
1802 + rec_offs_extra_size(foffsets) > rec_size) {
1803 /* We will have to zero out the
1804 DB_TRX_ID and DB_ROLL_PTR, because
1805 they will not be fully overwritten by
1806 insert_rec. */
1807
1808 memset(free_rec + trx_id_offs, 0,
1809 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1810 }
1811
1812 ut_ad(free_rec + trx_id_offs + DATA_TRX_ID_LEN
1813 == rec_get_nth_field(free_rec, foffsets,
1814 trx_id_col + 1, &len));
1815 ut_ad(len == DATA_ROLL_PTR_LEN);
1816 }
1817
1818 if (UNIV_LIKELY_NULL(heap)) {
1819 mem_heap_free(heap);
1820 }
1821 } else {
1822use_heap:
1823 free_rec = NULL;
1824 insert_buf = page_mem_alloc_heap(page, page_zip,
1825 rec_size, &heap_no);
1826
1827 if (UNIV_UNLIKELY(insert_buf == NULL)) {
1828 return(NULL);
1829 }
1830
1831 page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
1832 }
1833
1834 /* 3. Create the record */
1835 insert_rec = rec_copy(insert_buf, rec, offsets);
1836 rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets);
1837
1838 /* 4. Insert the record in the linked list of records */
1839 ut_ad(cursor->rec != insert_rec);
1840
1841 {
1842 /* next record after current before the insertion */
1843 const rec_t* next_rec = page_rec_get_next_low(
1844 cursor->rec, TRUE);
1845 ut_ad(rec_get_status(cursor->rec)
1846 <= REC_STATUS_INFIMUM);
1847 ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
1848 ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1849
1850 page_rec_set_next(insert_rec, next_rec);
1851 page_rec_set_next(cursor->rec, insert_rec);
1852 }
1853
1854 page_header_set_field(page, page_zip, PAGE_N_RECS,
1855 1U + page_get_n_recs(page));
1856
1857 /* 5. Set the n_owned field in the inserted record to zero,
1858 and set the heap_no field */
1859 rec_set_n_owned_new(insert_rec, NULL, 0);
1860 rec_set_heap_no_new(insert_rec, heap_no);
1861
1862 UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
1863 rec_offs_size(offsets));
1864
1865 page_zip_dir_insert(page_zip, cursor->rec, free_rec, insert_rec);
1866
1867 /* 6. Update the last insertion info in page header */
1868
1869 last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
1870 ut_ad(!last_insert
1871 || rec_get_node_ptr_flag(last_insert)
1872 == rec_get_node_ptr_flag(insert_rec));
1873
1874 if (!dict_index_is_spatial(index)) {
1875 byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
1876 if (UNIV_UNLIKELY(last_insert == NULL)) {
1877no_direction:
1878 page_direction_reset(ptr, page, page_zip);
1879 } else if (last_insert == cursor->rec
1880 && page_ptr_get_direction(ptr) != PAGE_LEFT) {
1881 page_direction_increment(ptr, page, page_zip,
1882 PAGE_RIGHT);
1883 } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
1884 && page_rec_get_next(insert_rec) == last_insert) {
1885 page_direction_increment(ptr, page, page_zip,
1886 PAGE_LEFT);
1887 } else {
1888 goto no_direction;
1889 }
1890 }
1891
1892 page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);
1893
1894 /* 7. It remains to update the owner record. */
1895 {
1896 rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
1897 ulint n_owned;
1898
1899 n_owned = rec_get_n_owned_new(owner_rec);
1900 rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);
1901
1902 /* 8. Now we have incremented the n_owned field of the owner
1903 record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
1904 we have to split the corresponding directory slot in two. */
1905
1906 if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
1907 page_dir_split_slot(
1908 page, page_zip,
1909 page_dir_find_owner_slot(owner_rec));
1910 }
1911 }
1912
1913 page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);
1914
1915 /* 9. Write log record of the insert */
1916 if (UNIV_LIKELY(mtr != NULL)) {
1917 page_cur_insert_rec_write_log(insert_rec, rec_size,
1918 cursor->rec, index, mtr);
1919 }
1920
1921 return(insert_rec);
1922}
1923
1924/**********************************************************//**
1925Writes a log record of copying a record list end to a new created page.
1926@return 4-byte field where to write the log data length, or NULL if
1927logging is disabled */
1928UNIV_INLINE
1929byte*
1930page_copy_rec_list_to_created_page_write_log(
1931/*=========================================*/
1932 page_t* page, /*!< in: index page */
1933 dict_index_t* index, /*!< in: record descriptor */
1934 mtr_t* mtr) /*!< in: mtr */
1935{
1936 byte* log_ptr;
1937
1938 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1939 ut_ad(mtr->is_named_space(index->table->space));
1940
1941 log_ptr = mlog_open_and_write_index(mtr, page, index,
1942 page_is_comp(page)
1943 ? MLOG_COMP_LIST_END_COPY_CREATED
1944 : MLOG_LIST_END_COPY_CREATED, 4);
1945 if (UNIV_LIKELY(log_ptr != NULL)) {
1946 mlog_close(mtr, log_ptr + 4);
1947 }
1948
1949 return(log_ptr);
1950}
1951
1952/**********************************************************//**
1953Parses a log record of copying a record list end to a new created page.
1954@return end of log record or NULL */
1955byte*
1956page_parse_copy_rec_list_to_created_page(
1957/*=====================================*/
1958 byte* ptr, /*!< in: buffer */
1959 byte* end_ptr,/*!< in: buffer end */
1960 buf_block_t* block, /*!< in: page or NULL */
1961 dict_index_t* index, /*!< in: record descriptor */
1962 mtr_t* mtr) /*!< in: mtr or NULL */
1963{
1964 byte* rec_end;
1965 ulint log_data_len;
1966 page_t* page;
1967 page_zip_des_t* page_zip;
1968
1969 ut_ad(index->is_dummy);
1970
1971 if (ptr + 4 > end_ptr) {
1972
1973 return(NULL);
1974 }
1975
1976 log_data_len = mach_read_from_4(ptr);
1977 ptr += 4;
1978
1979 rec_end = ptr + log_data_len;
1980
1981 if (rec_end > end_ptr) {
1982
1983 return(NULL);
1984 }
1985
1986 if (!block) {
1987
1988 return(rec_end);
1989 }
1990
1991 /* This function is never invoked on the clustered index root page,
1992 except in the redo log apply of
1993 page_copy_rec_list_end_to_created_page() which was logged by.
1994 page_copy_rec_list_to_created_page_write_log().
1995 For other pages, this field must be zero-initialized. */
1996 ut_ad(!page_get_instant(block->frame) || page_is_root(block->frame));
1997
1998 while (ptr < rec_end) {
1999 ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
2000 block, index, mtr);
2001 }
2002
2003 ut_a(ptr == rec_end);
2004
2005 page = buf_block_get_frame(block);
2006 page_zip = buf_block_get_page_zip(block);
2007
2008 page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
2009
2010 if (!dict_index_is_spatial(index)) {
2011 page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + page,
2012 page, page_zip);
2013 }
2014
2015 return(rec_end);
2016}
2017
2018/*************************************************************//**
2019Copies records from page to a newly created page, from a given record onward,
2020including that record. Infimum and supremum records are not copied.
2021
2022IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
2023if this is a compressed leaf page in a secondary index.
2024This has to be done either within the same mini-transaction,
2025or by invoking ibuf_reset_free_bits() before mtr_commit(). */
2026void
2027page_copy_rec_list_end_to_created_page(
2028/*===================================*/
2029 page_t* new_page, /*!< in/out: index page to copy to */
2030 rec_t* rec, /*!< in: first record to copy */
2031 dict_index_t* index, /*!< in: record descriptor */
2032 mtr_t* mtr) /*!< in: mtr */
2033{
2034 page_dir_slot_t* slot = 0; /* remove warning */
2035 byte* heap_top;
2036 rec_t* insert_rec = 0; /* remove warning */
2037 rec_t* prev_rec;
2038 ulint count;
2039 ulint n_recs;
2040 ulint slot_index;
2041 ulint rec_size;
2042 byte* log_ptr;
2043 ulint log_data_len;
2044 mem_heap_t* heap = NULL;
2045 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2046 ulint* offsets = offsets_;
2047 rec_offs_init(offsets_);
2048
2049 ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
2050 ut_ad(page_align(rec) != new_page);
2051 ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));
2052 /* This function is never invoked on the clustered index root page,
2053 except in btr_lift_page_up(). */
2054 ut_ad(!page_get_instant(new_page) || page_is_root(new_page));
2055
2056 if (page_rec_is_infimum(rec)) {
2057
2058 rec = page_rec_get_next(rec);
2059 }
2060
2061 if (page_rec_is_supremum(rec)) {
2062
2063 return;
2064 }
2065
2066#ifdef UNIV_DEBUG
2067 /* To pass the debug tests we have to set these dummy values
2068 in the debug version */
2069 page_dir_set_n_slots(new_page, NULL, srv_page_size / 2);
2070 page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
2071 new_page + srv_page_size - 1);
2072#endif
2073 log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
2074 index, mtr);
2075
2076 log_data_len = mtr->get_log()->size();
2077
2078 /* Individual inserts are logged in a shorter form */
2079
2080 const mtr_log_t log_mode = index->table->is_temporary()
2081 || !index->is_readable() /* IMPORT TABLESPACE */
2082 ? mtr_get_log_mode(mtr)
2083 : mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
2084
2085 prev_rec = page_get_infimum_rec(new_page);
2086 if (page_is_comp(new_page)) {
2087 heap_top = new_page + PAGE_NEW_SUPREMUM_END;
2088 } else {
2089 heap_top = new_page + PAGE_OLD_SUPREMUM_END;
2090 }
2091 count = 0;
2092 slot_index = 0;
2093 n_recs = 0;
2094
2095 const bool is_leaf = page_is_leaf(new_page);
2096
2097 do {
2098 offsets = rec_get_offsets(rec, index, offsets, is_leaf,
2099 ULINT_UNDEFINED, &heap);
2100 insert_rec = rec_copy(heap_top, rec, offsets);
2101
2102 if (page_is_comp(new_page)) {
2103 rec_set_next_offs_new(prev_rec,
2104 page_offset(insert_rec));
2105
2106 rec_set_n_owned_new(insert_rec, NULL, 0);
2107 rec_set_heap_no_new(insert_rec,
2108 PAGE_HEAP_NO_USER_LOW + n_recs);
2109 } else {
2110 rec_set_next_offs_old(prev_rec,
2111 page_offset(insert_rec));
2112
2113 rec_set_n_owned_old(insert_rec, 0);
2114 rec_set_heap_no_old(insert_rec,
2115 PAGE_HEAP_NO_USER_LOW + n_recs);
2116 }
2117
2118 count++;
2119 n_recs++;
2120
2121 if (UNIV_UNLIKELY
2122 (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)) {
2123
2124 slot_index++;
2125
2126 slot = page_dir_get_nth_slot(new_page, slot_index);
2127
2128 page_dir_slot_set_rec(slot, insert_rec);
2129 page_dir_slot_set_n_owned(slot, NULL, count);
2130
2131 count = 0;
2132 }
2133
2134 rec_size = rec_offs_size(offsets);
2135
2136 ut_ad(heap_top < new_page + srv_page_size);
2137
2138 heap_top += rec_size;
2139
2140 rec_offs_make_valid(insert_rec, index, is_leaf, offsets);
2141 page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
2142 index, mtr);
2143 prev_rec = insert_rec;
2144 rec = page_rec_get_next(rec);
2145 } while (!page_rec_is_supremum(rec));
2146
2147 if ((slot_index > 0) && (count + 1
2148 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
2149 <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
2150 /* We can merge the two last dir slots. This operation is
2151 here to make this function imitate exactly the equivalent
2152 task made using page_cur_insert_rec, which we use in database
2153 recovery to reproduce the task performed by this function.
2154 To be able to check the correctness of recovery, it is good
2155 that it imitates exactly. */
2156
2157 count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
2158
2159 page_dir_slot_set_n_owned(slot, NULL, 0);
2160
2161 slot_index--;
2162 }
2163
2164 if (UNIV_LIKELY_NULL(heap)) {
2165 mem_heap_free(heap);
2166 }
2167
2168 /* Restore the log mode */
2169
2170 mtr_set_log_mode(mtr, log_mode);
2171
2172 log_data_len = mtr->get_log()->size() - log_data_len;
2173
2174 ut_a(log_data_len < 100U << srv_page_size_shift);
2175
2176 if (log_ptr != NULL) {
2177 mach_write_to_4(log_ptr, log_data_len);
2178 }
2179
2180 if (page_is_comp(new_page)) {
2181 rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
2182 } else {
2183 rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
2184 }
2185
2186 slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
2187
2188 page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
2189 page_dir_slot_set_n_owned(slot, NULL, count + 1);
2190
2191 page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
2192 page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
2193 page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
2194 page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);
2195
2196 *reinterpret_cast<uint16_t*>(PAGE_HEADER + PAGE_LAST_INSERT + new_page)
2197 = 0;
2198 page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + new_page,
2199 new_page, NULL);
2200}
2201
2202/***********************************************************//**
2203Writes log record of a record delete on a page. */
2204UNIV_INLINE
2205void
2206page_cur_delete_rec_write_log(
2207/*==========================*/
2208 rec_t* rec, /*!< in: record to be deleted */
2209 const dict_index_t* index, /*!< in: record descriptor */
2210 mtr_t* mtr) /*!< in: mini-transaction handle */
2211{
2212 byte* log_ptr;
2213
2214 ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2215 ut_ad(mtr->is_named_space(index->table->space));
2216
2217 log_ptr = mlog_open_and_write_index(mtr, rec, index,
2218 page_rec_is_comp(rec)
2219 ? MLOG_COMP_REC_DELETE
2220 : MLOG_REC_DELETE, 2);
2221
2222 if (!log_ptr) {
2223 /* Logging in mtr is switched off during crash recovery:
2224 in that case mlog_open returns NULL */
2225 return;
2226 }
2227
2228 /* Write the cursor rec offset as a 2-byte ulint */
2229 mach_write_to_2(log_ptr, page_offset(rec));
2230
2231 mlog_close(mtr, log_ptr + 2);
2232}
2233
2234/***********************************************************//**
2235Parses log record of a record delete on a page.
2236@return pointer to record end or NULL */
2237byte*
2238page_cur_parse_delete_rec(
2239/*======================*/
2240 byte* ptr, /*!< in: buffer */
2241 byte* end_ptr,/*!< in: buffer end */
2242 buf_block_t* block, /*!< in: page or NULL */
2243 dict_index_t* index, /*!< in: record descriptor */
2244 mtr_t* mtr) /*!< in: mtr or NULL */
2245{
2246 ulint offset;
2247 page_cur_t cursor;
2248
2249 if (end_ptr < ptr + 2) {
2250
2251 return(NULL);
2252 }
2253
2254 /* Read the cursor rec offset as a 2-byte ulint */
2255 offset = mach_read_from_2(ptr);
2256 ptr += 2;
2257
2258 ut_a(offset <= srv_page_size);
2259
2260 if (block) {
2261 page_t* page = buf_block_get_frame(block);
2262 mem_heap_t* heap = NULL;
2263 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2264 rec_t* rec = page + offset;
2265 rec_offs_init(offsets_);
2266
2267 page_cur_position(rec, block, &cursor);
2268 ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
2269
2270 page_cur_delete_rec(&cursor, index,
2271 rec_get_offsets(rec, index, offsets_,
2272 page_rec_is_leaf(rec),
2273 ULINT_UNDEFINED, &heap),
2274 mtr);
2275 if (UNIV_LIKELY_NULL(heap)) {
2276 mem_heap_free(heap);
2277 }
2278 }
2279
2280 return(ptr);
2281}
2282
2283/***********************************************************//**
2284Deletes a record at the page cursor. The cursor is moved to the next
2285record after the deleted one. */
2286void
2287page_cur_delete_rec(
2288/*================*/
2289 page_cur_t* cursor, /*!< in/out: a page cursor */
2290 const dict_index_t* index, /*!< in: record descriptor */
2291 const ulint* offsets,/*!< in: rec_get_offsets(
2292 cursor->rec, index) */
2293 mtr_t* mtr) /*!< in: mini-transaction handle
2294 or NULL */
2295{
2296 page_dir_slot_t* cur_dir_slot;
2297 page_dir_slot_t* prev_slot;
2298 page_t* page;
2299 page_zip_des_t* page_zip;
2300 rec_t* current_rec;
2301 rec_t* prev_rec = NULL;
2302 rec_t* next_rec;
2303 ulint cur_slot_no;
2304 ulint cur_n_owned;
2305 rec_t* rec;
2306
2307 page = page_cur_get_page(cursor);
2308 page_zip = page_cur_get_page_zip(cursor);
2309
2310 /* page_zip_validate() will fail here when
2311 btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
2312 Then, both "page_zip" and "page" would have the min-rec-mark
2313 set on the smallest user record, but "page" would additionally
2314 have it set on the smallest-but-one record. Because sloppy
2315 page_zip_validate_low() only ignores min-rec-flag differences
2316 in the smallest user record, it cannot be used here either. */
2317
2318 current_rec = cursor->rec;
2319 ut_ad(rec_offs_validate(current_rec, index, offsets));
2320 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
2321 ut_ad(fil_page_index_page_check(page));
2322 ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
2323 || index->is_dummy
2324 || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
2325 ut_ad(!mtr || mtr->is_named_space(index->table->space));
2326
2327 /* The record must not be the supremum or infimum record. */
2328 ut_ad(page_rec_is_user_rec(current_rec));
2329
2330 if (page_get_n_recs(page) == 1 && !recv_recovery_is_on()) {
2331 /* Empty the page, unless we are applying the redo log
2332 during crash recovery. During normal operation, the
2333 page_create_empty() gets logged as one of MLOG_PAGE_CREATE,
2334 MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */
2335 ut_ad(page_is_leaf(page));
2336 /* Usually, this should be the root page,
2337 and the whole index tree should become empty.
2338 However, this could also be a call in
2339 btr_cur_pessimistic_update() to delete the only
2340 record in the page and to insert another one. */
2341 page_cur_move_to_next(cursor);
2342 ut_ad(page_cur_is_after_last(cursor));
2343 page_create_empty(page_cur_get_block(cursor),
2344 const_cast<dict_index_t*>(index), mtr);
2345 return;
2346 }
2347
2348 /* Save to local variables some data associated with current_rec */
2349 cur_slot_no = page_dir_find_owner_slot(current_rec);
2350 ut_ad(cur_slot_no > 0);
2351 cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
2352 cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);
2353
2354 /* 0. Write the log record */
2355 if (mtr != 0) {
2356 page_cur_delete_rec_write_log(current_rec, index, mtr);
2357 }
2358
2359 /* 1. Reset the last insert info in the page header and increment
2360 the modify clock for the frame */
2361
2362 page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
2363
2364 /* The page gets invalid for optimistic searches: increment the
2365 frame modify clock only if there is an mini-transaction covering
2366 the change. During IMPORT we allocate local blocks that are not
2367 part of the buffer pool. */
2368
2369 if (mtr != 0) {
2370 buf_block_modify_clock_inc(page_cur_get_block(cursor));
2371 }
2372
2373 /* 2. Find the next and the previous record. Note that the cursor is
2374 left at the next record. */
2375
2376 ut_ad(cur_slot_no > 0);
2377 prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);
2378
2379 rec = (rec_t*) page_dir_slot_get_rec(prev_slot);
2380
2381 /* rec now points to the record of the previous directory slot. Look
2382 for the immediate predecessor of current_rec in a loop. */
2383
2384 while (current_rec != rec) {
2385 prev_rec = rec;
2386 rec = page_rec_get_next(rec);
2387 }
2388
2389 page_cur_move_to_next(cursor);
2390 next_rec = cursor->rec;
2391
2392 /* 3. Remove the record from the linked list of records */
2393
2394 page_rec_set_next(prev_rec, next_rec);
2395
2396 /* 4. If the deleted record is pointed to by a dir slot, update the
2397 record pointer in slot. In the following if-clause we assume that
2398 prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
2399 >= 2. */
2400
2401 compile_time_assert(PAGE_DIR_SLOT_MIN_N_OWNED >= 2);
2402 ut_ad(cur_n_owned > 1);
2403
2404 if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
2405 page_dir_slot_set_rec(cur_dir_slot, prev_rec);
2406 }
2407
2408 /* 5. Update the number of owned records of the slot */
2409
2410 page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);
2411
2412 /* 6. Free the memory occupied by the record */
2413 page_mem_free(page, page_zip, current_rec, index, offsets);
2414
2415 /* 7. Now we have decremented the number of owned records of the slot.
2416 If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
2417 slots. */
2418
2419 if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
2420 page_dir_balance_slot(page, page_zip, cur_slot_no);
2421 }
2422
2423#ifdef UNIV_ZIP_DEBUG
2424 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2425#endif /* UNIV_ZIP_DEBUG */
2426}
2427
2428#ifdef UNIV_COMPILE_TEST_FUNCS
2429
2430/*******************************************************************//**
2431Print the first n numbers, generated by page_cur_lcg_prng() to make sure
2432(visually) that it works properly. */
2433void
2434test_page_cur_lcg_prng(
2435/*===================*/
2436 int n) /*!< in: print first n numbers */
2437{
2438 int i;
2439 unsigned long long rnd;
2440
2441 for (i = 0; i < n; i++) {
2442 rnd = page_cur_lcg_prng();
2443 printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
2444 rnd,
2445 rnd % 2,
2446 rnd % 3,
2447 rnd % 5,
2448 rnd % 7,
2449 rnd % 11);
2450 }
2451}
2452
2453#endif /* UNIV_COMPILE_TEST_FUNCS */
2454