1/*-------------------------------------------------------------------------
2 *
3 * nbtinsert.c
4 * Item insertion in Lehman and Yao btrees for Postgres.
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/access/nbtree/nbtinsert.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres.h"
17
18#include "access/nbtree.h"
19#include "access/nbtxlog.h"
20#include "access/tableam.h"
21#include "access/transam.h"
22#include "access/xloginsert.h"
23#include "miscadmin.h"
24#include "storage/lmgr.h"
25#include "storage/predicate.h"
26#include "storage/smgr.h"
27
28/* Minimum tree height for application of fastpath optimization */
29#define BTREE_FASTPATH_MIN_LEVEL 2
30
31
32static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
33
34static TransactionId _bt_check_unique(Relation rel, BTInsertState insertstate,
35 Relation heapRel,
36 IndexUniqueCheck checkUnique, bool *is_unique,
37 uint32 *speculativeToken);
38static OffsetNumber _bt_findinsertloc(Relation rel,
39 BTInsertState insertstate,
40 bool checkingunique,
41 BTStack stack,
42 Relation heapRel);
43static void _bt_stepright(Relation rel, BTInsertState insertstate, BTStack stack);
44static void _bt_insertonpg(Relation rel, BTScanInsert itup_key,
45 Buffer buf,
46 Buffer cbuf,
47 BTStack stack,
48 IndexTuple itup,
49 OffsetNumber newitemoff,
50 bool split_only_page);
51static Buffer _bt_split(Relation rel, BTScanInsert itup_key, Buffer buf,
52 Buffer cbuf, OffsetNumber newitemoff, Size newitemsz,
53 IndexTuple newitem);
54static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
55 BTStack stack, bool is_root, bool is_only);
56static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
57 OffsetNumber itup_off);
58static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
59
60/*
61 * _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
62 *
63 * This routine is called by the public interface routine, btinsert.
64 * By here, itup is filled in, including the TID.
65 *
66 * If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
67 * will allow duplicates. Otherwise (UNIQUE_CHECK_YES or
68 * UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
69 * For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
70 * don't actually insert.
71 *
72 * The result value is only significant for UNIQUE_CHECK_PARTIAL:
73 * it must be true if the entry is known unique, else false.
74 * (In the current implementation we'll also return true after a
75 * successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
76 * that's just a coding artifact.)
77 */
78bool
79_bt_doinsert(Relation rel, IndexTuple itup,
80 IndexUniqueCheck checkUnique, Relation heapRel)
81{
82 bool is_unique = false;
83 BTInsertStateData insertstate;
84 BTScanInsert itup_key;
85 BTStack stack = NULL;
86 Buffer buf;
87 bool fastpath;
88 bool checkingunique = (checkUnique != UNIQUE_CHECK_NO);
89
90 /* we need an insertion scan key to do our search, so build one */
91 itup_key = _bt_mkscankey(rel, itup);
92
93 if (checkingunique)
94 {
95 if (!itup_key->anynullkeys)
96 {
97 /* No (heapkeyspace) scantid until uniqueness established */
98 itup_key->scantid = NULL;
99 }
100 else
101 {
102 /*
103 * Scan key for new tuple contains NULL key values. Bypass
104 * checkingunique steps. They are unnecessary because core code
105 * considers NULL unequal to every value, including NULL.
106 *
107 * This optimization avoids O(N^2) behavior within the
108 * _bt_findinsertloc() heapkeyspace path when a unique index has a
109 * large number of "duplicates" with NULL key values.
110 */
111 checkingunique = false;
112 /* Tuple is unique in the sense that core code cares about */
113 Assert(checkUnique != UNIQUE_CHECK_EXISTING);
114 is_unique = true;
115 }
116 }
117
118 /*
119 * Fill in the BTInsertState working area, to track the current page and
120 * position within the page to insert on
121 */
122 insertstate.itup = itup;
123 /* PageAddItem will MAXALIGN(), but be consistent */
124 insertstate.itemsz = MAXALIGN(IndexTupleSize(itup));
125 insertstate.itup_key = itup_key;
126 insertstate.bounds_valid = false;
127 insertstate.buf = InvalidBuffer;
128
129 /*
130 * It's very common to have an index on an auto-incremented or
131 * monotonically increasing value. In such cases, every insertion happens
132 * towards the end of the index. We try to optimize that case by caching
133 * the right-most leaf of the index. If our cached block is still the
134 * rightmost leaf, has enough free space to accommodate a new entry and
135 * the insertion key is strictly greater than the first key in this page,
136 * then we can safely conclude that the new key will be inserted in the
137 * cached block. So we simply search within the cached block and insert
138 * the key at the appropriate location. We call it a fastpath.
139 *
140 * Testing has revealed, though, that the fastpath can result in increased
141 * contention on the exclusive-lock on the rightmost leaf page. So we
142 * conditionally check if the lock is available. If it's not available
143 * then we simply abandon the fastpath and take the regular path. This
144 * makes sense because unavailability of the lock also signals that some
145 * other backend might be concurrently inserting into the page, thus
146 * reducing our chances to finding an insertion place in this page.
147 */
148top:
149 fastpath = false;
150 if (RelationGetTargetBlock(rel) != InvalidBlockNumber)
151 {
152 Page page;
153 BTPageOpaque lpageop;
154
155 /*
156 * Conditionally acquire exclusive lock on the buffer before doing any
157 * checks. If we don't get the lock, we simply follow slowpath. If we
158 * do get the lock, this ensures that the index state cannot change,
159 * as far as the rightmost part of the index is concerned.
160 */
161 buf = ReadBuffer(rel, RelationGetTargetBlock(rel));
162
163 if (ConditionalLockBuffer(buf))
164 {
165 _bt_checkpage(rel, buf);
166
167 page = BufferGetPage(buf);
168
169 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
170
171 /*
172 * Check if the page is still the rightmost leaf page, has enough
173 * free space to accommodate the new tuple, and the insertion scan
174 * key is strictly greater than the first key on the page.
175 */
176 if (P_ISLEAF(lpageop) && P_RIGHTMOST(lpageop) &&
177 !P_IGNORE(lpageop) &&
178 (PageGetFreeSpace(page) > insertstate.itemsz) &&
179 PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) &&
180 _bt_compare(rel, itup_key, page, P_FIRSTDATAKEY(lpageop)) > 0)
181 {
182 /*
183 * The right-most block should never have an incomplete split.
184 * But be paranoid and check for it anyway.
185 */
186 Assert(!P_INCOMPLETE_SPLIT(lpageop));
187 fastpath = true;
188 }
189 else
190 {
191 _bt_relbuf(rel, buf);
192
193 /*
194 * Something did not work out. Just forget about the cached
195 * block and follow the normal path. It might be set again if
196 * the conditions are favourable.
197 */
198 RelationSetTargetBlock(rel, InvalidBlockNumber);
199 }
200 }
201 else
202 {
203 ReleaseBuffer(buf);
204
205 /*
206 * If someone's holding a lock, it's likely to change anyway, so
207 * don't try again until we get an updated rightmost leaf.
208 */
209 RelationSetTargetBlock(rel, InvalidBlockNumber);
210 }
211 }
212
213 if (!fastpath)
214 {
215 /*
216 * Find the first page containing this key. Buffer returned by
217 * _bt_search() is locked in exclusive mode.
218 */
219 stack = _bt_search(rel, itup_key, &buf, BT_WRITE, NULL);
220 }
221
222 insertstate.buf = buf;
223 buf = InvalidBuffer; /* insertstate.buf now owns the buffer */
224
225 /*
226 * If we're not allowing duplicates, make sure the key isn't already in
227 * the index.
228 *
229 * NOTE: obviously, _bt_check_unique can only detect keys that are already
230 * in the index; so it cannot defend against concurrent insertions of the
231 * same key. We protect against that by means of holding a write lock on
232 * the first page the value could be on, with omitted/-inf value for the
233 * implicit heap TID tiebreaker attribute. Any other would-be inserter of
234 * the same key must acquire a write lock on the same page, so only one
235 * would-be inserter can be making the check at one time. Furthermore,
236 * once we are past the check we hold write locks continuously until we
237 * have performed our insertion, so no later inserter can fail to see our
238 * insertion. (This requires some care in _bt_findinsertloc.)
239 *
240 * If we must wait for another xact, we release the lock while waiting,
241 * and then must start over completely.
242 *
243 * For a partial uniqueness check, we don't wait for the other xact. Just
244 * let the tuple in and return false for possibly non-unique, or true for
245 * definitely unique.
246 */
247 if (checkingunique)
248 {
249 TransactionId xwait;
250 uint32 speculativeToken;
251
252 xwait = _bt_check_unique(rel, &insertstate, heapRel, checkUnique,
253 &is_unique, &speculativeToken);
254
255 if (TransactionIdIsValid(xwait))
256 {
257 /* Have to wait for the other guy ... */
258 _bt_relbuf(rel, insertstate.buf);
259 insertstate.buf = InvalidBuffer;
260
261 /*
262 * If it's a speculative insertion, wait for it to finish (ie. to
263 * go ahead with the insertion, or kill the tuple). Otherwise
264 * wait for the transaction to finish as usual.
265 */
266 if (speculativeToken)
267 SpeculativeInsertionWait(xwait, speculativeToken);
268 else
269 XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
270
271 /* start over... */
272 if (stack)
273 _bt_freestack(stack);
274 goto top;
275 }
276
277 /* Uniqueness is established -- restore heap tid as scantid */
278 if (itup_key->heapkeyspace)
279 itup_key->scantid = &itup->t_tid;
280 }
281
282 if (checkUnique != UNIQUE_CHECK_EXISTING)
283 {
284 OffsetNumber newitemoff;
285
286 /*
287 * The only conflict predicate locking cares about for indexes is when
288 * an index tuple insert conflicts with an existing lock. We don't
289 * know the actual page we're going to insert on for sure just yet in
290 * checkingunique and !heapkeyspace cases, but it's okay to use the
291 * first page the value could be on (with scantid omitted) instead.
292 */
293 CheckForSerializableConflictIn(rel, NULL, insertstate.buf);
294
295 /*
296 * Do the insertion. Note that insertstate contains cached binary
297 * search bounds established within _bt_check_unique when insertion is
298 * checkingunique.
299 */
300 newitemoff = _bt_findinsertloc(rel, &insertstate, checkingunique,
301 stack, heapRel);
302 _bt_insertonpg(rel, itup_key, insertstate.buf, InvalidBuffer, stack,
303 itup, newitemoff, false);
304 }
305 else
306 {
307 /* just release the buffer */
308 _bt_relbuf(rel, insertstate.buf);
309 }
310
311 /* be tidy */
312 if (stack)
313 _bt_freestack(stack);
314 pfree(itup_key);
315
316 return is_unique;
317}
318
319/*
320 * _bt_check_unique() -- Check for violation of unique index constraint
321 *
322 * Returns InvalidTransactionId if there is no conflict, else an xact ID
323 * we must wait for to see if it commits a conflicting tuple. If an actual
324 * conflict is detected, no return --- just ereport(). If an xact ID is
325 * returned, and the conflicting tuple still has a speculative insertion in
326 * progress, *speculativeToken is set to non-zero, and the caller can wait for
327 * the verdict on the insertion using SpeculativeInsertionWait().
328 *
329 * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
330 * InvalidTransactionId because we don't want to wait. In this case we
331 * set *is_unique to false if there is a potential conflict, and the
332 * core code must redo the uniqueness check later.
333 *
334 * As a side-effect, sets state in insertstate that can later be used by
335 * _bt_findinsertloc() to reuse most of the binary search work we do
336 * here.
337 *
338 * Do not call here when there are NULL values in scan key. NULL should be
339 * considered unequal to NULL when checking for duplicates, but we are not
340 * prepared to handle that correctly.
341 */
342static TransactionId
343_bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
344 IndexUniqueCheck checkUnique, bool *is_unique,
345 uint32 *speculativeToken)
346{
347 IndexTuple itup = insertstate->itup;
348 BTScanInsert itup_key = insertstate->itup_key;
349 SnapshotData SnapshotDirty;
350 OffsetNumber offset;
351 OffsetNumber maxoff;
352 Page page;
353 BTPageOpaque opaque;
354 Buffer nbuf = InvalidBuffer;
355 bool found = false;
356
357 /* Assume unique until we find a duplicate */
358 *is_unique = true;
359
360 InitDirtySnapshot(SnapshotDirty);
361
362 page = BufferGetPage(insertstate->buf);
363 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
364 maxoff = PageGetMaxOffsetNumber(page);
365
366 /*
367 * Find the first tuple with the same key.
368 *
369 * This also saves the binary search bounds in insertstate. We use them
370 * in the fastpath below, but also in the _bt_findinsertloc() call later.
371 */
372 Assert(!insertstate->bounds_valid);
373 offset = _bt_binsrch_insert(rel, insertstate);
374
375 /*
376 * Scan over all equal tuples, looking for live conflicts.
377 */
378 Assert(!insertstate->bounds_valid || insertstate->low == offset);
379 Assert(!itup_key->anynullkeys);
380 Assert(itup_key->scantid == NULL);
381 for (;;)
382 {
383 ItemId curitemid;
384 IndexTuple curitup;
385 BlockNumber nblkno;
386
387 /*
388 * make sure the offset points to an actual item before trying to
389 * examine it...
390 */
391 if (offset <= maxoff)
392 {
393 /*
394 * Fastpath: In most cases, we can use cached search bounds to
395 * limit our consideration to items that are definitely
396 * duplicates. This fastpath doesn't apply when the original page
397 * is empty, or when initial offset is past the end of the
398 * original page, which may indicate that we need to examine a
399 * second or subsequent page.
400 *
401 * Note that this optimization allows us to avoid calling
402 * _bt_compare() directly when there are no duplicates, as long as
403 * the offset where the key will go is not at the end of the page.
404 */
405 if (nbuf == InvalidBuffer && offset == insertstate->stricthigh)
406 {
407 Assert(insertstate->bounds_valid);
408 Assert(insertstate->low >= P_FIRSTDATAKEY(opaque));
409 Assert(insertstate->low <= insertstate->stricthigh);
410 Assert(_bt_compare(rel, itup_key, page, offset) < 0);
411 break;
412 }
413
414 curitemid = PageGetItemId(page, offset);
415
416 /*
417 * We can skip items that are marked killed.
418 *
419 * In the presence of heavy update activity an index may contain
420 * many killed items with the same key; running _bt_compare() on
421 * each killed item gets expensive. Just advance over killed
422 * items as quickly as we can. We only apply _bt_compare() when
423 * we get to a non-killed item. Even those comparisons could be
424 * avoided (in the common case where there is only one page to
425 * visit) by reusing bounds, but just skipping dead items is fast
426 * enough.
427 */
428 if (!ItemIdIsDead(curitemid))
429 {
430 ItemPointerData htid;
431 bool all_dead;
432
433 if (_bt_compare(rel, itup_key, page, offset) != 0)
434 break; /* we're past all the equal tuples */
435
436 /* okay, we gotta fetch the heap tuple ... */
437 curitup = (IndexTuple) PageGetItem(page, curitemid);
438 htid = curitup->t_tid;
439
440 /*
441 * If we are doing a recheck, we expect to find the tuple we
442 * are rechecking. It's not a duplicate, but we have to keep
443 * scanning.
444 */
445 if (checkUnique == UNIQUE_CHECK_EXISTING &&
446 ItemPointerCompare(&htid, &itup->t_tid) == 0)
447 {
448 found = true;
449 }
450
451 /*
452 * Check if there's any table tuples for this index entry
453 * satisfying SnapshotDirty. This is necessary because for AMs
454 * with optimizations like heap's HOT, we have just a single
455 * index entry for the entire chain.
456 */
457 else if (table_index_fetch_tuple_check(heapRel, &htid,
458 &SnapshotDirty,
459 &all_dead))
460 {
461 TransactionId xwait;
462
463 /*
464 * It is a duplicate. If we are only doing a partial
465 * check, then don't bother checking if the tuple is being
466 * updated in another transaction. Just return the fact
467 * that it is a potential conflict and leave the full
468 * check till later. Don't invalidate binary search
469 * bounds.
470 */
471 if (checkUnique == UNIQUE_CHECK_PARTIAL)
472 {
473 if (nbuf != InvalidBuffer)
474 _bt_relbuf(rel, nbuf);
475 *is_unique = false;
476 return InvalidTransactionId;
477 }
478
479 /*
480 * If this tuple is being updated by other transaction
481 * then we have to wait for its commit/abort.
482 */
483 xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
484 SnapshotDirty.xmin : SnapshotDirty.xmax;
485
486 if (TransactionIdIsValid(xwait))
487 {
488 if (nbuf != InvalidBuffer)
489 _bt_relbuf(rel, nbuf);
490 /* Tell _bt_doinsert to wait... */
491 *speculativeToken = SnapshotDirty.speculativeToken;
492 /* Caller releases lock on buf immediately */
493 insertstate->bounds_valid = false;
494 return xwait;
495 }
496
497 /*
498 * Otherwise we have a definite conflict. But before
499 * complaining, look to see if the tuple we want to insert
500 * is itself now committed dead --- if so, don't complain.
501 * This is a waste of time in normal scenarios but we must
502 * do it to support CREATE INDEX CONCURRENTLY.
503 *
504 * We must follow HOT-chains here because during
505 * concurrent index build, we insert the root TID though
506 * the actual tuple may be somewhere in the HOT-chain.
507 * While following the chain we might not stop at the
508 * exact tuple which triggered the insert, but that's OK
509 * because if we find a live tuple anywhere in this chain,
510 * we have a unique key conflict. The other live tuple is
511 * not part of this chain because it had a different index
512 * entry.
513 */
514 htid = itup->t_tid;
515 if (table_index_fetch_tuple_check(heapRel, &htid,
516 SnapshotSelf, NULL))
517 {
518 /* Normal case --- it's still live */
519 }
520 else
521 {
522 /*
523 * It's been deleted, so no error, and no need to
524 * continue searching
525 */
526 break;
527 }
528
529 /*
530 * Check for a conflict-in as we would if we were going to
531 * write to this page. We aren't actually going to write,
532 * but we want a chance to report SSI conflicts that would
533 * otherwise be masked by this unique constraint
534 * violation.
535 */
536 CheckForSerializableConflictIn(rel, NULL, insertstate->buf);
537
538 /*
539 * This is a definite conflict. Break the tuple down into
540 * datums and report the error. But first, make sure we
541 * release the buffer locks we're holding ---
542 * BuildIndexValueDescription could make catalog accesses,
543 * which in the worst case might touch this same index and
544 * cause deadlocks.
545 */
546 if (nbuf != InvalidBuffer)
547 _bt_relbuf(rel, nbuf);
548 _bt_relbuf(rel, insertstate->buf);
549 insertstate->buf = InvalidBuffer;
550 insertstate->bounds_valid = false;
551
552 {
553 Datum values[INDEX_MAX_KEYS];
554 bool isnull[INDEX_MAX_KEYS];
555 char *key_desc;
556
557 index_deform_tuple(itup, RelationGetDescr(rel),
558 values, isnull);
559
560 key_desc = BuildIndexValueDescription(rel, values,
561 isnull);
562
563 ereport(ERROR,
564 (errcode(ERRCODE_UNIQUE_VIOLATION),
565 errmsg("duplicate key value violates unique constraint \"%s\"",
566 RelationGetRelationName(rel)),
567 key_desc ? errdetail("Key %s already exists.",
568 key_desc) : 0,
569 errtableconstraint(heapRel,
570 RelationGetRelationName(rel))));
571 }
572 }
573 else if (all_dead)
574 {
575 /*
576 * The conflicting tuple (or whole HOT chain) is dead to
577 * everyone, so we may as well mark the index entry
578 * killed.
579 */
580 ItemIdMarkDead(curitemid);
581 opaque->btpo_flags |= BTP_HAS_GARBAGE;
582
583 /*
584 * Mark buffer with a dirty hint, since state is not
585 * crucial. Be sure to mark the proper buffer dirty.
586 */
587 if (nbuf != InvalidBuffer)
588 MarkBufferDirtyHint(nbuf, true);
589 else
590 MarkBufferDirtyHint(insertstate->buf, true);
591 }
592 }
593 }
594
595 /*
596 * Advance to next tuple to continue checking.
597 */
598 if (offset < maxoff)
599 offset = OffsetNumberNext(offset);
600 else
601 {
602 int highkeycmp;
603
604 /* If scankey == hikey we gotta check the next page too */
605 if (P_RIGHTMOST(opaque))
606 break;
607 highkeycmp = _bt_compare(rel, itup_key, page, P_HIKEY);
608 Assert(highkeycmp <= 0);
609 if (highkeycmp != 0)
610 break;
611 /* Advance to next non-dead page --- there must be one */
612 for (;;)
613 {
614 nblkno = opaque->btpo_next;
615 nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
616 page = BufferGetPage(nbuf);
617 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
618 if (!P_IGNORE(opaque))
619 break;
620 if (P_RIGHTMOST(opaque))
621 elog(ERROR, "fell off the end of index \"%s\"",
622 RelationGetRelationName(rel));
623 }
624 maxoff = PageGetMaxOffsetNumber(page);
625 offset = P_FIRSTDATAKEY(opaque);
626 /* Don't invalidate binary search bounds */
627 }
628 }
629
630 /*
631 * If we are doing a recheck then we should have found the tuple we are
632 * checking. Otherwise there's something very wrong --- probably, the
633 * index is on a non-immutable expression.
634 */
635 if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
636 ereport(ERROR,
637 (errcode(ERRCODE_INTERNAL_ERROR),
638 errmsg("failed to re-find tuple within index \"%s\"",
639 RelationGetRelationName(rel)),
640 errhint("This may be because of a non-immutable index expression."),
641 errtableconstraint(heapRel,
642 RelationGetRelationName(rel))));
643
644 if (nbuf != InvalidBuffer)
645 _bt_relbuf(rel, nbuf);
646
647 return InvalidTransactionId;
648}
649
650
651/*
652 * _bt_findinsertloc() -- Finds an insert location for a tuple
653 *
654 * On entry, insertstate buffer contains the page the new tuple belongs
655 * on. It is exclusive-locked and pinned by the caller.
656 *
657 * If 'checkingunique' is true, the buffer on entry is the first page
658 * that contains duplicates of the new key. If there are duplicates on
659 * multiple pages, the correct insertion position might be some page to
660 * the right, rather than the first page. In that case, this function
661 * moves right to the correct target page.
662 *
663 * (In a !heapkeyspace index, there can be multiple pages with the same
664 * high key, where the new tuple could legitimately be placed on. In
665 * that case, the caller passes the first page containing duplicates,
666 * just like when checkingunique=true. If that page doesn't have enough
667 * room for the new tuple, this function moves right, trying to find a
668 * legal page that does.)
669 *
670 * On exit, insertstate buffer contains the chosen insertion page, and
671 * the offset within that page is returned. If _bt_findinsertloc needed
672 * to move right, the lock and pin on the original page are released, and
673 * the new buffer is exclusively locked and pinned instead.
674 *
675 * If insertstate contains cached binary search bounds, we will take
676 * advantage of them. This avoids repeating comparisons that we made in
677 * _bt_check_unique() already.
678 *
679 * If there is not enough room on the page for the new tuple, we try to
680 * make room by removing any LP_DEAD tuples.
681 */
682static OffsetNumber
683_bt_findinsertloc(Relation rel,
684 BTInsertState insertstate,
685 bool checkingunique,
686 BTStack stack,
687 Relation heapRel)
688{
689 BTScanInsert itup_key = insertstate->itup_key;
690 Page page = BufferGetPage(insertstate->buf);
691 BTPageOpaque lpageop;
692
693 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
694
695 /* Check 1/3 of a page restriction */
696 if (unlikely(insertstate->itemsz > BTMaxItemSize(page)))
697 _bt_check_third_page(rel, heapRel, itup_key->heapkeyspace, page,
698 insertstate->itup);
699
700 Assert(P_ISLEAF(lpageop) && !P_INCOMPLETE_SPLIT(lpageop));
701 Assert(!insertstate->bounds_valid || checkingunique);
702 Assert(!itup_key->heapkeyspace || itup_key->scantid != NULL);
703 Assert(itup_key->heapkeyspace || itup_key->scantid == NULL);
704
705 if (itup_key->heapkeyspace)
706 {
707 /*
708 * If we're inserting into a unique index, we may have to walk right
709 * through leaf pages to find the one leaf page that we must insert on
710 * to.
711 *
712 * This is needed for checkingunique callers because a scantid was not
713 * used when we called _bt_search(). scantid can only be set after
714 * _bt_check_unique() has checked for duplicates. The buffer
715 * initially stored in insertstate->buf has the page where the first
716 * duplicate key might be found, which isn't always the page that new
717 * tuple belongs on. The heap TID attribute for new tuple (scantid)
718 * could force us to insert on a sibling page, though that should be
719 * very rare in practice.
720 */
721 if (checkingunique)
722 {
723 for (;;)
724 {
725 /*
726 * Does the new tuple belong on this page?
727 *
728 * The earlier _bt_check_unique() call may well have
729 * established a strict upper bound on the offset for the new
730 * item. If it's not the last item of the page (i.e. if there
731 * is at least one tuple on the page that goes after the tuple
732 * we're inserting) then we know that the tuple belongs on
733 * this page. We can skip the high key check.
734 */
735 if (insertstate->bounds_valid &&
736 insertstate->low <= insertstate->stricthigh &&
737 insertstate->stricthigh <= PageGetMaxOffsetNumber(page))
738 break;
739
740 /* Test '<=', not '!=', since scantid is set now */
741 if (P_RIGHTMOST(lpageop) ||
742 _bt_compare(rel, itup_key, page, P_HIKEY) <= 0)
743 break;
744
745 _bt_stepright(rel, insertstate, stack);
746 /* Update local state after stepping right */
747 page = BufferGetPage(insertstate->buf);
748 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
749 }
750 }
751
752 /*
753 * If the target page is full, see if we can obtain enough space by
754 * erasing LP_DEAD items
755 */
756 if (PageGetFreeSpace(page) < insertstate->itemsz &&
757 P_HAS_GARBAGE(lpageop))
758 {
759 _bt_vacuum_one_page(rel, insertstate->buf, heapRel);
760 insertstate->bounds_valid = false;
761 }
762 }
763 else
764 {
765 /*----------
766 * This is a !heapkeyspace (version 2 or 3) index. The current page
767 * is the first page that we could insert the new tuple to, but there
768 * may be other pages to the right that we could opt to use instead.
769 *
770 * If the new key is equal to one or more existing keys, we can
771 * legitimately place it anywhere in the series of equal keys. In
772 * fact, if the new key is equal to the page's "high key" we can place
773 * it on the next page. If it is equal to the high key, and there's
774 * not room to insert the new tuple on the current page without
775 * splitting, then we move right hoping to find more free space and
776 * avoid a split.
777 *
778 * Keep scanning right until we
779 * (a) find a page with enough free space,
780 * (b) reach the last page where the tuple can legally go, or
781 * (c) get tired of searching.
782 * (c) is not flippant; it is important because if there are many
783 * pages' worth of equal keys, it's better to split one of the early
784 * pages than to scan all the way to the end of the run of equal keys
785 * on every insert. We implement "get tired" as a random choice,
786 * since stopping after scanning a fixed number of pages wouldn't work
787 * well (we'd never reach the right-hand side of previously split
788 * pages). The probability of moving right is set at 0.99, which may
789 * seem too high to change the behavior much, but it does an excellent
790 * job of preventing O(N^2) behavior with many equal keys.
791 *----------
792 */
793 while (PageGetFreeSpace(page) < insertstate->itemsz)
794 {
795 /*
796 * Before considering moving right, see if we can obtain enough
797 * space by erasing LP_DEAD items
798 */
799 if (P_HAS_GARBAGE(lpageop))
800 {
801 _bt_vacuum_one_page(rel, insertstate->buf, heapRel);
802 insertstate->bounds_valid = false;
803
804 if (PageGetFreeSpace(page) >= insertstate->itemsz)
805 break; /* OK, now we have enough space */
806 }
807
808 /*
809 * Nope, so check conditions (b) and (c) enumerated above
810 *
811 * The earlier _bt_check_unique() call may well have established a
812 * strict upper bound on the offset for the new item. If it's not
813 * the last item of the page (i.e. if there is at least one tuple
814 * on the page that's greater than the tuple we're inserting to)
815 * then we know that the tuple belongs on this page. We can skip
816 * the high key check.
817 */
818 if (insertstate->bounds_valid &&
819 insertstate->low <= insertstate->stricthigh &&
820 insertstate->stricthigh <= PageGetMaxOffsetNumber(page))
821 break;
822
823 if (P_RIGHTMOST(lpageop) ||
824 _bt_compare(rel, itup_key, page, P_HIKEY) != 0 ||
825 random() <= (MAX_RANDOM_VALUE / 100))
826 break;
827
828 _bt_stepright(rel, insertstate, stack);
829 /* Update local state after stepping right */
830 page = BufferGetPage(insertstate->buf);
831 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
832 }
833 }
834
835 /*
836 * We should now be on the correct page. Find the offset within the page
837 * for the new tuple. (Possibly reusing earlier search bounds.)
838 */
839 Assert(P_RIGHTMOST(lpageop) ||
840 _bt_compare(rel, itup_key, page, P_HIKEY) <= 0);
841
842 return _bt_binsrch_insert(rel, insertstate);
843}
844
845/*
846 * Step right to next non-dead page, during insertion.
847 *
848 * This is a bit more complicated than moving right in a search. We must
849 * write-lock the target page before releasing write lock on current page;
850 * else someone else's _bt_check_unique scan could fail to see our insertion.
851 * Write locks on intermediate dead pages won't do because we don't know when
852 * they will get de-linked from the tree.
853 *
854 * This is more aggressive than it needs to be for non-unique !heapkeyspace
855 * indexes.
856 */
857static void
858_bt_stepright(Relation rel, BTInsertState insertstate, BTStack stack)
859{
860 Page page;
861 BTPageOpaque lpageop;
862 Buffer rbuf;
863 BlockNumber rblkno;
864
865 page = BufferGetPage(insertstate->buf);
866 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
867
868 rbuf = InvalidBuffer;
869 rblkno = lpageop->btpo_next;
870 for (;;)
871 {
872 rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
873 page = BufferGetPage(rbuf);
874 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
875
876 /*
877 * If this page was incompletely split, finish the split now. We do
878 * this while holding a lock on the left sibling, which is not good
879 * because finishing the split could be a fairly lengthy operation.
880 * But this should happen very seldom.
881 */
882 if (P_INCOMPLETE_SPLIT(lpageop))
883 {
884 _bt_finish_split(rel, rbuf, stack);
885 rbuf = InvalidBuffer;
886 continue;
887 }
888
889 if (!P_IGNORE(lpageop))
890 break;
891 if (P_RIGHTMOST(lpageop))
892 elog(ERROR, "fell off the end of index \"%s\"",
893 RelationGetRelationName(rel));
894
895 rblkno = lpageop->btpo_next;
896 }
897 /* rbuf locked; unlock buf, update state for caller */
898 _bt_relbuf(rel, insertstate->buf);
899 insertstate->buf = rbuf;
900 insertstate->bounds_valid = false;
901}
902
903/*----------
904 * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
905 *
906 * This recursive procedure does the following things:
907 *
908 * + if necessary, splits the target page, using 'itup_key' for
909 * suffix truncation on leaf pages (caller passes NULL for
910 * non-leaf pages).
911 * + inserts the tuple.
912 * + if the page was split, pops the parent stack, and finds the
913 * right place to insert the new child pointer (by walking
914 * right using information stored in the parent stack).
915 * + invokes itself with the appropriate tuple for the right
916 * child page on the parent.
917 * + updates the metapage if a true root or fast root is split.
918 *
919 * On entry, we must have the correct buffer in which to do the
920 * insertion, and the buffer must be pinned and write-locked. On return,
921 * we will have dropped both the pin and the lock on the buffer.
922 *
923 * This routine only performs retail tuple insertions. 'itup' should
924 * always be either a non-highkey leaf item, or a downlink (new high
925 * key items are created indirectly, when a page is split). When
926 * inserting to a non-leaf page, 'cbuf' is the left-sibling of the page
927 * we're inserting the downlink for. This function will clear the
928 * INCOMPLETE_SPLIT flag on it, and release the buffer.
929 *----------
930 */
931static void
932_bt_insertonpg(Relation rel,
933 BTScanInsert itup_key,
934 Buffer buf,
935 Buffer cbuf,
936 BTStack stack,
937 IndexTuple itup,
938 OffsetNumber newitemoff,
939 bool split_only_page)
940{
941 Page page;
942 BTPageOpaque lpageop;
943 Size itemsz;
944
945 page = BufferGetPage(buf);
946 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
947
948 /* child buffer must be given iff inserting on an internal page */
949 Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
950 /* tuple must have appropriate number of attributes */
951 Assert(!P_ISLEAF(lpageop) ||
952 BTreeTupleGetNAtts(itup, rel) ==
953 IndexRelationGetNumberOfAttributes(rel));
954 Assert(P_ISLEAF(lpageop) ||
955 BTreeTupleGetNAtts(itup, rel) <=
956 IndexRelationGetNumberOfKeyAttributes(rel));
957
958 /* The caller should've finished any incomplete splits already. */
959 if (P_INCOMPLETE_SPLIT(lpageop))
960 elog(ERROR, "cannot insert to incompletely split page %u",
961 BufferGetBlockNumber(buf));
962
963 itemsz = IndexTupleSize(itup);
964 itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
965 * need to be consistent */
966
967 /*
968 * Do we need to split the page to fit the item on it?
969 *
970 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
971 * so this comparison is correct even though we appear to be accounting
972 * only for the item and not for its line pointer.
973 */
974 if (PageGetFreeSpace(page) < itemsz)
975 {
976 bool is_root = P_ISROOT(lpageop);
977 bool is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
978 Buffer rbuf;
979
980 /*
981 * If we're here then a pagesplit is needed. We should never reach
982 * here if we're using the fastpath since we should have checked for
983 * all the required conditions, including the fact that this page has
984 * enough freespace. Note that this routine can in theory deal with
985 * the situation where a NULL stack pointer is passed (that's what
986 * would happen if the fastpath is taken). But that path is much
987 * slower, defeating the very purpose of the optimization. The
988 * following assertion should protect us from any future code changes
989 * that invalidate those assumptions.
990 *
991 * Note that whenever we fail to take the fastpath, we clear the
992 * cached block. Checking for a valid cached block at this point is
993 * enough to decide whether we're in a fastpath or not.
994 */
995 Assert(!(P_ISLEAF(lpageop) &&
996 BlockNumberIsValid(RelationGetTargetBlock(rel))));
997
998 /* split the buffer into left and right halves */
999 rbuf = _bt_split(rel, itup_key, buf, cbuf, newitemoff, itemsz, itup);
1000 PredicateLockPageSplit(rel,
1001 BufferGetBlockNumber(buf),
1002 BufferGetBlockNumber(rbuf));
1003
1004 /*----------
1005 * By here,
1006 *
1007 * + our target page has been split;
1008 * + the original tuple has been inserted;
1009 * + we have write locks on both the old (left half)
1010 * and new (right half) buffers, after the split; and
1011 * + we know the key we want to insert into the parent
1012 * (it's the "high key" on the left child page).
1013 *
1014 * We're ready to do the parent insertion. We need to hold onto the
1015 * locks for the child pages until we locate the parent, but we can
1016 * at least release the lock on the right child before doing the
1017 * actual insertion. The lock on the left child will be released
1018 * last of all by parent insertion, where it is the 'cbuf' of parent
1019 * page.
1020 *----------
1021 */
1022 _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
1023 }
1024 else
1025 {
1026 Buffer metabuf = InvalidBuffer;
1027 Page metapg = NULL;
1028 BTMetaPageData *metad = NULL;
1029 OffsetNumber itup_off;
1030 BlockNumber itup_blkno;
1031 BlockNumber cachedBlock = InvalidBlockNumber;
1032
1033 itup_off = newitemoff;
1034 itup_blkno = BufferGetBlockNumber(buf);
1035
1036 /*
1037 * If we are doing this insert because we split a page that was the
1038 * only one on its tree level, but was not the root, it may have been
1039 * the "fast root". We need to ensure that the fast root link points
1040 * at or above the current page. We can safely acquire a lock on the
1041 * metapage here --- see comments for _bt_newroot().
1042 */
1043 if (split_only_page)
1044 {
1045 Assert(!P_ISLEAF(lpageop));
1046
1047 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1048 metapg = BufferGetPage(metabuf);
1049 metad = BTPageGetMeta(metapg);
1050
1051 if (metad->btm_fastlevel >= lpageop->btpo.level)
1052 {
1053 /* no update wanted */
1054 _bt_relbuf(rel, metabuf);
1055 metabuf = InvalidBuffer;
1056 }
1057 }
1058
1059 /*
1060 * Every internal page should have exactly one negative infinity item
1061 * at all times. Only _bt_split() and _bt_newroot() should add items
1062 * that become negative infinity items through truncation, since
1063 * they're the only routines that allocate new internal pages. Do not
1064 * allow a retail insertion of a new item at the negative infinity
1065 * offset.
1066 */
1067 if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
1068 elog(ERROR, "cannot insert second negative infinity item in block %u of index \"%s\"",
1069 itup_blkno, RelationGetRelationName(rel));
1070
1071 /* Do the update. No ereport(ERROR) until changes are logged */
1072 START_CRIT_SECTION();
1073
1074 if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
1075 elog(PANIC, "failed to add new item to block %u in index \"%s\"",
1076 itup_blkno, RelationGetRelationName(rel));
1077
1078 MarkBufferDirty(buf);
1079
1080 if (BufferIsValid(metabuf))
1081 {
1082 /* upgrade meta-page if needed */
1083 if (metad->btm_version < BTREE_NOVAC_VERSION)
1084 _bt_upgrademetapage(metapg);
1085 metad->btm_fastroot = itup_blkno;
1086 metad->btm_fastlevel = lpageop->btpo.level;
1087 MarkBufferDirty(metabuf);
1088 }
1089
1090 /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
1091 if (BufferIsValid(cbuf))
1092 {
1093 Page cpage = BufferGetPage(cbuf);
1094 BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
1095
1096 Assert(P_INCOMPLETE_SPLIT(cpageop));
1097 cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
1098 MarkBufferDirty(cbuf);
1099 }
1100
1101 /*
1102 * Cache the block information if we just inserted into the rightmost
1103 * leaf page of the index and it's not the root page. For very small
1104 * index where root is also the leaf, there is no point trying for any
1105 * optimization.
1106 */
1107 if (P_RIGHTMOST(lpageop) && P_ISLEAF(lpageop) && !P_ISROOT(lpageop))
1108 cachedBlock = BufferGetBlockNumber(buf);
1109
1110 /* XLOG stuff */
1111 if (RelationNeedsWAL(rel))
1112 {
1113 xl_btree_insert xlrec;
1114 xl_btree_metadata xlmeta;
1115 uint8 xlinfo;
1116 XLogRecPtr recptr;
1117
1118 xlrec.offnum = itup_off;
1119
1120 XLogBeginInsert();
1121 XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
1122
1123 if (P_ISLEAF(lpageop))
1124 xlinfo = XLOG_BTREE_INSERT_LEAF;
1125 else
1126 {
1127 /*
1128 * Register the left child whose INCOMPLETE_SPLIT flag was
1129 * cleared.
1130 */
1131 XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
1132
1133 xlinfo = XLOG_BTREE_INSERT_UPPER;
1134 }
1135
1136 if (BufferIsValid(metabuf))
1137 {
1138 Assert(metad->btm_version >= BTREE_NOVAC_VERSION);
1139 xlmeta.version = metad->btm_version;
1140 xlmeta.root = metad->btm_root;
1141 xlmeta.level = metad->btm_level;
1142 xlmeta.fastroot = metad->btm_fastroot;
1143 xlmeta.fastlevel = metad->btm_fastlevel;
1144 xlmeta.oldest_btpo_xact = metad->btm_oldest_btpo_xact;
1145 xlmeta.last_cleanup_num_heap_tuples =
1146 metad->btm_last_cleanup_num_heap_tuples;
1147
1148 XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT | REGBUF_STANDARD);
1149 XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
1150
1151 xlinfo = XLOG_BTREE_INSERT_META;
1152 }
1153
1154 XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
1155 XLogRegisterBufData(0, (char *) itup, IndexTupleSize(itup));
1156
1157 recptr = XLogInsert(RM_BTREE_ID, xlinfo);
1158
1159 if (BufferIsValid(metabuf))
1160 {
1161 PageSetLSN(metapg, recptr);
1162 }
1163 if (BufferIsValid(cbuf))
1164 {
1165 PageSetLSN(BufferGetPage(cbuf), recptr);
1166 }
1167
1168 PageSetLSN(page, recptr);
1169 }
1170
1171 END_CRIT_SECTION();
1172
1173 /* release buffers */
1174 if (BufferIsValid(metabuf))
1175 _bt_relbuf(rel, metabuf);
1176 if (BufferIsValid(cbuf))
1177 _bt_relbuf(rel, cbuf);
1178 _bt_relbuf(rel, buf);
1179
1180 /*
1181 * If we decided to cache the insertion target block, then set it now.
1182 * But before that, check for the height of the tree and don't go for
1183 * the optimization for small indexes. We defer that check to this
1184 * point to ensure that we don't call _bt_getrootheight while holding
1185 * lock on any other block.
1186 *
1187 * We do this after dropping locks on all buffers. So the information
1188 * about whether the insertion block is still the rightmost block or
1189 * not may have changed in between. But we will deal with that during
1190 * next insert operation. No special care is required while setting
1191 * it.
1192 */
1193 if (BlockNumberIsValid(cachedBlock) &&
1194 _bt_getrootheight(rel) >= BTREE_FASTPATH_MIN_LEVEL)
1195 RelationSetTargetBlock(rel, cachedBlock);
1196 }
1197}
1198
1199/*
1200 * _bt_split() -- split a page in the btree.
1201 *
1202 * On entry, buf is the page to split, and is pinned and write-locked.
1203 * newitemoff etc. tell us about the new item that must be inserted
1204 * along with the data from the original page.
1205 *
1206 * itup_key is used for suffix truncation on leaf pages (internal
1207 * page callers pass NULL). When splitting a non-leaf page, 'cbuf'
1208 * is the left-sibling of the page we're inserting the downlink for.
1209 * This function will clear the INCOMPLETE_SPLIT flag on it, and
1210 * release the buffer.
1211 *
1212 * Returns the new right sibling of buf, pinned and write-locked.
1213 * The pin and lock on buf are maintained.
1214 */
1215static Buffer
1216_bt_split(Relation rel, BTScanInsert itup_key, Buffer buf, Buffer cbuf,
1217 OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem)
1218{
1219 Buffer rbuf;
1220 Page origpage;
1221 Page leftpage,
1222 rightpage;
1223 BlockNumber origpagenumber,
1224 rightpagenumber;
1225 BTPageOpaque ropaque,
1226 lopaque,
1227 oopaque;
1228 Buffer sbuf = InvalidBuffer;
1229 Page spage = NULL;
1230 BTPageOpaque sopaque = NULL;
1231 Size itemsz;
1232 ItemId itemid;
1233 IndexTuple item;
1234 OffsetNumber leftoff,
1235 rightoff;
1236 OffsetNumber firstright;
1237 OffsetNumber maxoff;
1238 OffsetNumber i;
1239 bool newitemonleft,
1240 isleaf;
1241 IndexTuple lefthikey;
1242 int indnatts = IndexRelationGetNumberOfAttributes(rel);
1243 int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
1244
1245 /*
1246 * origpage is the original page to be split. leftpage is a temporary
1247 * buffer that receives the left-sibling data, which will be copied back
1248 * into origpage on success. rightpage is the new page that will receive
1249 * the right-sibling data.
1250 *
1251 * leftpage is allocated after choosing a split point. rightpage's new
1252 * buffer isn't acquired until after leftpage is initialized and has new
1253 * high key, the last point where splitting the page may fail (barring
1254 * corruption). Failing before acquiring new buffer won't have lasting
1255 * consequences, since origpage won't have been modified and leftpage is
1256 * only workspace.
1257 */
1258 origpage = BufferGetPage(buf);
1259 oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
1260 origpagenumber = BufferGetBlockNumber(buf);
1261
1262 /*
1263 * Choose a point to split origpage at.
1264 *
1265 * A split point can be thought of as a point _between_ two existing
1266 * tuples on origpage (lastleft and firstright tuples), provided you
1267 * pretend that the new item that didn't fit is already on origpage.
1268 *
1269 * Since origpage does not actually contain newitem, the representation of
1270 * split points needs to work with two boundary cases: splits where
1271 * newitem is lastleft, and splits where newitem is firstright.
1272 * newitemonleft resolves the ambiguity that would otherwise exist when
1273 * newitemoff == firstright. In all other cases it's clear which side of
1274 * the split every tuple goes on from context. newitemonleft is usually
1275 * (but not always) redundant information.
1276 */
1277 firstright = _bt_findsplitloc(rel, origpage, newitemoff, newitemsz,
1278 newitem, &newitemonleft);
1279
1280 /* Allocate temp buffer for leftpage */
1281 leftpage = PageGetTempPage(origpage);
1282 _bt_pageinit(leftpage, BufferGetPageSize(buf));
1283 lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
1284
1285 /*
1286 * leftpage won't be the root when we're done. Also, clear the SPLIT_END
1287 * and HAS_GARBAGE flags.
1288 */
1289 lopaque->btpo_flags = oopaque->btpo_flags;
1290 lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
1291 /* set flag in leftpage indicating that rightpage has no downlink yet */
1292 lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
1293 lopaque->btpo_prev = oopaque->btpo_prev;
1294 /* handle btpo_next after rightpage buffer acquired */
1295 lopaque->btpo.level = oopaque->btpo.level;
1296 /* handle btpo_cycleid after rightpage buffer acquired */
1297
1298 /*
1299 * Copy the original page's LSN into leftpage, which will become the
1300 * updated version of the page. We need this because XLogInsert will
1301 * examine the LSN and possibly dump it in a page image.
1302 */
1303 PageSetLSN(leftpage, PageGetLSN(origpage));
1304 isleaf = P_ISLEAF(oopaque);
1305
1306 /*
1307 * The "high key" for the new left page will be the first key that's going
1308 * to go into the new right page, or a truncated version if this is a leaf
1309 * page split.
1310 *
1311 * The high key for the left page is formed using the first item on the
1312 * right page, which may seem to be contrary to Lehman & Yao's approach of
1313 * using the left page's last item as its new high key when splitting on
1314 * the leaf level. It isn't, though: suffix truncation will leave the
1315 * left page's high key fully equal to the last item on the left page when
1316 * two tuples with equal key values (excluding heap TID) enclose the split
1317 * point. It isn't actually necessary for a new leaf high key to be equal
1318 * to the last item on the left for the L&Y "subtree" invariant to hold.
1319 * It's sufficient to make sure that the new leaf high key is strictly
1320 * less than the first item on the right leaf page, and greater than or
1321 * equal to (not necessarily equal to) the last item on the left leaf
1322 * page.
1323 *
1324 * In other words, when suffix truncation isn't possible, L&Y's exact
1325 * approach to leaf splits is taken. (Actually, even that is slightly
1326 * inaccurate. A tuple with all the keys from firstright but the heap TID
1327 * from lastleft will be used as the new high key, since the last left
1328 * tuple could be physically larger despite being opclass-equal in respect
1329 * of all attributes prior to the heap TID attribute.)
1330 */
1331 if (!newitemonleft && newitemoff == firstright)
1332 {
1333 /* incoming tuple will become first on right page */
1334 itemsz = newitemsz;
1335 item = newitem;
1336 }
1337 else
1338 {
1339 /* existing item at firstright will become first on right page */
1340 itemid = PageGetItemId(origpage, firstright);
1341 itemsz = ItemIdGetLength(itemid);
1342 item = (IndexTuple) PageGetItem(origpage, itemid);
1343 }
1344
1345 /*
1346 * Truncate unneeded key and non-key attributes of the high key item
1347 * before inserting it on the left page. This can only happen at the leaf
1348 * level, since in general all pivot tuple values originate from leaf
1349 * level high keys. A pivot tuple in a grandparent page must guide a
1350 * search not only to the correct parent page, but also to the correct
1351 * leaf page.
1352 */
1353 if (isleaf && (itup_key->heapkeyspace || indnatts != indnkeyatts))
1354 {
1355 IndexTuple lastleft;
1356
1357 /*
1358 * Determine which tuple will become the last on the left page. This
1359 * is needed to decide how many attributes from the first item on the
1360 * right page must remain in new high key for left page.
1361 */
1362 if (newitemonleft && newitemoff == firstright)
1363 {
1364 /* incoming tuple will become last on left page */
1365 lastleft = newitem;
1366 }
1367 else
1368 {
1369 OffsetNumber lastleftoff;
1370
1371 /* item just before firstright will become last on left page */
1372 lastleftoff = OffsetNumberPrev(firstright);
1373 Assert(lastleftoff >= P_FIRSTDATAKEY(oopaque));
1374 itemid = PageGetItemId(origpage, lastleftoff);
1375 lastleft = (IndexTuple) PageGetItem(origpage, itemid);
1376 }
1377
1378 Assert(lastleft != item);
1379 lefthikey = _bt_truncate(rel, lastleft, item, itup_key);
1380 itemsz = IndexTupleSize(lefthikey);
1381 itemsz = MAXALIGN(itemsz);
1382 }
1383 else
1384 lefthikey = item;
1385
1386 /*
1387 * Add new high key to leftpage
1388 */
1389 leftoff = P_HIKEY;
1390
1391 Assert(BTreeTupleGetNAtts(lefthikey, rel) > 0);
1392 Assert(BTreeTupleGetNAtts(lefthikey, rel) <= indnkeyatts);
1393 if (PageAddItem(leftpage, (Item) lefthikey, itemsz, leftoff,
1394 false, false) == InvalidOffsetNumber)
1395 elog(ERROR, "failed to add hikey to the left sibling"
1396 " while splitting block %u of index \"%s\"",
1397 origpagenumber, RelationGetRelationName(rel));
1398 leftoff = OffsetNumberNext(leftoff);
1399 /* be tidy */
1400 if (lefthikey != item)
1401 pfree(lefthikey);
1402
1403 /*
1404 * Acquire a new right page to split into, now that left page has a new
1405 * high key. From here on, it's not okay to throw an error without
1406 * zeroing rightpage first. This coding rule ensures that we won't
1407 * confuse future VACUUM operations, which might otherwise try to re-find
1408 * a downlink to a leftover junk page as the page undergoes deletion.
1409 *
1410 * It would be reasonable to start the critical section just after the new
1411 * rightpage buffer is acquired instead; that would allow us to avoid
1412 * leftover junk pages without bothering to zero rightpage. We do it this
1413 * way because it avoids an unnecessary PANIC when either origpage or its
1414 * existing sibling page are corrupt.
1415 */
1416 rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1417 rightpage = BufferGetPage(rbuf);
1418 rightpagenumber = BufferGetBlockNumber(rbuf);
1419 /* rightpage was initialized by _bt_getbuf */
1420 ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
1421
1422 /*
1423 * Finish off remaining leftpage special area fields. They cannot be set
1424 * before both origpage (leftpage) and rightpage buffers are acquired and
1425 * locked.
1426 */
1427 lopaque->btpo_next = rightpagenumber;
1428 lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
1429
1430 /*
1431 * rightpage won't be the root when we're done. Also, clear the SPLIT_END
1432 * and HAS_GARBAGE flags.
1433 */
1434 ropaque->btpo_flags = oopaque->btpo_flags;
1435 ropaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
1436 ropaque->btpo_prev = origpagenumber;
1437 ropaque->btpo_next = oopaque->btpo_next;
1438 ropaque->btpo.level = oopaque->btpo.level;
1439 ropaque->btpo_cycleid = lopaque->btpo_cycleid;
1440
1441 /*
1442 * Add new high key to rightpage where necessary.
1443 *
1444 * If the page we're splitting is not the rightmost page at its level in
1445 * the tree, then the first entry on the page is the high key from
1446 * origpage.
1447 */
1448 rightoff = P_HIKEY;
1449
1450 if (!P_RIGHTMOST(oopaque))
1451 {
1452 itemid = PageGetItemId(origpage, P_HIKEY);
1453 itemsz = ItemIdGetLength(itemid);
1454 item = (IndexTuple) PageGetItem(origpage, itemid);
1455 Assert(BTreeTupleGetNAtts(item, rel) > 0);
1456 Assert(BTreeTupleGetNAtts(item, rel) <= indnkeyatts);
1457 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
1458 false, false) == InvalidOffsetNumber)
1459 {
1460 memset(rightpage, 0, BufferGetPageSize(rbuf));
1461 elog(ERROR, "failed to add hikey to the right sibling"
1462 " while splitting block %u of index \"%s\"",
1463 origpagenumber, RelationGetRelationName(rel));
1464 }
1465 rightoff = OffsetNumberNext(rightoff);
1466 }
1467
1468 /*
1469 * Now transfer all the data items (non-pivot tuples in isleaf case, or
1470 * additional pivot tuples in !isleaf case) to the appropriate page.
1471 *
1472 * Note: we *must* insert at least the right page's items in item-number
1473 * order, for the benefit of _bt_restore_page().
1474 */
1475 maxoff = PageGetMaxOffsetNumber(origpage);
1476
1477 for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
1478 {
1479 itemid = PageGetItemId(origpage, i);
1480 itemsz = ItemIdGetLength(itemid);
1481 item = (IndexTuple) PageGetItem(origpage, itemid);
1482
1483 /* does new item belong before this one? */
1484 if (i == newitemoff)
1485 {
1486 if (newitemonleft)
1487 {
1488 Assert(newitemoff <= firstright);
1489 if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
1490 {
1491 memset(rightpage, 0, BufferGetPageSize(rbuf));
1492 elog(ERROR, "failed to add new item to the left sibling"
1493 " while splitting block %u of index \"%s\"",
1494 origpagenumber, RelationGetRelationName(rel));
1495 }
1496 leftoff = OffsetNumberNext(leftoff);
1497 }
1498 else
1499 {
1500 Assert(newitemoff >= firstright);
1501 if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1502 {
1503 memset(rightpage, 0, BufferGetPageSize(rbuf));
1504 elog(ERROR, "failed to add new item to the right sibling"
1505 " while splitting block %u of index \"%s\"",
1506 origpagenumber, RelationGetRelationName(rel));
1507 }
1508 rightoff = OffsetNumberNext(rightoff);
1509 }
1510 }
1511
1512 /* decide which page to put it on */
1513 if (i < firstright)
1514 {
1515 if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
1516 {
1517 memset(rightpage, 0, BufferGetPageSize(rbuf));
1518 elog(ERROR, "failed to add old item to the left sibling"
1519 " while splitting block %u of index \"%s\"",
1520 origpagenumber, RelationGetRelationName(rel));
1521 }
1522 leftoff = OffsetNumberNext(leftoff);
1523 }
1524 else
1525 {
1526 if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
1527 {
1528 memset(rightpage, 0, BufferGetPageSize(rbuf));
1529 elog(ERROR, "failed to add old item to the right sibling"
1530 " while splitting block %u of index \"%s\"",
1531 origpagenumber, RelationGetRelationName(rel));
1532 }
1533 rightoff = OffsetNumberNext(rightoff);
1534 }
1535 }
1536
1537 /* cope with possibility that newitem goes at the end */
1538 if (i <= newitemoff)
1539 {
1540 /*
1541 * Can't have newitemonleft here; that would imply we were told to put
1542 * *everything* on the left page, which cannot fit (if it could, we'd
1543 * not be splitting the page).
1544 */
1545 Assert(!newitemonleft);
1546 if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1547 {
1548 memset(rightpage, 0, BufferGetPageSize(rbuf));
1549 elog(ERROR, "failed to add new item to the right sibling"
1550 " while splitting block %u of index \"%s\"",
1551 origpagenumber, RelationGetRelationName(rel));
1552 }
1553 rightoff = OffsetNumberNext(rightoff);
1554 }
1555
1556 /*
1557 * We have to grab the right sibling (if any) and fix the prev pointer
1558 * there. We are guaranteed that this is deadlock-free since no other
1559 * writer will be holding a lock on that page and trying to move left, and
1560 * all readers release locks on a page before trying to fetch its
1561 * neighbors.
1562 */
1563 if (!P_RIGHTMOST(oopaque))
1564 {
1565 sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
1566 spage = BufferGetPage(sbuf);
1567 sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
1568 if (sopaque->btpo_prev != origpagenumber)
1569 {
1570 memset(rightpage, 0, BufferGetPageSize(rbuf));
1571 elog(ERROR, "right sibling's left-link doesn't match: "
1572 "block %u links to %u instead of expected %u in index \"%s\"",
1573 oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
1574 RelationGetRelationName(rel));
1575 }
1576
1577 /*
1578 * Check to see if we can set the SPLIT_END flag in the right-hand
1579 * split page; this can save some I/O for vacuum since it need not
1580 * proceed to the right sibling. We can set the flag if the right
1581 * sibling has a different cycleid: that means it could not be part of
1582 * a group of pages that were all split off from the same ancestor
1583 * page. If you're confused, imagine that page A splits to A B and
1584 * then again, yielding A C B, while vacuum is in progress. Tuples
1585 * originally in A could now be in either B or C, hence vacuum must
1586		 * examine both pages.  But if D, our right sibling, has a different
1587		 * cycleid, then it cannot contain any tuples that were in A when the
1588		 * vacuum started.
1589 */
1590 if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
1591 ropaque->btpo_flags |= BTP_SPLIT_END;
1592 }
1593
1594 /*
1595 * Right sibling is locked, new siblings are prepared, but original page
1596 * is not updated yet.
1597 *
1598 * NO EREPORT(ERROR) till right sibling is updated. We can get away with
1599 * not starting the critical section till here because we haven't been
1600 * scribbling on the original page yet; see comments above.
1601 */
1602 START_CRIT_SECTION();
1603
1604 /*
1605 * By here, the original data page has been split into two new halves, and
1606 * these are correct. The algorithm requires that the left page never
1607 * move during a split, so we copy the new left page back on top of the
1608 * original. Note that this is not a waste of time, since we also require
1609 * (in the page management code) that the center of a page always be
1610 * clean, and the most efficient way to guarantee this is just to compact
1611 * the data by reinserting it into a new left page. (XXX the latter
1612 * comment is probably obsolete; but in any case it's good to not scribble
1613 * on the original page until we enter the critical section.)
1614 *
1615 * We need to do this before writing the WAL record, so that XLogInsert
1616 * can WAL log an image of the page if necessary.
1617 */
1618 PageRestoreTempPage(leftpage, origpage);
1619 /* leftpage, lopaque must not be used below here */
1620
1621 MarkBufferDirty(buf);
1622 MarkBufferDirty(rbuf);
1623
1624 if (!P_RIGHTMOST(ropaque))
1625 {
1626 sopaque->btpo_prev = rightpagenumber;
1627 MarkBufferDirty(sbuf);
1628 }
1629
1630 /*
1631 * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
1632 * a split.
1633 */
1634 if (!isleaf)
1635 {
1636 Page cpage = BufferGetPage(cbuf);
1637 BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
1638
1639 cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
1640 MarkBufferDirty(cbuf);
1641 }
1642
1643 /* XLOG stuff */
1644 if (RelationNeedsWAL(rel))
1645 {
1646 xl_btree_split xlrec;
1647 uint8 xlinfo;
1648 XLogRecPtr recptr;
1649
1650 xlrec.level = ropaque->btpo.level;
1651 xlrec.firstright = firstright;
1652 xlrec.newitemoff = newitemoff;
1653
1654 XLogBeginInsert();
1655 XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
1656
1657 XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
1658 XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
1659 /* Log the right sibling, because we've changed its prev-pointer. */
1660 if (!P_RIGHTMOST(ropaque))
1661 XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
1662 if (BufferIsValid(cbuf))
1663 XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
1664
1665 /*
1666 * Log the new item, if it was inserted on the left page. (If it was
1667 * put on the right page, we don't need to explicitly WAL log it
1668 * because it's included with all the other items on the right page.)
1669 * Show the new item as belonging to the left page buffer, so that it
1670 * is not stored if XLogInsert decides it needs a full-page image of
1671 * the left page. We store the offset anyway, though, to support
1672 * archive compression of these records.
1673 */
1674 if (newitemonleft)
1675 XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
1676
1677 /* Log the left page's new high key */
1678 itemid = PageGetItemId(origpage, P_HIKEY);
1679 item = (IndexTuple) PageGetItem(origpage, itemid);
1680 XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
1681
1682 /*
1683 * Log the contents of the right page in the format understood by
1684 * _bt_restore_page(). The whole right page will be recreated.
1685 *
1686		 * Direct access to the page is not ideal, but it is faster; ideally the
1687		 * page API would provide a function for this.  Note we only store the tuples
1688 * themselves, knowing that they were inserted in item-number order
1689 * and so the line pointers can be reconstructed. See comments for
1690 * _bt_restore_page().
1691 */
1692 XLogRegisterBufData(1,
1693 (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
1694 ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
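
		/*
		 * To make the recovery-time reconstruction easier to picture, here
		 * is a simplified sketch of how the logged byte range can be walked;
		 * the real logic lives in _bt_restore_page(), and the loop below is
		 * purely illustrative (begin, endptr, ptr and itupdata are
		 * hypothetical locals):
		 *
		 *		for (ptr = begin; ptr < endptr; ptr += MAXALIGN(itemsz))
		 *		{
		 *			memcpy(&itupdata, ptr, sizeof(IndexTupleData));
		 *			itemsz = IndexTupleSize(&itupdata);
		 *			(remember ptr and itemsz; afterwards PageAddItem() the
		 *			 remembered tuples so their line pointers come out in
		 *			 item-number order)
		 *		}
		 *
		 * Each tuple is self-describing through the length in its header,
		 * which is why no line pointers need to be logged.
		 */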
1695
1696 xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1697 recptr = XLogInsert(RM_BTREE_ID, xlinfo);
1698
1699 PageSetLSN(origpage, recptr);
1700 PageSetLSN(rightpage, recptr);
1701 if (!P_RIGHTMOST(ropaque))
1702 {
1703 PageSetLSN(spage, recptr);
1704 }
1705 if (!isleaf)
1706 {
1707 PageSetLSN(BufferGetPage(cbuf), recptr);
1708 }
1709 }
1710
1711 END_CRIT_SECTION();
1712
1713 /* release the old right sibling */
1714 if (!P_RIGHTMOST(ropaque))
1715 _bt_relbuf(rel, sbuf);
1716
1717 /* release the child */
1718 if (!isleaf)
1719 _bt_relbuf(rel, cbuf);
1720
1721 /* split's done */
1722 return rbuf;
1723}
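
/*
 * For context, the caller-side sequence around _bt_split() looks roughly
 * like the sketch below.  This is only an illustration of the contract
 * described above; the real code is in _bt_insertonpg(), and the variable
 * names here are illustrative:
 *
 *		rbuf = _bt_split(rel, itup_key, buf, cbuf, newitemoff, itemsz, itup);
 *
 *		(rbuf comes back write-locked and pinned; buf still holds the reduced
 *		left half, whose INCOMPLETE_SPLIT flag stays set until the downlink
 *		is inserted)
 *
 *		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
 */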
1724
1725/*
1726 * _bt_insert_parent() -- Insert downlink into parent, completing split.
1727 *
1728 * On entry, buf and rbuf are the left and right split pages, which we
1729 * still hold write locks on. Both locks will be released here. We
1730 * release the rbuf lock once we have a write lock on the page that we
1731 * intend to insert a downlink to rbuf on (i.e. buf's current parent page).
1732 * The lock on buf is released at the same point as the lock on the parent
1733 * page, since buf's INCOMPLETE_SPLIT flag must be cleared by the same
1734 * atomic operation that completes the split by inserting a new downlink.
1735 *
1736 * stack - stack showing how we got here.  Will be NULL when splitting the
1737 *			true root, or during a concurrent root split, where it's OK to be inefficient
1738 * is_root - we split the true root
1739 * is_only - we split a page alone on its level (might have been fast root)
1740 */
1741static void
1742_bt_insert_parent(Relation rel,
1743 Buffer buf,
1744 Buffer rbuf,
1745 BTStack stack,
1746 bool is_root,
1747 bool is_only)
1748{
1749 /*
1750 * Here we have to do something Lehman and Yao don't talk about: deal with
1751 * a root split and construction of a new root. If our stack is empty
1752 * then we have just split a node on what had been the root level when we
1753 * descended the tree. If it was still the root then we perform a
1754 * new-root construction. If it *wasn't* the root anymore, search to find
1755 * the next higher level that someone constructed meanwhile, and find the
1756 * right place to insert as for the normal case.
1757 *
1758 * If we have to search for the parent level, we do so by re-descending
1759 * from the root. This is not super-efficient, but it's rare enough not
1760 * to matter.
1761 */
1762 if (is_root)
1763 {
1764 Buffer rootbuf;
1765
1766 Assert(stack == NULL);
1767 Assert(is_only);
1768 /* create a new root node and update the metapage */
1769 rootbuf = _bt_newroot(rel, buf, rbuf);
1770 /* release the split buffers */
1771 _bt_relbuf(rel, rootbuf);
1772 _bt_relbuf(rel, rbuf);
1773 _bt_relbuf(rel, buf);
1774 }
1775 else
1776 {
1777 BlockNumber bknum = BufferGetBlockNumber(buf);
1778 BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1779 Page page = BufferGetPage(buf);
1780 IndexTuple new_item;
1781 BTStackData fakestack;
1782 IndexTuple ritem;
1783 Buffer pbuf;
1784
1785 if (stack == NULL)
1786 {
1787 BTPageOpaque lpageop;
1788
1789 elog(DEBUG2, "concurrent ROOT page split");
1790 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1791 /* Find the leftmost page at the next level up */
1792 pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
1793 NULL);
1794 /* Set up a phony stack entry pointing there */
1795 stack = &fakestack;
1796 stack->bts_blkno = BufferGetBlockNumber(pbuf);
1797 stack->bts_offset = InvalidOffsetNumber;
1798 stack->bts_btentry = InvalidBlockNumber;
1799 stack->bts_parent = NULL;
1800 _bt_relbuf(rel, pbuf);
1801 }
1802
1803 /* get high key from left, a strict lower bound for new right page */
1804 ritem = (IndexTuple) PageGetItem(page,
1805 PageGetItemId(page, P_HIKEY));
1806
1807 /* form an index tuple that points at the new right page */
1808 new_item = CopyIndexTuple(ritem);
1809 BTreeInnerTupleSetDownLink(new_item, rbknum);
1810
1811 /*
1812 * Re-find and write lock the parent of buf.
1813 *
1814 * It's possible that the location of buf's downlink has changed since
1815 * our initial _bt_search() descent. _bt_getstackbuf() will detect
1816 * and recover from this, updating the stack, which ensures that the
1817 * new downlink will be inserted at the correct offset. Even buf's
1818 * parent may have changed.
1819 */
1820 stack->bts_btentry = bknum;
1821 pbuf = _bt_getstackbuf(rel, stack);
1822
1823 /*
1824 * Now we can unlock the right child. The left child will be unlocked
1825 * by _bt_insertonpg().
1826 */
1827 _bt_relbuf(rel, rbuf);
1828
1829 if (pbuf == InvalidBuffer)
1830 elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
1831 RelationGetRelationName(rel), bknum, rbknum);
1832
1833 /* Recursively update the parent */
1834 _bt_insertonpg(rel, NULL, pbuf, buf, stack->bts_parent,
1835 new_item, stack->bts_offset + 1,
1836 is_only);
1837
1838 /* be tidy */
1839 pfree(new_item);
1840 }
1841}
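
/*
 * Splits can cascade: if the parent page is itself full, _bt_insertonpg()
 * ends up back here for the next level up.  A rough sketch of the call
 * chain for a worst-case leaf insertion (illustrative only):
 *
 *		_bt_insertonpg(leaf)
 *			-> _bt_split(leaf)
 *			-> _bt_insert_parent()
 *				-> _bt_insertonpg(parent)
 *					-> _bt_split(parent)
 *					-> ... and so on, up to _bt_newroot() if the cascade
 *					   reaches the true root
 */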
1842
1843/*
1844 * _bt_finish_split() -- Finish an incomplete split
1845 *
1846 * A crash or other failure can leave a split incomplete. The insertion
1847 * routines won't insert onto a page that is incompletely split.
1848 * Before inserting on such a page, call _bt_finish_split().
1849 *
1850 * On entry, 'lbuf' must be locked in write-mode. On exit, it is unlocked
1851 * and unpinned.
1852 */
1853void
1854_bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
1855{
1856 Page lpage = BufferGetPage(lbuf);
1857 BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
1858 Buffer rbuf;
1859 Page rpage;
1860 BTPageOpaque rpageop;
1861 bool was_root;
1862 bool was_only;
1863
1864 Assert(P_INCOMPLETE_SPLIT(lpageop));
1865
1866 /* Lock right sibling, the one missing the downlink */
1867 rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
1868 rpage = BufferGetPage(rbuf);
1869 rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
1870
1871 /* Could this be a root split? */
1872 if (!stack)
1873 {
1874 Buffer metabuf;
1875 Page metapg;
1876 BTMetaPageData *metad;
1877
1878 /* acquire lock on the metapage */
1879 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1880 metapg = BufferGetPage(metabuf);
1881 metad = BTPageGetMeta(metapg);
1882
1883 was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
1884
1885 _bt_relbuf(rel, metabuf);
1886 }
1887 else
1888 was_root = false;
1889
1890 /* Was this the only page on the level before split? */
1891 was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
1892
1893 elog(DEBUG1, "finishing incomplete split of %u/%u",
1894 BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
1895
1896 _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
1897}
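
/*
 * A minimal sketch of how callers use _bt_finish_split(); it mirrors the
 * retry pattern in _bt_getstackbuf() below and is illustrative only (blkno
 * is a hypothetical local here):
 *
 *		buf = _bt_getbuf(rel, blkno, BT_WRITE);
 *		opaque = (BTPageOpaque) PageGetSpecialPointer(BufferGetPage(buf));
 *		if (P_INCOMPLETE_SPLIT(opaque))
 *		{
 *			_bt_finish_split(rel, buf, stack);
 *			(buf's lock and pin are gone at this point; re-acquire the
 *			 buffer and retry)
 *		}
 */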
1898
1899/*
1900 * _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1901 * we last looked at in the parent.
1902 *
1903 * This is possible because we save the downlink from the parent item,
1904 * which is enough to uniquely identify it. Insertions into the parent
1905 * level could cause the item to move right; deletions could cause it
1906 * to move left, but not left of the page we previously found it in.
1907 *
1908 * Adjusts bts_blkno & bts_offset if changed.
1909 *
1910 * Returns write-locked buffer, or InvalidBuffer if item not found
1911 * (should not happen).
1912 */
1913Buffer
1914_bt_getstackbuf(Relation rel, BTStack stack)
1915{
1916 BlockNumber blkno;
1917 OffsetNumber start;
1918
1919 blkno = stack->bts_blkno;
1920 start = stack->bts_offset;
1921
1922 for (;;)
1923 {
1924 Buffer buf;
1925 Page page;
1926 BTPageOpaque opaque;
1927
1928 buf = _bt_getbuf(rel, blkno, BT_WRITE);
1929 page = BufferGetPage(buf);
1930 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1931
1932 if (P_INCOMPLETE_SPLIT(opaque))
1933 {
1934 _bt_finish_split(rel, buf, stack->bts_parent);
1935 continue;
1936 }
1937
1938 if (!P_IGNORE(opaque))
1939 {
1940 OffsetNumber offnum,
1941 minoff,
1942 maxoff;
1943 ItemId itemid;
1944 IndexTuple item;
1945
1946 minoff = P_FIRSTDATAKEY(opaque);
1947 maxoff = PageGetMaxOffsetNumber(page);
1948
1949 /*
1950			 * start = InvalidOffsetNumber means "search the whole page".  We
1951			 * need this test anyway due to the possibility that the page now
1952			 * has a high key when it didn't before.
1953 */
1954 if (start < minoff)
1955 start = minoff;
1956
1957 /*
1958			 * Need this check too, to guard against the possibility that the
1959			 * page split since we visited it originally.
1960 */
1961 if (start > maxoff)
1962 start = OffsetNumberNext(maxoff);
1963
1964 /*
1965 * These loops will check every item on the page --- but in an
1966 * order that's attuned to the probability of where it actually
1967 * is. Scan to the right first, then to the left.
1968 */
1969 for (offnum = start;
1970 offnum <= maxoff;
1971 offnum = OffsetNumberNext(offnum))
1972 {
1973 itemid = PageGetItemId(page, offnum);
1974 item = (IndexTuple) PageGetItem(page, itemid);
1975
1976 if (BTreeInnerTupleGetDownLink(item) == stack->bts_btentry)
1977 {
1978 /* Return accurate pointer to where link is now */
1979 stack->bts_blkno = blkno;
1980 stack->bts_offset = offnum;
1981 return buf;
1982 }
1983 }
1984
1985 for (offnum = OffsetNumberPrev(start);
1986 offnum >= minoff;
1987 offnum = OffsetNumberPrev(offnum))
1988 {
1989 itemid = PageGetItemId(page, offnum);
1990 item = (IndexTuple) PageGetItem(page, itemid);
1991
1992 if (BTreeInnerTupleGetDownLink(item) == stack->bts_btentry)
1993 {
1994 /* Return accurate pointer to where link is now */
1995 stack->bts_blkno = blkno;
1996 stack->bts_offset = offnum;
1997 return buf;
1998 }
1999 }
2000 }
2001
2002 /*
2003 * The item we're looking for moved right at least one page.
2004 */
2005 if (P_RIGHTMOST(opaque))
2006 {
2007 _bt_relbuf(rel, buf);
2008 return InvalidBuffer;
2009 }
2010 blkno = opaque->btpo_next;
2011 start = InvalidOffsetNumber;
2012 _bt_relbuf(rel, buf);
2013 }
2014}
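
/*
 * Typical use of _bt_getstackbuf(), shown only as a rough sketch; it mirrors
 * the call in _bt_insert_parent() above:
 *
 *		stack->bts_btentry = BufferGetBlockNumber(buf);
 *		pbuf = _bt_getstackbuf(rel, stack);
 *		if (pbuf == InvalidBuffer)
 *			elog(ERROR, "failed to re-find parent key ...");
 *
 * On success, stack->bts_blkno and stack->bts_offset identify the downlink's
 * current location, and the returned buffer is write-locked on that page.
 */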
2015
2016/*
2017 * _bt_newroot() -- Create a new root page for the index.
2018 *
2019 * We've just split the old root page and need to create a new one.
2020 * In order to do this, we add a new root page to the file, then lock
2021 * the metadata page and update it. This is guaranteed to be deadlock-
2022 * free, because all readers release their locks on the metadata page
2023 * before trying to lock the root, and all writers lock the root before
2024 * trying to lock the metadata page. We have a write lock on the old
2025 * root page, so we have not introduced any cycles into the waits-for
2026 * graph.
2027 *
2028 * On entry, lbuf (the old root) and rbuf (its new peer) are write-
2029 * locked. On exit, a new root page exists with entries for the
2030 * two new children, metapage is updated and unlocked/unpinned.
2031 *		The new root buffer is returned to the caller, which must unlock/unpin
2032 *		lbuf, rbuf & rootbuf.
2033 */
2034static Buffer
2035_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
2036{
2037 Buffer rootbuf;
2038 Page lpage,
2039 rootpage;
2040 BlockNumber lbkno,
2041 rbkno;
2042 BlockNumber rootblknum;
2043 BTPageOpaque rootopaque;
2044 BTPageOpaque lopaque;
2045 ItemId itemid;
2046 IndexTuple item;
2047 IndexTuple left_item;
2048 Size left_item_sz;
2049 IndexTuple right_item;
2050 Size right_item_sz;
2051 Buffer metabuf;
2052 Page metapg;
2053 BTMetaPageData *metad;
2054
2055 lbkno = BufferGetBlockNumber(lbuf);
2056 rbkno = BufferGetBlockNumber(rbuf);
2057 lpage = BufferGetPage(lbuf);
2058 lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
2059
2060 /* get a new root page */
2061 rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
2062 rootpage = BufferGetPage(rootbuf);
2063 rootblknum = BufferGetBlockNumber(rootbuf);
2064
2065 /* acquire lock on the metapage */
2066 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
2067 metapg = BufferGetPage(metabuf);
2068 metad = BTPageGetMeta(metapg);
2069
2070 /*
2071 * Create downlink item for left page (old root). Since this will be the
2072	 * first item in a non-leaf page, it implicitly has a minus-infinity key
2073	 * value, so we need not store any actual key in it.
2074 */
2075 left_item_sz = sizeof(IndexTupleData);
2076 left_item = (IndexTuple) palloc(left_item_sz);
2077 left_item->t_info = left_item_sz;
2078 BTreeInnerTupleSetDownLink(left_item, lbkno);
2079 BTreeTupleSetNAtts(left_item, 0);
2080
2081 /*
2082 * Create downlink item for right page. The key for it is obtained from
2083 * the "high key" position in the left page.
2084 */
2085 itemid = PageGetItemId(lpage, P_HIKEY);
2086 right_item_sz = ItemIdGetLength(itemid);
2087 item = (IndexTuple) PageGetItem(lpage, itemid);
2088 right_item = CopyIndexTuple(item);
2089 BTreeInnerTupleSetDownLink(right_item, rbkno);
2090
2091 /* NO EREPORT(ERROR) from here till newroot op is logged */
2092 START_CRIT_SECTION();
2093
2094 /* upgrade metapage if needed */
2095 if (metad->btm_version < BTREE_NOVAC_VERSION)
2096 _bt_upgrademetapage(metapg);
2097
2098 /* set btree special data */
2099 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
2100 rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
2101 rootopaque->btpo_flags = BTP_ROOT;
2102 rootopaque->btpo.level =
2103 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
2104 rootopaque->btpo_cycleid = 0;
2105
2106 /* update metapage data */
2107 metad->btm_root = rootblknum;
2108 metad->btm_level = rootopaque->btpo.level;
2109 metad->btm_fastroot = rootblknum;
2110 metad->btm_fastlevel = rootopaque->btpo.level;
2111
2112 /*
2113 * Insert the left page pointer into the new root page. The root page is
2114 * the rightmost page on its level so there is no "high key" in it; the
2115 * two items will go into positions P_HIKEY and P_FIRSTKEY.
2116 *
2117 * Note: we *must* insert the two items in item-number order, for the
2118 * benefit of _bt_restore_page().
2119 */
2120 Assert(BTreeTupleGetNAtts(left_item, rel) == 0);
2121 if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
2122 false, false) == InvalidOffsetNumber)
2123 elog(PANIC, "failed to add leftkey to new root page"
2124 " while splitting block %u of index \"%s\"",
2125 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
2126
2127 /*
2128 * insert the right page pointer into the new root page.
2129 */
2130 Assert(BTreeTupleGetNAtts(right_item, rel) > 0);
2131 Assert(BTreeTupleGetNAtts(right_item, rel) <=
2132 IndexRelationGetNumberOfKeyAttributes(rel));
2133 if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
2134 false, false) == InvalidOffsetNumber)
2135 elog(PANIC, "failed to add rightkey to new root page"
2136 " while splitting block %u of index \"%s\"",
2137 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
2138
2139 /* Clear the incomplete-split flag in the left child */
2140 Assert(P_INCOMPLETE_SPLIT(lopaque));
2141 lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
2142 MarkBufferDirty(lbuf);
2143
2144 MarkBufferDirty(rootbuf);
2145 MarkBufferDirty(metabuf);
2146
2147 /* XLOG stuff */
2148 if (RelationNeedsWAL(rel))
2149 {
2150 xl_btree_newroot xlrec;
2151 XLogRecPtr recptr;
2152 xl_btree_metadata md;
2153
2154 xlrec.rootblk = rootblknum;
2155 xlrec.level = metad->btm_level;
2156
2157 XLogBeginInsert();
2158 XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
2159
2160 XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
2161 XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
2162 XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT | REGBUF_STANDARD);
2163
2164 Assert(metad->btm_version >= BTREE_NOVAC_VERSION);
2165 md.version = metad->btm_version;
2166 md.root = rootblknum;
2167 md.level = metad->btm_level;
2168 md.fastroot = rootblknum;
2169 md.fastlevel = metad->btm_level;
2170 md.oldest_btpo_xact = metad->btm_oldest_btpo_xact;
2171 md.last_cleanup_num_heap_tuples = metad->btm_last_cleanup_num_heap_tuples;
2172
2173 XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
2174
2175 /*
2176		 * Direct access to the page is not ideal, but it is faster; ideally the
2177		 * page API would provide a function for this.
2178 */
2179 XLogRegisterBufData(0,
2180 (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
2181 ((PageHeader) rootpage)->pd_special -
2182 ((PageHeader) rootpage)->pd_upper);
2183
2184 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
2185
2186 PageSetLSN(lpage, recptr);
2187 PageSetLSN(rootpage, recptr);
2188 PageSetLSN(metapg, recptr);
2189 }
2190
2191 END_CRIT_SECTION();
2192
2193 /* done with metapage */
2194 _bt_relbuf(rel, metabuf);
2195
2196 pfree(left_item);
2197 pfree(right_item);
2198
2199 return rootbuf;
2200}
2201
2202/*
2203 * _bt_pgaddtup() -- add a tuple to a particular page in the index.
2204 *
2205 * This routine adds the tuple to the page as requested. It does
2206 * not affect pin/lock status, but you'd better have a write lock
2207 * and pin on the target buffer! Don't forget to write and release
2208 * the buffer afterwards, either.
2209 *
2210 * The main difference between this routine and a bare PageAddItem call
2211 * is that this code knows that the leftmost index tuple on a non-leaf
2212 * btree page doesn't need to have a key. Therefore, it strips such
2213 * tuples down to just the tuple header. CAUTION: this works ONLY if
2214 * we insert the tuples in order, so that the given itup_off does
2215 * represent the final position of the tuple!
2216 */
2217static bool
2218_bt_pgaddtup(Page page,
2219 Size itemsize,
2220 IndexTuple itup,
2221 OffsetNumber itup_off)
2222{
2223 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2224 IndexTupleData trunctuple;
2225
2226 if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
2227 {
2228 trunctuple = *itup;
2229 trunctuple.t_info = sizeof(IndexTupleData);
2230 /* Deliberately zero INDEX_ALT_TID_MASK bits */
2231 BTreeTupleSetNAtts(&trunctuple, 0);
2232 itup = &trunctuple;
2233 itemsize = sizeof(IndexTupleData);
2234 }
2235
2236 if (PageAddItem(page, (Item) itup, itemsize, itup_off,
2237 false, false) == InvalidOffsetNumber)
2238 return false;
2239
2240 return true;
2241}
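
/*
 * For illustration, the truncated ("minus infinity") first data item that
 * _bt_pgaddtup() creates on an internal page can later be recognized by
 * checks along these lines (a sketch only; itemid, itup and opaque are
 * hypothetical locals):
 *
 *		itemid = PageGetItemId(page, P_FIRSTDATAKEY(opaque));
 *		itup = (IndexTuple) PageGetItem(page, itemid);
 *		Assert(IndexTupleSize(itup) == sizeof(IndexTupleData));
 *		Assert(BTreeTupleGetNAtts(itup, rel) == 0);
 *
 * The downlink in the tuple header is preserved by the truncation, so
 * BTreeInnerTupleGetDownLink(itup) still identifies the child page.
 */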
2242
2243/*
2244 * _bt_vacuum_one_page - vacuum just one index page.
2245 *
2246 * Try to remove LP_DEAD items from the given page. The passed buffer
2247 * must be exclusive-locked, but unlike a real VACUUM, we don't need a
2248 * super-exclusive "cleanup" lock (see nbtree/README).
2249 */
2250static void
2251_bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
2252{
2253 OffsetNumber deletable[MaxOffsetNumber];
2254 int ndeletable = 0;
2255 OffsetNumber offnum,
2256 minoff,
2257 maxoff;
2258 Page page = BufferGetPage(buffer);
2259 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2260
2261 Assert(P_ISLEAF(opaque));
2262
2263 /*
2264 * Scan over all items to see which ones need to be deleted according to
2265 * LP_DEAD flags.
2266 */
2267 minoff = P_FIRSTDATAKEY(opaque);
2268 maxoff = PageGetMaxOffsetNumber(page);
2269 for (offnum = minoff;
2270 offnum <= maxoff;
2271 offnum = OffsetNumberNext(offnum))
2272 {
2273 ItemId itemId = PageGetItemId(page, offnum);
2274
2275 if (ItemIdIsDead(itemId))
2276 deletable[ndeletable++] = offnum;
2277 }
2278
2279 if (ndeletable > 0)
2280 _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
2281
2282 /*
2283 * Note: if we didn't find any LP_DEAD items, then the page's
2284 * BTP_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
2285 * separate write to clear it, however. We will clear it when we split
2286 * the page.
2287 */
2288}
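
/*
 * Roughly, the insertion path invokes _bt_vacuum_one_page() when a leaf page
 * it must insert onto is full but flagged as containing garbage; a
 * simplified sketch of that call site (the real logic is in
 * _bt_findinsertloc(), and the local names here are illustrative):
 *
 *		if (P_HAS_GARBAGE(lpageop) && PageGetFreeSpace(page) < itemsz)
 *		{
 *			_bt_vacuum_one_page(rel, buf, heapRel);
 *			(recheck free space; split the page only if there is still not
 *			 enough room)
 *		}
 */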
2289