1/*-------------------------------------------------------------------------
2 *
3 * ginbtree.c
4 * page utilities routines for the postgres inverted index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gin/ginbtree.c
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/gin_private.h"
18#include "access/ginxlog.h"
19#include "access/xloginsert.h"
20#include "storage/predicate.h"
21#include "miscadmin.h"
22#include "utils/memutils.h"
23#include "utils/rel.h"
24
25static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
26static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
27 void *insertdata, BlockNumber updateblkno,
28 Buffer childbuf, GinStatsData *buildStats);
29static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
30 bool freestack, GinStatsData *buildStats);
31
32/*
33 * Lock buffer by needed method for search.
34 */
35int
36ginTraverseLock(Buffer buffer, bool searchMode)
37{
38 Page page;
39 int access = GIN_SHARE;
40
41 LockBuffer(buffer, GIN_SHARE);
42 page = BufferGetPage(buffer);
43 if (GinPageIsLeaf(page))
44 {
45 if (searchMode == false)
46 {
47 /* we should relock our page */
48 LockBuffer(buffer, GIN_UNLOCK);
49 LockBuffer(buffer, GIN_EXCLUSIVE);
50
51 /* But root can become non-leaf during relock */
52 if (!GinPageIsLeaf(page))
53 {
54 /* restore old lock type (very rare) */
55 LockBuffer(buffer, GIN_UNLOCK);
56 LockBuffer(buffer, GIN_SHARE);
57 }
58 else
59 access = GIN_EXCLUSIVE;
60 }
61 }
62
63 return access;
64}
65
66/*
67 * Descend the tree to the leaf page that contains or would contain the key
68 * we're searching for. The key should already be filled in 'btree', in
69 * tree-type specific manner. If btree->fullScan is true, descends to the
70 * leftmost leaf page.
71 *
72 * If 'searchmode' is false, on return stack->buffer is exclusively locked,
73 * and the stack represents the full path to the root. Otherwise stack->buffer
74 * is share-locked, and stack->parent is NULL.
75 *
76 * If 'rootConflictCheck' is true, tree root is checked for serialization
77 * conflict.
78 */
79GinBtreeStack *
80ginFindLeafPage(GinBtree btree, bool searchMode,
81 bool rootConflictCheck, Snapshot snapshot)
82{
83 GinBtreeStack *stack;
84
85 stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
86 stack->blkno = btree->rootBlkno;
87 stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
88 stack->parent = NULL;
89 stack->predictNumber = 1;
90
91 if (rootConflictCheck)
92 CheckForSerializableConflictIn(btree->index, NULL, stack->buffer);
93
94 for (;;)
95 {
96 Page page;
97 BlockNumber child;
98 int access;
99
100 stack->off = InvalidOffsetNumber;
101
102 page = BufferGetPage(stack->buffer);
103 TestForOldSnapshot(snapshot, btree->index, page);
104
105 access = ginTraverseLock(stack->buffer, searchMode);
106
107 /*
108 * If we're going to modify the tree, finish any incomplete splits we
109 * encounter on the way.
110 */
111 if (!searchMode && GinPageIsIncompleteSplit(page))
112 ginFinishSplit(btree, stack, false, NULL);
113
114 /*
115 * ok, page is correctly locked, we should check to move right ..,
116 * root never has a right link, so small optimization
117 */
118 while (btree->fullScan == false && stack->blkno != btree->rootBlkno &&
119 btree->isMoveRight(btree, page))
120 {
121 BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
122
123 if (rightlink == InvalidBlockNumber)
124 /* rightmost page */
125 break;
126
127 stack->buffer = ginStepRight(stack->buffer, btree->index, access);
128 stack->blkno = rightlink;
129 page = BufferGetPage(stack->buffer);
130 TestForOldSnapshot(snapshot, btree->index, page);
131
132 if (!searchMode && GinPageIsIncompleteSplit(page))
133 ginFinishSplit(btree, stack, false, NULL);
134 }
135
136 if (GinPageIsLeaf(page)) /* we found, return locked page */
137 return stack;
138
139 /* now we have correct buffer, try to find child */
140 child = btree->findChildPage(btree, stack);
141
142 LockBuffer(stack->buffer, GIN_UNLOCK);
143 Assert(child != InvalidBlockNumber);
144 Assert(stack->blkno != child);
145
146 if (searchMode)
147 {
148 /* in search mode we may forget path to leaf */
149 stack->blkno = child;
150 stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
151 }
152 else
153 {
154 GinBtreeStack *ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
155
156 ptr->parent = stack;
157 stack = ptr;
158 stack->blkno = child;
159 stack->buffer = ReadBuffer(btree->index, stack->blkno);
160 stack->predictNumber = 1;
161 }
162 }
163}
164
165/*
166 * Step right from current page.
167 *
168 * The next page is locked first, before releasing the current page. This is
169 * crucial to protect from concurrent page deletion (see comment in
170 * ginDeletePage).
171 */
172Buffer
173ginStepRight(Buffer buffer, Relation index, int lockmode)
174{
175 Buffer nextbuffer;
176 Page page = BufferGetPage(buffer);
177 bool isLeaf = GinPageIsLeaf(page);
178 bool isData = GinPageIsData(page);
179 BlockNumber blkno = GinPageGetOpaque(page)->rightlink;
180
181 nextbuffer = ReadBuffer(index, blkno);
182 LockBuffer(nextbuffer, lockmode);
183 UnlockReleaseBuffer(buffer);
184
185 /* Sanity check that the page we stepped to is of similar kind. */
186 page = BufferGetPage(nextbuffer);
187 if (isLeaf != GinPageIsLeaf(page) || isData != GinPageIsData(page))
188 elog(ERROR, "right sibling of GIN page is of different type");
189
190 /*
191 * Given the proper lock sequence above, we should never land on a deleted
192 * page.
193 */
194 if (GinPageIsDeleted(page))
195 elog(ERROR, "right sibling of GIN page was deleted");
196
197 return nextbuffer;
198}
199
200void
201freeGinBtreeStack(GinBtreeStack *stack)
202{
203 while (stack)
204 {
205 GinBtreeStack *tmp = stack->parent;
206
207 if (stack->buffer != InvalidBuffer)
208 ReleaseBuffer(stack->buffer);
209
210 pfree(stack);
211 stack = tmp;
212 }
213}
214
215/*
216 * Try to find parent for current stack position. Returns correct parent and
217 * child's offset in stack->parent. The root page is never released, to
218 * prevent conflict with vacuum process.
219 */
220static void
221ginFindParents(GinBtree btree, GinBtreeStack *stack)
222{
223 Page page;
224 Buffer buffer;
225 BlockNumber blkno,
226 leftmostBlkno;
227 OffsetNumber offset;
228 GinBtreeStack *root;
229 GinBtreeStack *ptr;
230
231 /*
232 * Unwind the stack all the way up to the root, leaving only the root
233 * item.
234 *
235 * Be careful not to release the pin on the root page! The pin on root
236 * page is required to lock out concurrent vacuums on the tree.
237 */
238 root = stack->parent;
239 while (root->parent)
240 {
241 ReleaseBuffer(root->buffer);
242 root = root->parent;
243 }
244
245 Assert(root->blkno == btree->rootBlkno);
246 Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
247 root->off = InvalidOffsetNumber;
248
249 blkno = root->blkno;
250 buffer = root->buffer;
251 offset = InvalidOffsetNumber;
252
253 ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
254
255 for (;;)
256 {
257 LockBuffer(buffer, GIN_EXCLUSIVE);
258 page = BufferGetPage(buffer);
259 if (GinPageIsLeaf(page))
260 elog(ERROR, "Lost path");
261
262 if (GinPageIsIncompleteSplit(page))
263 {
264 Assert(blkno != btree->rootBlkno);
265 ptr->blkno = blkno;
266 ptr->buffer = buffer;
267
268 /*
269 * parent may be wrong, but if so, the ginFinishSplit call will
270 * recurse to call ginFindParents again to fix it.
271 */
272 ptr->parent = root;
273 ptr->off = InvalidOffsetNumber;
274
275 ginFinishSplit(btree, ptr, false, NULL);
276 }
277
278 leftmostBlkno = btree->getLeftMostChild(btree, page);
279
280 while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
281 {
282 blkno = GinPageGetOpaque(page)->rightlink;
283 if (blkno == InvalidBlockNumber)
284 {
285 UnlockReleaseBuffer(buffer);
286 break;
287 }
288 buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
289 page = BufferGetPage(buffer);
290
291 /* finish any incomplete splits, as above */
292 if (GinPageIsIncompleteSplit(page))
293 {
294 Assert(blkno != btree->rootBlkno);
295 ptr->blkno = blkno;
296 ptr->buffer = buffer;
297 ptr->parent = root;
298 ptr->off = InvalidOffsetNumber;
299
300 ginFinishSplit(btree, ptr, false, NULL);
301 }
302 }
303
304 if (blkno != InvalidBlockNumber)
305 {
306 ptr->blkno = blkno;
307 ptr->buffer = buffer;
308 ptr->parent = root; /* it may be wrong, but in next call we will
309 * correct */
310 ptr->off = offset;
311 stack->parent = ptr;
312 return;
313 }
314
315 /* Descend down to next level */
316 blkno = leftmostBlkno;
317 buffer = ReadBuffer(btree->index, blkno);
318 }
319}
320
321/*
322 * Insert a new item to a page.
323 *
324 * Returns true if the insertion was finished. On false, the page was split and
325 * the parent needs to be updated. (A root split returns true as it doesn't
326 * need any further action by the caller to complete.)
327 *
328 * When inserting a downlink to an internal page, 'childbuf' contains the
329 * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
330 * atomically with the insert. Also, the existing item at offset stack->off
331 * in the target page is updated to point to updateblkno.
332 *
333 * stack->buffer is locked on entry, and is kept locked.
334 * Likewise for childbuf, if given.
335 */
336static bool
337ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
338 void *insertdata, BlockNumber updateblkno,
339 Buffer childbuf, GinStatsData *buildStats)
340{
341 Page page = BufferGetPage(stack->buffer);
342 bool result;
343 GinPlaceToPageRC rc;
344 uint16 xlflags = 0;
345 Page childpage = NULL;
346 Page newlpage = NULL,
347 newrpage = NULL;
348 void *ptp_workspace = NULL;
349 MemoryContext tmpCxt;
350 MemoryContext oldCxt;
351
352 /*
353 * We do all the work of this function and its subfunctions in a temporary
354 * memory context. This avoids leakages and simplifies APIs, since some
355 * subfunctions allocate storage that has to survive until we've finished
356 * the WAL insertion.
357 */
358 tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
359 "ginPlaceToPage temporary context",
360 ALLOCSET_DEFAULT_SIZES);
361 oldCxt = MemoryContextSwitchTo(tmpCxt);
362
363 if (GinPageIsData(page))
364 xlflags |= GIN_INSERT_ISDATA;
365 if (GinPageIsLeaf(page))
366 {
367 xlflags |= GIN_INSERT_ISLEAF;
368 Assert(!BufferIsValid(childbuf));
369 Assert(updateblkno == InvalidBlockNumber);
370 }
371 else
372 {
373 Assert(BufferIsValid(childbuf));
374 Assert(updateblkno != InvalidBlockNumber);
375 childpage = BufferGetPage(childbuf);
376 }
377
378 /*
379 * See if the incoming tuple will fit on the page. beginPlaceToPage will
380 * decide if the page needs to be split, and will compute the split
381 * contents if so. See comments for beginPlaceToPage and execPlaceToPage
382 * functions for more details of the API here.
383 */
384 rc = btree->beginPlaceToPage(btree, stack->buffer, stack,
385 insertdata, updateblkno,
386 &ptp_workspace,
387 &newlpage, &newrpage);
388
389 if (rc == GPTP_NO_WORK)
390 {
391 /* Nothing to do */
392 result = true;
393 }
394 else if (rc == GPTP_INSERT)
395 {
396 /* It will fit, perform the insertion */
397 START_CRIT_SECTION();
398
399 if (RelationNeedsWAL(btree->index) && !btree->isBuild)
400 {
401 XLogBeginInsert();
402 XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
403 if (BufferIsValid(childbuf))
404 XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
405 }
406
407 /* Perform the page update, and register any extra WAL data */
408 btree->execPlaceToPage(btree, stack->buffer, stack,
409 insertdata, updateblkno, ptp_workspace);
410
411 MarkBufferDirty(stack->buffer);
412
413 /* An insert to an internal page finishes the split of the child. */
414 if (BufferIsValid(childbuf))
415 {
416 GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
417 MarkBufferDirty(childbuf);
418 }
419
420 if (RelationNeedsWAL(btree->index) && !btree->isBuild)
421 {
422 XLogRecPtr recptr;
423 ginxlogInsert xlrec;
424 BlockIdData childblknos[2];
425
426 xlrec.flags = xlflags;
427
428 XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));
429
430 /*
431 * Log information about child if this was an insertion of a
432 * downlink.
433 */
434 if (BufferIsValid(childbuf))
435 {
436 BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
437 BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
438 XLogRegisterData((char *) childblknos,
439 sizeof(BlockIdData) * 2);
440 }
441
442 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
443 PageSetLSN(page, recptr);
444 if (BufferIsValid(childbuf))
445 PageSetLSN(childpage, recptr);
446 }
447
448 END_CRIT_SECTION();
449
450 /* Insertion is complete. */
451 result = true;
452 }
453 else if (rc == GPTP_SPLIT)
454 {
455 /*
456 * Didn't fit, need to split. The split has been computed in newlpage
457 * and newrpage, which are pointers to palloc'd pages, not associated
458 * with buffers. stack->buffer is not touched yet.
459 */
460 Buffer rbuffer;
461 BlockNumber savedRightLink;
462 ginxlogSplit data;
463 Buffer lbuffer = InvalidBuffer;
464 Page newrootpg = NULL;
465
466 /* Get a new index page to become the right page */
467 rbuffer = GinNewBuffer(btree->index);
468
469 /* During index build, count the new page */
470 if (buildStats)
471 {
472 if (btree->isData)
473 buildStats->nDataPages++;
474 else
475 buildStats->nEntryPages++;
476 }
477
478 savedRightLink = GinPageGetOpaque(page)->rightlink;
479
480 /* Begin setting up WAL record */
481 data.node = btree->index->rd_node;
482 data.flags = xlflags;
483 if (BufferIsValid(childbuf))
484 {
485 data.leftChildBlkno = BufferGetBlockNumber(childbuf);
486 data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
487 }
488 else
489 data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
490
491 if (stack->parent == NULL)
492 {
493 /*
494 * splitting the root, so we need to allocate new left page and
495 * place pointers to left and right page on root page.
496 */
497 lbuffer = GinNewBuffer(btree->index);
498
499 /* During index build, count the new left page */
500 if (buildStats)
501 {
502 if (btree->isData)
503 buildStats->nDataPages++;
504 else
505 buildStats->nEntryPages++;
506 }
507
508 data.rrlink = InvalidBlockNumber;
509 data.flags |= GIN_SPLIT_ROOT;
510
511 GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
512 GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
513
514 /*
515 * Construct a new root page containing downlinks to the new left
516 * and right pages. (Do this in a temporary copy rather than
517 * overwriting the original page directly, since we're not in the
518 * critical section yet.)
519 */
520 newrootpg = PageGetTempPage(newrpage);
521 GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
522
523 btree->fillRoot(btree, newrootpg,
524 BufferGetBlockNumber(lbuffer), newlpage,
525 BufferGetBlockNumber(rbuffer), newrpage);
526
527 if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
528 {
529
530 PredicateLockPageSplit(btree->index,
531 BufferGetBlockNumber(stack->buffer),
532 BufferGetBlockNumber(lbuffer));
533
534 PredicateLockPageSplit(btree->index,
535 BufferGetBlockNumber(stack->buffer),
536 BufferGetBlockNumber(rbuffer));
537 }
538
539 }
540 else
541 {
542 /* splitting a non-root page */
543 data.rrlink = savedRightLink;
544
545 GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
546 GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
547 GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
548
549 if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
550 {
551
552 PredicateLockPageSplit(btree->index,
553 BufferGetBlockNumber(stack->buffer),
554 BufferGetBlockNumber(rbuffer));
555 }
556 }
557
558 /*
559 * OK, we have the new contents of the left page in a temporary copy
560 * now (newlpage), and likewise for the new contents of the
561 * newly-allocated right block. The original page is still unchanged.
562 *
563 * If this is a root split, we also have a temporary page containing
564 * the new contents of the root.
565 */
566
567 START_CRIT_SECTION();
568
569 MarkBufferDirty(rbuffer);
570 MarkBufferDirty(stack->buffer);
571
572 /*
573 * Restore the temporary copies over the real buffers.
574 */
575 if (stack->parent == NULL)
576 {
577 /* Splitting the root, three pages to update */
578 MarkBufferDirty(lbuffer);
579 memcpy(page, newrootpg, BLCKSZ);
580 memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
581 memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
582 }
583 else
584 {
585 /* Normal split, only two pages to update */
586 memcpy(page, newlpage, BLCKSZ);
587 memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
588 }
589
590 /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
591 if (BufferIsValid(childbuf))
592 {
593 GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
594 MarkBufferDirty(childbuf);
595 }
596
597 /* write WAL record */
598 if (RelationNeedsWAL(btree->index) && !btree->isBuild)
599 {
600 XLogRecPtr recptr;
601
602 XLogBeginInsert();
603
604 /*
605 * We just take full page images of all the split pages. Splits
606 * are uncommon enough that it's not worth complicating the code
607 * to be more efficient.
608 */
609 if (stack->parent == NULL)
610 {
611 XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
612 XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
613 XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
614 }
615 else
616 {
617 XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
618 XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
619 }
620 if (BufferIsValid(childbuf))
621 XLogRegisterBuffer(3, childbuf, REGBUF_STANDARD);
622
623 XLogRegisterData((char *) &data, sizeof(ginxlogSplit));
624
625 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
626
627 PageSetLSN(page, recptr);
628 PageSetLSN(BufferGetPage(rbuffer), recptr);
629 if (stack->parent == NULL)
630 PageSetLSN(BufferGetPage(lbuffer), recptr);
631 if (BufferIsValid(childbuf))
632 PageSetLSN(childpage, recptr);
633 }
634 END_CRIT_SECTION();
635
636 /*
637 * We can release the locks/pins on the new pages now, but keep
638 * stack->buffer locked. childbuf doesn't get unlocked either.
639 */
640 UnlockReleaseBuffer(rbuffer);
641 if (stack->parent == NULL)
642 UnlockReleaseBuffer(lbuffer);
643
644 /*
645 * If we split the root, we're done. Otherwise the split is not
646 * complete until the downlink for the new page has been inserted to
647 * the parent.
648 */
649 result = (stack->parent == NULL);
650 }
651 else
652 {
653 elog(ERROR, "invalid return code from GIN placeToPage method: %d", rc);
654 result = false; /* keep compiler quiet */
655 }
656
657 /* Clean up temp context */
658 MemoryContextSwitchTo(oldCxt);
659 MemoryContextDelete(tmpCxt);
660
661 return result;
662}
663
664/*
665 * Finish a split by inserting the downlink for the new page to parent.
666 *
667 * On entry, stack->buffer is exclusively locked.
668 *
669 * If freestack is true, all the buffers are released and unlocked as we
670 * crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
671 * locked, and stack is unmodified, except for possibly moving right to find
672 * the correct parent of page.
673 */
674static void
675ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
676 GinStatsData *buildStats)
677{
678 Page page;
679 bool done;
680 bool first = true;
681
682 /*
683 * freestack == false when we encounter an incompletely split page during
684 * a scan, while freestack == true is used in the normal scenario that a
685 * split is finished right after the initial insert.
686 */
687 if (!freestack)
688 elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
689 stack->blkno, RelationGetRelationName(btree->index));
690
691 /* this loop crawls up the stack until the insertion is complete */
692 do
693 {
694 GinBtreeStack *parent = stack->parent;
695 void *insertdata;
696 BlockNumber updateblkno;
697
698 /* search parent to lock */
699 LockBuffer(parent->buffer, GIN_EXCLUSIVE);
700
701 /*
702 * If the parent page was incompletely split, finish that split first,
703 * then continue with the current one.
704 *
705 * Note: we have to finish *all* incomplete splits we encounter, even
706 * if we have to move right. Otherwise we might choose as the target a
707 * page that has no downlink in the parent, and splitting it further
708 * would fail.
709 */
710 if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
711 ginFinishSplit(btree, parent, false, buildStats);
712
713 /* move right if it's needed */
714 page = BufferGetPage(parent->buffer);
715 while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
716 {
717 if (GinPageRightMost(page))
718 {
719 /*
720 * rightmost page, but we don't find parent, we should use
721 * plain search...
722 */
723 LockBuffer(parent->buffer, GIN_UNLOCK);
724 ginFindParents(btree, stack);
725 parent = stack->parent;
726 Assert(parent != NULL);
727 break;
728 }
729
730 parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
731 parent->blkno = BufferGetBlockNumber(parent->buffer);
732 page = BufferGetPage(parent->buffer);
733
734 if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
735 ginFinishSplit(btree, parent, false, buildStats);
736 }
737
738 /* insert the downlink */
739 insertdata = btree->prepareDownlink(btree, stack->buffer);
740 updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
741 done = ginPlaceToPage(btree, parent,
742 insertdata, updateblkno,
743 stack->buffer, buildStats);
744 pfree(insertdata);
745
746 /*
747 * If the caller requested to free the stack, unlock and release the
748 * child buffer now. Otherwise keep it pinned and locked, but if we
749 * have to recurse up the tree, we can unlock the upper pages, only
750 * keeping the page at the bottom of the stack locked.
751 */
752 if (!first || freestack)
753 LockBuffer(stack->buffer, GIN_UNLOCK);
754 if (freestack)
755 {
756 ReleaseBuffer(stack->buffer);
757 pfree(stack);
758 }
759 stack = parent;
760
761 first = false;
762 } while (!done);
763
764 /* unlock the parent */
765 LockBuffer(stack->buffer, GIN_UNLOCK);
766
767 if (freestack)
768 freeGinBtreeStack(stack);
769}
770
771/*
772 * Insert a value to tree described by stack.
773 *
774 * The value to be inserted is given in 'insertdata'. Its format depends
775 * on whether this is an entry or data tree, ginInsertValue just passes it
776 * through to the tree-specific callback function.
777 *
778 * During an index build, buildStats is non-null and the counters it contains
779 * are incremented as needed.
780 *
781 * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
782 */
783void
784ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
785 GinStatsData *buildStats)
786{
787 bool done;
788
789 /* If the leaf page was incompletely split, finish the split first */
790 if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
791 ginFinishSplit(btree, stack, false, buildStats);
792
793 done = ginPlaceToPage(btree, stack,
794 insertdata, InvalidBlockNumber,
795 InvalidBuffer, buildStats);
796 if (done)
797 {
798 LockBuffer(stack->buffer, GIN_UNLOCK);
799 freeGinBtreeStack(stack);
800 }
801 else
802 ginFinishSplit(btree, stack, true, buildStats);
803}
804