/*****************************************************************************

Copyright (c) 2014, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2018, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0bulk.cc
The B-tree bulk load

Created 03/11/2014 Shaohua Wang
*******************************************************/

#include "btr0bulk.h"
#include "btr0btr.h"
#include "btr0cur.h"
#include "btr0pcur.h"
#include "ibuf0ibuf.h"

/** InnoDB B-tree index fill factor for bulk load. */
uint	innobase_fill_factor;
/** Initialize members, allocate page if needed and start mtr.
Note: we commit all mtrs on failure.
@return error code. */
dberr_t
PageBulk::init()
{
	mtr_t*		mtr;
	buf_block_t*	new_block;
	page_t*		new_page;
	page_zip_des_t*	new_page_zip;
	ulint		new_page_no;

	ut_ad(m_heap == NULL);
	m_heap = mem_heap_create(1000);

	mtr = static_cast<mtr_t*>(
		mem_heap_alloc(m_heap, sizeof(mtr_t)));
	mtr_start(mtr);
	mtr_x_lock(dict_index_get_lock(m_index), mtr);
	mtr_set_log_mode(mtr, MTR_LOG_NO_REDO);
	mtr_set_flush_observer(mtr, m_flush_observer);

	if (m_page_no == FIL_NULL) {
		mtr_t	alloc_mtr;

		/* We commit redo log for allocation by a separate mtr,
		because we don't guarantee pages are committed following
		the allocation order, and we will always generate redo log
		for page allocation, even when creating a new tablespace. */
		alloc_mtr.start();
		m_index->set_modified(alloc_mtr);

		ulint	n_reserved;
		bool	success;
		success = fsp_reserve_free_extents(&n_reserved,
						   m_index->table->space,
						   1, FSP_NORMAL, &alloc_mtr);
		if (!success) {
			mtr_commit(&alloc_mtr);
			mtr_commit(mtr);
			return(DB_OUT_OF_FILE_SPACE);
		}

		/* Allocate a new page. */
		new_block = btr_page_alloc(m_index, 0, FSP_UP, m_level,
					   &alloc_mtr, mtr);

		m_index->table->space->release_free_extents(n_reserved);

		alloc_mtr.commit();

		new_page = buf_block_get_frame(new_block);
		new_page_zip = buf_block_get_page_zip(new_block);
		new_page_no = page_get_page_no(new_page);

		if (new_page_zip) {
			page_create_zip(new_block, m_index, m_level, 0,
					NULL, mtr);
		} else {
			ut_ad(!dict_index_is_spatial(m_index));
			page_create(new_block, mtr,
				    dict_table_is_comp(m_index->table),
				    false);
			btr_page_set_level(new_page, NULL, m_level, mtr);
		}

		btr_page_set_next(new_page, NULL, FIL_NULL, mtr);
		btr_page_set_prev(new_page, NULL, FIL_NULL, mtr);

		btr_page_set_index_id(new_page, NULL, m_index->id, mtr);
	} else {
		new_block = btr_block_get(
			page_id_t(m_index->table->space->id, m_page_no),
			page_size_t(m_index->table->space->flags),
			RW_X_LATCH, m_index, mtr);

		new_page = buf_block_get_frame(new_block);
		new_page_zip = buf_block_get_page_zip(new_block);
		new_page_no = page_get_page_no(new_page);
		ut_ad(m_page_no == new_page_no);

		ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);

		btr_page_set_level(new_page, NULL, m_level, mtr);
	}

	if (dict_index_is_sec_or_ibuf(m_index)
	    && !m_index->table->is_temporary()
	    && page_is_leaf(new_page)) {
		page_update_max_trx_id(new_block, NULL, m_trx_id, mtr);
	}

	m_mtr = mtr;
	m_block = new_block;
	m_block->skip_flush_check = true;
	m_page = new_page;
	m_page_zip = new_page_zip;
	m_page_no = new_page_no;
	m_cur_rec = page_get_infimum_rec(new_page);
	ut_ad(m_is_comp == !!page_is_comp(new_page));
	m_free_space = page_get_free_space_of_empty(m_is_comp);

	if (innobase_fill_factor == 100 && dict_index_is_clust(m_index)) {
		/* Keep default behavior compatible with 5.6 */
		m_reserved_space = dict_index_get_space_reserve();
	} else {
		m_reserved_space =
			srv_page_size * (100 - innobase_fill_factor) / 100;
	}

	m_padding_space =
		srv_page_size - dict_index_zip_pad_optimal_page_size(m_index);
	m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP);
	m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);

	ut_d(m_total_data = 0);
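	/* Temporarily set PAGE_HEAP_TOP to the end of the page. The real
	heap top is tracked in m_heap_top and is written back to the page
	header in PageBulk::finish(). */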
	page_header_set_field(m_page, NULL, PAGE_HEAP_TOP, srv_page_size - 1);

	return(DB_SUCCESS);
}

/** Insert a record in the page.
@param[in]	rec		record
@param[in]	offsets		record offsets */
void
PageBulk::insert(
	const rec_t*	rec,
	ulint*		offsets)
{
	ulint	rec_size;

	ut_ad(m_heap != NULL);

	rec_size = rec_offs_size(offsets);
	ut_d(const bool is_leaf = page_rec_is_leaf(m_cur_rec));

#ifdef UNIV_DEBUG
	/* Check whether records are in order. */
	if (!page_rec_is_infimum(m_cur_rec)) {
		rec_t*	old_rec = m_cur_rec;
		ulint*	old_offsets = rec_get_offsets(
			old_rec, m_index, NULL, is_leaf,
			ULINT_UNDEFINED, &m_heap);

		ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index)
		      > 0);
	}

	m_total_data += rec_size;
#endif /* UNIV_DEBUG */

	/* 1. Copy the record to page. */
	rec_t*	insert_rec = rec_copy(m_heap_top, rec, offsets);
	rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets);

	/* 2. Insert the record in the linked list. */
	rec_t*	next_rec = page_rec_get_next(m_cur_rec);

	page_rec_set_next(insert_rec, next_rec);
	page_rec_set_next(m_cur_rec, insert_rec);

	/* 3. Set the n_owned field in the inserted record to zero,
	and set the heap_no field. */
	if (m_is_comp) {
		rec_set_n_owned_new(insert_rec, NULL, 0);
		rec_set_heap_no_new(insert_rec,
				    PAGE_HEAP_NO_USER_LOW + m_rec_no);
	} else {
		rec_set_n_owned_old(insert_rec, 0);
		rec_set_heap_no_old(insert_rec,
				    PAGE_HEAP_NO_USER_LOW + m_rec_no);
	}
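	/* The n_owned field stays zero for now; the page directory and
	record ownership are built for the whole page in PageBulk::finish(). */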

	/* 4. Set member variables. */
	ulint	slot_size;
	slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
		- page_dir_calc_reserved_space(m_rec_no);

	ut_ad(m_free_space >= rec_size + slot_size);
	ut_ad(m_heap_top + rec_size < m_page + srv_page_size);

	m_free_space -= rec_size + slot_size;
	m_heap_top += rec_size;
	m_rec_no += 1;
	m_cur_rec = insert_rec;
}

/** Mark end of insertion to the page. Scan all records to set the page
directory slots, and set the page header members.
Note: we refer to page_copy_rec_list_end_to_created_page. */
void
PageBulk::finish()
{
	ut_ad(m_rec_no > 0);

#ifdef UNIV_DEBUG
	ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no)
	      <= page_get_free_space_of_empty(m_is_comp));

	/* To pass the debug tests we have to set these dummy values
	in the debug version */
	page_dir_set_n_slots(m_page, NULL, srv_page_size / 2);
#endif

	ulint	count = 0;
	ulint	n_recs = 0;
	ulint	slot_index = 0;
	rec_t*	insert_rec = page_rec_get_next(page_get_infimum_rec(m_page));
	page_dir_slot_t* slot = NULL;

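	/* Assign one directory slot for every (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
	records, in the same way as page_copy_rec_list_end_to_created_page(). */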
	/* Set owner & dir. */
	do {

		count++;
		n_recs++;

		if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) {

			slot_index++;

			slot = page_dir_get_nth_slot(m_page, slot_index);

			page_dir_slot_set_rec(slot, insert_rec);
			page_dir_slot_set_n_owned(slot, NULL, count);

			count = 0;
		}

		insert_rec = page_rec_get_next(insert_rec);
	} while (!page_rec_is_supremum(insert_rec));

	if (slot_index > 0
	    && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
		<= PAGE_DIR_SLOT_MAX_N_OWNED)) {
		/* We can merge the two last dir slots. This operation is
		here to make this function imitate exactly the equivalent
		task made using page_cur_insert_rec, which we use in database
		recovery to reproduce the task performed by this function.
		To be able to check the correctness of recovery, it is good
		that it imitates exactly. */

		count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

		page_dir_slot_set_n_owned(slot, NULL, 0);

		slot_index--;
	}

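	/* The last directory slot points to the supremum record, which owns
	itself plus the remaining records. */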
	slot = page_dir_get_nth_slot(m_page, 1 + slot_index);
	page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page));
	page_dir_slot_set_n_owned(slot, NULL, count + 1);

	ut_ad(!dict_index_is_spatial(m_index));
	page_dir_set_n_slots(m_page, NULL, 2 + slot_index);
	page_header_set_ptr(m_page, NULL, PAGE_HEAP_TOP, m_heap_top);
	page_dir_set_n_heap(m_page, NULL, PAGE_HEAP_NO_USER_LOW + m_rec_no);
	page_header_set_ptr(m_page, NULL, PAGE_LAST_INSERT, m_cur_rec);
	mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no);
	ut_ad(!page_get_instant(m_page));
	m_page[PAGE_HEADER + PAGE_DIRECTION_B] = PAGE_RIGHT;
	*reinterpret_cast<uint16_t*>(PAGE_HEADER + PAGE_N_DIRECTION + m_page)
		= 0;
	m_block->skip_flush_check = false;
}

/** Commit inserts done to the page.
@param[in]	success	Flag whether all inserts succeeded. */
void
PageBulk::commit(
	bool	success)
{
	if (success) {
		ut_ad(page_validate(m_page, m_index));

		/* Set no free space left and no buffered changes in ibuf. */
		if (!dict_index_is_clust(m_index)
		    && !m_index->table->is_temporary()
		    && page_is_leaf(m_page)) {
			ibuf_set_bitmap_for_bulk_load(
				m_block, innobase_fill_factor == 100);
		}
	}

	mtr_commit(m_mtr);
}

/** Compress a page of a compressed table.
@return true if compression succeeded or no compression was needed
@return false if compression failed */
bool
PageBulk::compress()
{
	ut_ad(m_page_zip != NULL);

	return(page_zip_compress(m_page_zip, m_page, m_index,
				 page_zip_level, NULL, m_mtr));
}

/** Get node pointer
@return node pointer */
dtuple_t*
PageBulk::getNodePtr()
{
	rec_t*		first_rec;
	dtuple_t*	node_ptr;

	/* Create node pointer */
	first_rec = page_rec_get_next(page_get_infimum_rec(m_page));
	ut_a(page_rec_is_user_rec(first_rec));
	node_ptr = dict_index_build_node_ptr(m_index, first_rec, m_page_no,
					     m_heap, m_level);

	return(node_ptr);
}

/** Get the split record in the left page. We split a page in half when
compression fails, and the records from the split record onwards are copied
to the right page.
@return split record */
rec_t*
PageBulk::getSplitRec()
{
	rec_t*	rec;
	ulint*	offsets;
	ulint	total_used_size;
	ulint	total_recs_size;
	ulint	n_recs;

	ut_ad(m_page_zip != NULL);
	ut_ad(m_rec_no >= 2);

	ut_ad(page_get_free_space_of_empty(m_is_comp) > m_free_space);
	total_used_size = page_get_free_space_of_empty(m_is_comp)
		- m_free_space;

	total_recs_size = 0;
	n_recs = 0;
	offsets = NULL;
	rec = page_get_infimum_rec(m_page);

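	/* Walk the records in order, accumulating their sizes plus the
	directory overhead, until roughly half of the used space on the
	page is covered. */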
	do {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));

		offsets = rec_get_offsets(rec, m_index, offsets,
					  page_is_leaf(m_page),
					  ULINT_UNDEFINED, &m_heap);
		total_recs_size += rec_offs_size(offsets);
		n_recs++;
	} while (total_recs_size + page_dir_calc_reserved_space(n_recs)
		 < total_used_size / 2);

	/* Keep at least one record on left page */
	if (page_rec_is_infimum(page_rec_get_prev(rec))) {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));
	}

	return(rec);
}

/** Copy all records after the split record, including the split record
itself.
@param[in]	split_rec	split record */
void
PageBulk::copyIn(
	rec_t*	split_rec)
{

	rec_t*	rec = split_rec;
	ulint*	offsets = NULL;

	ut_ad(m_rec_no == 0);
	ut_ad(page_rec_is_user_rec(rec));

	do {
		offsets = rec_get_offsets(rec, m_index, offsets,
					  page_rec_is_leaf(split_rec),
					  ULINT_UNDEFINED, &m_heap);

		insert(rec, offsets);

		rec = page_rec_get_next(rec);
	} while (!page_rec_is_supremum(rec));

	ut_ad(m_rec_no > 0);
}

/** Remove all records after the split record, including the split record
itself.
@param[in]	split_rec	split record */
void
PageBulk::copyOut(
	rec_t*	split_rec)
{
	rec_t*	rec;
	rec_t*	last_rec;
	ulint	n;

	/* Suppose before copyOut, we have 5 records on the page:
	infimum->r1->r2->r3->r4->r5->supremum, and r3 is the split rec.

	After copyOut, we have 2 records on the page:
	infimum->r1->r2->supremum. Slot adjustment is not done. */

	rec = page_rec_get_next(page_get_infimum_rec(m_page));
	last_rec = page_rec_get_prev(page_get_supremum_rec(m_page));
	n = 0;

	while (rec != split_rec) {
		rec = page_rec_get_next(rec);
		n++;
	}

	ut_ad(n > 0);

	/* Set last record's next in page */
	ulint*	offsets = NULL;
	rec = page_rec_get_prev(split_rec);
	offsets = rec_get_offsets(rec, m_index, offsets,
				  page_rec_is_leaf(split_rec),
				  ULINT_UNDEFINED, &m_heap);
	page_rec_set_next(rec, page_get_supremum_rec(m_page));

	/* Set related members */
	m_cur_rec = rec;
	m_heap_top = rec_get_end(rec, offsets);

	offsets = rec_get_offsets(last_rec, m_index, offsets,
				  page_rec_is_leaf(split_rec),
				  ULINT_UNDEFINED, &m_heap);

	m_free_space += ulint(rec_get_end(last_rec, offsets) - m_heap_top)
		+ page_dir_calc_reserved_space(m_rec_no)
		- page_dir_calc_reserved_space(n);
	ut_ad(lint(m_free_space) > 0);
	m_rec_no = n;

#ifdef UNIV_DEBUG
	m_total_data -= ulint(rec_get_end(last_rec, offsets) - m_heap_top);
#endif /* UNIV_DEBUG */
}

/** Set next page
@param[in]	next_page_no	next page no */
void
PageBulk::setNext(
	ulint	next_page_no)
{
	btr_page_set_next(m_page, NULL, next_page_no, m_mtr);
}

/** Set previous page
@param[in]	prev_page_no	previous page no */
void
PageBulk::setPrev(
	ulint	prev_page_no)
{
	btr_page_set_prev(m_page, NULL, prev_page_no, m_mtr);
}

/** Check if the required space is available in the page for the record
to be inserted. We check the fill factor and padding here.
@param[in]	rec_size	required length
@return true if space is available */
bool
PageBulk::isSpaceAvailable(
	ulint	rec_size)
{
	ulint	slot_size;
	ulint	required_space;

	slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
		- page_dir_calc_reserved_space(m_rec_no);

	required_space = rec_size + slot_size;

	if (required_space > m_free_space) {
		ut_ad(m_rec_no > 0);
		return false;
	}

	/* Fillfactor & Padding apply to both leaf and non-leaf pages.
	Note: we keep at least 2 records in a page to avoid B-tree level
	growing too high. */
	if (m_rec_no >= 2
	    && ((m_page_zip == NULL && m_free_space - required_space
		 < m_reserved_space)
		|| (m_page_zip != NULL && m_free_space - required_space
		    < m_padding_space))) {
		return(false);
	}

	return(true);
}

/** Check whether the record needs to be stored externally.
@return false if the entire record can be stored locally on the page */
bool
PageBulk::needExt(
	const dtuple_t*	tuple,
	ulint		rec_size)
{
	return(page_zip_rec_needs_ext(rec_size, m_is_comp,
		dtuple_get_n_fields(tuple), m_block->page.size));
}

/** Store an externally stored record.
Since the record is not logged yet, we don't log the update to the record;
the BLOB data is logged first, then the record is logged in bulk mode.
@param[in]	big_rec		externally stored record
@param[in]	offsets		record offsets
@return error code */
dberr_t
PageBulk::storeExt(
	const big_rec_t*	big_rec,
	ulint*			offsets)
{
	/* Note: not all fields are initialized in btr_pcur. */
	btr_pcur_t	btr_pcur;
	btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
	btr_pcur.latch_mode = BTR_MODIFY_LEAF;
	btr_pcur.btr_cur.index = m_index;

	page_cur_t*	page_cur = &btr_pcur.btr_cur.page_cur;
	page_cur->index = m_index;
	page_cur->rec = m_cur_rec;
	page_cur->offsets = offsets;
	page_cur->block = m_block;

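	/* Write the externally stored (BLOB) fields to separate pages and
	store the field references in the record. */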
	dberr_t	err = btr_store_big_rec_extern_fields(
		&btr_pcur, offsets, big_rec, m_mtr, BTR_STORE_INSERT_BULK);

	ut_ad(page_offset(m_cur_rec) == page_offset(page_cur->rec));

	/* Reset m_block and m_cur_rec from page cursor, because
	block may be changed during blob insert. */
	m_block = page_cur->block;
	m_cur_rec = page_cur->rec;
	m_page = buf_block_get_frame(m_block);

	return(err);
}

/** Release the block by committing the mtr.
Note: log_free_check requires holding no lock/latch in current thread. */
void
PageBulk::release()
{
	ut_ad(!dict_index_is_spatial(m_index));

	/* We fix the block because we will re-pin it soon. */
	buf_block_buf_fix_inc(m_block, __FILE__, __LINE__);

	/* No other threads can modify this block. */
	m_modify_clock = buf_block_get_modify_clock(m_block);

	mtr_commit(m_mtr);
}

/** Start mtr and latch the block */
dberr_t
PageBulk::latch()
{
	ibool	ret;

	mtr_start(m_mtr);
	mtr_x_lock(dict_index_get_lock(m_index), m_mtr);
	mtr_set_log_mode(m_mtr, MTR_LOG_NO_REDO);
	mtr_set_flush_observer(m_mtr, m_flush_observer);

	/* TODO: need a simple and wait version of buf_page_optimistic_get. */
	ret = buf_page_optimistic_get(RW_X_LATCH, m_block, m_modify_clock,
				      __FILE__, __LINE__, m_mtr);
	/* In case the block is S-latched by page_cleaner. */
	if (!ret) {
		m_block = buf_page_get_gen(
			page_id_t(m_index->table->space->id, m_page_no),
			page_size_t(m_index->table->space->flags),
			RW_X_LATCH, m_block, BUF_GET_IF_IN_POOL,
			__FILE__, __LINE__, m_mtr, &m_err);

		if (m_err != DB_SUCCESS) {
			return (m_err);
		}

		ut_ad(m_block != NULL);
	}

	buf_block_buf_fix_dec(m_block);

	ut_ad(m_cur_rec > m_page && m_cur_rec < m_heap_top);

	return (m_err);
}

/** Split a page
@param[in]	page_bulk	page to split
@param[in]	next_page_bulk	next page
@return error code */
dberr_t
BtrBulk::pageSplit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk)
{
	ut_ad(page_bulk->getPageZip() != NULL);

	/* 1. Check if we have only one user record on the page. */
	if (page_bulk->getRecNo() <= 1) {
		return(DB_TOO_BIG_RECORD);
	}

	/* 2. Create a new page. */
	PageBulk new_page_bulk(m_index, m_trx_id, FIL_NULL,
			       page_bulk->getLevel(), m_flush_observer);
	dberr_t	err = new_page_bulk.init();
	if (err != DB_SUCCESS) {
		return(err);
	}

	/* 3. Copy the upper half to the new page. */
	rec_t*	split_rec = page_bulk->getSplitRec();
	new_page_bulk.copyIn(split_rec);
	page_bulk->copyOut(split_rec);

	/* 4. Commit the split page. */
	err = pageCommit(page_bulk, &new_page_bulk, true);
	if (err != DB_SUCCESS) {
		pageAbort(&new_page_bulk);
		return(err);
	}

	/* 5. Commit the new page. */
	err = pageCommit(&new_page_bulk, next_page_bulk, true);
	if (err != DB_SUCCESS) {
		pageAbort(&new_page_bulk);
		return(err);
	}

	return(err);
}

/** Commit (finish) a page. We set the next/prev page no, compress a page of
a compressed table and split the page if compression fails, insert a node
pointer to the father page if needed, and commit the mini-transaction.
@param[in]	page_bulk	page to commit
@param[in]	next_page_bulk	next page
@param[in]	insert_father	false when page_bulk is a root page and
				true when it's a non-root page
@return error code */
dberr_t
BtrBulk::pageCommit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk,
	bool		insert_father)
{
	page_bulk->finish();

	/* Set page links */
	if (next_page_bulk != NULL) {
		ut_ad(page_bulk->getLevel() == next_page_bulk->getLevel());

		page_bulk->setNext(next_page_bulk->getPageNo());
		next_page_bulk->setPrev(page_bulk->getPageNo());
	} else {
		/* If a page is released and latched again, we need to
		mark it modified in the mini-transaction. */
		page_bulk->setNext(FIL_NULL);
	}

	/* Compress the page if it belongs to a compressed table. */
	if (page_bulk->getPageZip() != NULL && !page_bulk->compress()) {
		return(pageSplit(page_bulk, next_page_bulk));
	}

	/* Insert node pointer to father page. */
	if (insert_father) {
		dtuple_t*	node_ptr = page_bulk->getNodePtr();
		dberr_t		err = insert(node_ptr, page_bulk->getLevel()+1);

		if (err != DB_SUCCESS) {
			return(err);
		}
	}

	/* Commit mtr. */
	page_bulk->commit(true);

	return(DB_SUCCESS);
}

/** Log free check */
void
BtrBulk::logFreeCheck()
{
	if (log_sys.check_flush_or_checkpoint) {
		release();

		log_free_check();

		latch();
	}
}

/** Release all latches */
void
BtrBulk::release()
{
	ut_ad(m_root_level + 1 == m_page_bulks->size());

	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*	page_bulk = m_page_bulks->at(level);

		page_bulk->release();
	}
}

/** Re-latch all latches */
void
BtrBulk::latch()
{
	ut_ad(m_root_level + 1 == m_page_bulks->size());

	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*	page_bulk = m_page_bulks->at(level);
		page_bulk->latch();
	}
}

/** Insert a tuple into a page at the given level
@param[in]	tuple	tuple to insert
@param[in]	level	B-tree level
@return error code */
dberr_t
BtrBulk::insert(
	dtuple_t*	tuple,
	ulint		level)
{
	bool	is_left_most = false;
	dberr_t	err = DB_SUCCESS;

	ut_ad(m_heap != NULL);

	/* Check if we need to create a PageBulk for the level. */
	if (level + 1 > m_page_bulks->size()) {
		PageBulk*	new_page_bulk
			= UT_NEW_NOKEY(PageBulk(m_index, m_trx_id, FIL_NULL,
						level, m_flush_observer));
		err = new_page_bulk->init();
		if (err != DB_SUCCESS) {
			return(err);
		}

		m_page_bulks->push_back(new_page_bulk);
		ut_ad(level + 1 == m_page_bulks->size());
		m_root_level = level;

		is_left_most = true;
	}

	ut_ad(m_page_bulks->size() > level);

	PageBulk*	page_bulk = m_page_bulks->at(level);

	if (is_left_most && level > 0 && page_bulk->getRecNo() == 0) {
		/* The node pointer must be marked as the predefined minimum
		record, as there is no lower alphabetical limit to records in
		the leftmost node of a level: */
		dtuple_set_info_bits(tuple, dtuple_get_info_bits(tuple)
					    | REC_INFO_MIN_REC_FLAG);
	}

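	/* Estimate the converted record size, so that we can decide whether
	external storage is needed and whether the page must be split. */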
	ulint		n_ext = 0;
	ulint		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	big_rec_t*	big_rec = NULL;
	rec_t*		rec = NULL;
	ulint*		offsets = NULL;

	if (page_bulk->needExt(tuple, rec_size)) {
		/* The record is so big that we have to store some fields
		externally on separate database pages */
		big_rec = dtuple_convert_big_rec(m_index, 0, tuple, &n_ext);

		if (big_rec == NULL) {
			return(DB_TOO_BIG_RECORD);
		}

		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	}

	if (page_bulk->getPageZip() != NULL
	    && page_zip_is_too_big(m_index, tuple)) {
		err = DB_TOO_BIG_RECORD;
		goto func_exit;
	}

	if (!page_bulk->isSpaceAvailable(rec_size)) {
		/* Create a sibling page_bulk. */
		PageBulk*	sibling_page_bulk;
		sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx_id,
							  FIL_NULL, level,
							  m_flush_observer));
		err = sibling_page_bulk->init();
		if (err != DB_SUCCESS) {
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Commit page bulk. */
		err = pageCommit(page_bulk, sibling_page_bulk, true);
		if (err != DB_SUCCESS) {
			pageAbort(sibling_page_bulk);
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Set new page bulk to page_bulks. */
		ut_ad(sibling_page_bulk->getLevel() <= m_root_level);
		m_page_bulks->at(level) = sibling_page_bulk;

		UT_DELETE(page_bulk);
		page_bulk = sibling_page_bulk;

		/* Important: call logFreeCheck() to see whether a
		checkpoint is needed. */
		if (page_is_leaf(sibling_page_bulk->getPage())) {
			/* Check whether trx is interrupted */
			if (m_flush_observer->check_interrupted()) {
				err = DB_INTERRUPTED;
				goto func_exit;
			}

			/* Wake up page cleaner to flush dirty pages. */
			srv_inc_activity_count();
			os_event_set(buf_flush_event);

			logFreeCheck();
		}
	}

	/* Convert the tuple to a record. */
	rec = rec_convert_dtuple_to_rec(static_cast<byte*>(mem_heap_alloc(
		page_bulk->m_heap, rec_size)), m_index, tuple, n_ext);
	offsets = rec_get_offsets(rec, m_index, offsets, !level,
				  ULINT_UNDEFINED, &page_bulk->m_heap);

	page_bulk->insert(rec, offsets);

	if (big_rec != NULL) {
		ut_ad(dict_index_is_clust(m_index));
		ut_ad(page_bulk->getLevel() == 0);
		ut_ad(page_bulk == m_page_bulks->at(0));

		/* Release all latches except the leaf node. */
		for (ulint level = 1; level <= m_root_level; level++) {
			PageBulk*	page_bulk = m_page_bulks->at(level);

			page_bulk->release();
		}

		err = page_bulk->storeExt(big_rec, offsets);

		/* Latch */
		for (ulint level = 1; level <= m_root_level; level++) {
			PageBulk*	page_bulk = m_page_bulks->at(level);
			page_bulk->latch();
		}
	}

func_exit:
	if (big_rec != NULL) {
		dtuple_convert_back_big_rec(m_index, tuple, big_rec);
	}

	return(err);
}

/** B-tree bulk load finish. We commit the last page in each level
and copy the last page in the top level to the root page of the index
if no error occurs.
@param[in]	err	whether bulk load was successful until now
@return error code */
dberr_t
BtrBulk::finish(dberr_t	err)
{
	ulint	last_page_no = FIL_NULL;

	ut_ad(!m_index->table->is_temporary());

	if (m_page_bulks->size() == 0) {
		/* The table is empty. The root page of the index tree
		is already in a consistent state. No need to flush. */
		return(err);
	}

	ut_ad(m_root_level + 1 == m_page_bulks->size());

	/* Finish all page bulks */
	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*	page_bulk = m_page_bulks->at(level);

		last_page_no = page_bulk->getPageNo();

		if (err == DB_SUCCESS) {
			err = pageCommit(page_bulk, NULL,
					 level != m_root_level);
		}

		if (err != DB_SUCCESS) {
			pageAbort(page_bulk);
		}

		UT_DELETE(page_bulk);
	}

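	/* On success, copy the last page of the top level to the root page
	of the index, and then free that last page. */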
	if (err == DB_SUCCESS) {
		rec_t*		first_rec;
		mtr_t		mtr;
		buf_block_t*	last_block;
		PageBulk	root_page_bulk(m_index, m_trx_id,
					       m_index->page, m_root_level,
					       m_flush_observer);

		mtr.start();
		m_index->set_modified(mtr);
		mtr_x_lock(&m_index->lock, &mtr);

		ut_ad(last_page_no != FIL_NULL);
		last_block = btr_block_get(
			page_id_t(m_index->table->space->id, last_page_no),
			page_size_t(m_index->table->space->flags),
			RW_X_LATCH, m_index, &mtr);
		first_rec = page_rec_get_next(
			page_get_infimum_rec(last_block->frame));
		ut_ad(page_rec_is_user_rec(first_rec));

		/* Copy last page to root page. */
		err = root_page_bulk.init();
		if (err != DB_SUCCESS) {
			mtr.commit();
			return(err);
		}
		root_page_bulk.copyIn(first_rec);

		/* Remove last page. */
		btr_page_free_low(m_index, last_block, m_root_level, false, &mtr);

		/* Do not flush the last page. */
		last_block->page.flush_observer = NULL;

		mtr.commit();

		err = pageCommit(&root_page_bulk, NULL, false);
		ut_ad(err == DB_SUCCESS);
	}

	ut_ad(!sync_check_iterate(dict_sync_check()));

	ut_ad(err != DB_SUCCESS || btr_validate_index(m_index, NULL, false));
	return(err);
}