/*-------------------------------------------------------------------------
 *
 * gist.c
 *    interface routines for the postgres GiST index access method.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/gist/gist.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/gist_private.h"
#include "access/gistscan.h"
#include "catalog/pg_collation.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/* non-export function prototypes */
static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
                            GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
                             GISTSTATE *giststate,
                             IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
                             Buffer leftchild, Buffer rightchild,
                             bool unlockbuf, bool unlockleftchild);
static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
                            GISTSTATE *giststate, List *splitinfo, bool releasebuf);
static void gistprunepage(Relation rel, Page page, Buffer buffer,
                          Relation heapRel);


#define ROTATEDIST(d) do { \
    SplitedPageLayout *tmp = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout)); \
    tmp->block.blkno = InvalidBlockNumber; \
    tmp->buffer = InvalidBuffer; \
    tmp->next = (d); \
    (d) = tmp; \
} while (0)
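
/*
 * A minimal usage sketch of ROTATEDIST() (illustration only): gistSplit()
 * below builds its result chain by repeatedly prepending fresh nodes, so
 * the chain ends up with the most recently added page first:
 *
 *		SplitedPageLayout *dist = NULL;
 *
 *		ROTATEDIST(dist);		// dist -> A
 *		ROTATEDIST(dist);		// dist -> B -> A
 */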


/*
 * GiST handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
gisthandler(PG_FUNCTION_ARGS)
{
    IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

    amroutine->amstrategies = 0;
    amroutine->amsupport = GISTNProcs;
    amroutine->amcanorder = false;
    amroutine->amcanorderbyop = true;
    amroutine->amcanbackward = false;
    amroutine->amcanunique = false;
    amroutine->amcanmulticol = true;
    amroutine->amoptionalkey = true;
    amroutine->amsearcharray = false;
    amroutine->amsearchnulls = true;
    amroutine->amstorage = true;
    amroutine->amclusterable = true;
    amroutine->ampredlocks = true;
    amroutine->amcanparallel = false;
    amroutine->amcaninclude = true;
    amroutine->amkeytype = InvalidOid;

    amroutine->ambuild = gistbuild;
    amroutine->ambuildempty = gistbuildempty;
    amroutine->aminsert = gistinsert;
    amroutine->ambulkdelete = gistbulkdelete;
    amroutine->amvacuumcleanup = gistvacuumcleanup;
    amroutine->amcanreturn = gistcanreturn;
    amroutine->amcostestimate = gistcostestimate;
    amroutine->amoptions = gistoptions;
    amroutine->amproperty = gistproperty;
    amroutine->ambuildphasename = NULL;
    amroutine->amvalidate = gistvalidate;
    amroutine->ambeginscan = gistbeginscan;
    amroutine->amrescan = gistrescan;
    amroutine->amgettuple = gistgettuple;
    amroutine->amgetbitmap = gistgetbitmap;
    amroutine->amendscan = gistendscan;
    amroutine->ammarkpos = NULL;
    amroutine->amrestrpos = NULL;
    amroutine->amestimateparallelscan = NULL;
    amroutine->aminitparallelscan = NULL;
    amroutine->amparallelrescan = NULL;

    PG_RETURN_POINTER(amroutine);
}

/*
 * Create and return a temporary memory context for use by GiST. We
 * _always_ invoke user-provided methods in a temporary memory
 * context, so that memory leaks in those functions cannot cause
 * problems. Also, we use some additional temporary contexts in the
 * GiST code itself, to avoid the need to do some awkward manual
 * memory management.
 */
MemoryContext
createTempGistContext(void)
{
    return AllocSetContextCreate(CurrentMemoryContext,
                                 "GiST temporary context",
                                 ALLOCSET_DEFAULT_SIZES);
}
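
/*
 * A minimal sketch of the intended usage pattern (mirroring gistinsert()
 * below): switch into the temporary context before invoking user-provided
 * support methods, switch back when done, and reset the context to discard
 * anything they leaked:
 *
 *		MemoryContext oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
 *
 *		... call opclass support functions, form tuples, etc ...
 *
 *		MemoryContextSwitchTo(oldCxt);
 *		MemoryContextReset(giststate->tempCxt);
 */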

/*
 * gistbuildempty() -- build an empty gist index in the initialization fork
 */
void
gistbuildempty(Relation index)
{
    Buffer      buffer;

    /* Initialize the root page */
    buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

    /* Initialize and xlog buffer */
    START_CRIT_SECTION();
    GISTInitBuffer(buffer, F_LEAF);
    MarkBufferDirty(buffer);
    log_newpage_buffer(buffer, true);
    END_CRIT_SECTION();

    /* Unlock and release the buffer */
    UnlockReleaseBuffer(buffer);
}

/*
 * gistinsert -- wrapper for GiST tuple insertion.
 *
 * This is the public interface routine for tuple insertion in GiSTs.
 * It doesn't do any work; just locks the relation and passes the buck.
 */
bool
gistinsert(Relation r, Datum *values, bool *isnull,
           ItemPointer ht_ctid, Relation heapRel,
           IndexUniqueCheck checkUnique,
           IndexInfo *indexInfo)
{
    GISTSTATE  *giststate = (GISTSTATE *) indexInfo->ii_AmCache;
    IndexTuple  itup;
    MemoryContext oldCxt;

    /* Initialize GISTSTATE cache if first call in this statement */
    if (giststate == NULL)
    {
        oldCxt = MemoryContextSwitchTo(indexInfo->ii_Context);
        giststate = initGISTstate(r);
        giststate->tempCxt = createTempGistContext();
        indexInfo->ii_AmCache = (void *) giststate;
        MemoryContextSwitchTo(oldCxt);
    }

    oldCxt = MemoryContextSwitchTo(giststate->tempCxt);

    itup = gistFormTuple(giststate, r,
                         values, isnull, true /* size is currently bogus */ );
    itup->t_tid = *ht_ctid;

    gistdoinsert(r, itup, 0, giststate, heapRel, false);

    /* cleanup */
    MemoryContextSwitchTo(oldCxt);
    MemoryContextReset(giststate->tempCxt);

    return false;
}


/*
 * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
 * at that offset is atomically removed along with inserting the new tuples.
 * This is used to replace a tuple with a new one.
 *
 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
 * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
 *
 * If 'markfollowright' is true and the page is split, the left child is
 * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
 * index build, however, there is no concurrent access and the page splitting
 * is done in a slightly simpler fashion, and false is passed.
 *
 * If there is not enough room on the page, it is split. All the split
 * pages are kept pinned and locked and returned in *splitinfo; the caller
 * is responsible for inserting the downlinks for them. However, if
 * 'buffer' is the root page and it needs to be split, gistplacetopage()
 * performs the split as one atomic operation, and *splitinfo is set to NIL.
 * In that case, we continue to hold the root page locked, and the child
 * pages are released; note that new tuple(s) are *not* on the root page
 * but in one of the new child pages.
 *
 * If 'newblkno' is not NULL, returns the block number of the page the first
 * new/updated tuple was inserted to. Usually it's the given page, but could
 * be its right sibling if the page was split.
 *
 * Returns 'true' if the page was split, 'false' otherwise.
 */
bool
gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
                Buffer buffer,
                IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
                BlockNumber *newblkno,
                Buffer leftchildbuf,
                List **splitinfo,
                bool markfollowright,
                Relation heapRel,
                bool is_build)
{
    BlockNumber blkno = BufferGetBlockNumber(buffer);
    Page        page = BufferGetPage(buffer);
    bool        is_leaf = (GistPageIsLeaf(page)) ? true : false;
    XLogRecPtr  recptr;
    int         i;
    bool        is_split;

    /*
     * Refuse to modify a page that's incompletely split. This should not
     * happen because we finish any incomplete splits while we walk down the
     * tree. However, it's remotely possible that another concurrent inserter
     * splits a parent page, and errors out before completing the split. We
     * will just throw an error in that case, and leave any split we had in
     * progress unfinished too. The next insert that comes along will clean up
     * the mess.
     */
    if (GistFollowRight(page))
        elog(ERROR, "concurrent GiST page split was incomplete");

    *splitinfo = NIL;

    /*
     * if isupdate, remove old key: This node's key has been modified, either
     * because a child split occurred or because we needed to adjust our key
     * for an insert in a child node. Therefore, remove the old version of
     * this node's key.
     *
     * for WAL replay, in the non-split case we handle this by setting up a
     * one-element todelete array; in the split case, it's handled implicitly
     * because the tuple vector passed to gistSplit won't include this tuple.
     */
    is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);

    /*
     * If leaf page is full, try at first to delete dead tuples. And then
     * check again.
     */
    if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
    {
        gistprunepage(rel, page, buffer, heapRel);
        is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
    }

    if (is_split)
    {
        /* no space for insertion */
        IndexTuple *itvec;
        int         tlen;
        SplitedPageLayout *dist = NULL,
                   *ptr;
        BlockNumber oldrlink = InvalidBlockNumber;
        GistNSN     oldnsn = 0;
        SplitedPageLayout rootpg;
        bool        is_rootsplit;
        int         npage;

        is_rootsplit = (blkno == GIST_ROOT_BLKNO);

        /*
         * Form index tuples vector to split. If we're replacing an old tuple,
         * remove the old version from the vector.
         */
        itvec = gistextractpage(page, &tlen);
        if (OffsetNumberIsValid(oldoffnum))
        {
            /* on inner page we should remove old tuple */
            int         pos = oldoffnum - FirstOffsetNumber;

            tlen--;
            if (pos != tlen)
                memmove(itvec + pos, itvec + pos + 1,
                        sizeof(IndexTuple) * (tlen - pos));
        }
        itvec = gistjoinvector(itvec, &tlen, itup, ntup);
        dist = gistSplit(rel, page, itvec, tlen, giststate);

        /*
         * Check that split didn't produce too many pages.
         */
        npage = 0;
        for (ptr = dist; ptr; ptr = ptr->next)
            npage++;
        /* in a root split, we'll add one more page to the list below */
        if (is_rootsplit)
            npage++;
        if (npage > GIST_MAX_SPLIT_PAGES)
            elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
                 npage, GIST_MAX_SPLIT_PAGES);

        /*
         * Set up pages to work with. Allocate new buffers for all but the
         * leftmost page. The original page becomes the new leftmost page, and
         * is just replaced with the new contents.
         *
         * For a root-split, allocate new buffers for all child pages, the
         * original page is overwritten with new root page containing
         * downlinks to the new child pages.
         */
        ptr = dist;
        if (!is_rootsplit)
        {
            /* save old rightlink and NSN */
            oldrlink = GistPageGetOpaque(page)->rightlink;
            oldnsn = GistPageGetNSN(page);

            dist->buffer = buffer;
            dist->block.blkno = BufferGetBlockNumber(buffer);
            dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));

            /* clean all flags except F_LEAF */
            GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;

            ptr = ptr->next;
        }
        for (; ptr; ptr = ptr->next)
        {
            /* Allocate new page */
            ptr->buffer = gistNewBuffer(rel);
            GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
            ptr->page = BufferGetPage(ptr->buffer);
            ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
            PredicateLockPageSplit(rel,
                                   BufferGetBlockNumber(buffer),
                                   BufferGetBlockNumber(ptr->buffer));
        }

        /*
         * Now that we know which blocks the new pages go to, set up downlink
         * tuples to point to them.
         */
        for (ptr = dist; ptr; ptr = ptr->next)
        {
            ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
            GistTupleSetValid(ptr->itup);
        }

        /*
         * If this is a root split, we construct the new root page with the
         * downlinks here directly, instead of requiring the caller to insert
         * them. Add the new root page to the list along with the child pages.
         */
        if (is_rootsplit)
        {
            IndexTuple *downlinks;
            int         ndownlinks = 0;
            int         i;

            rootpg.buffer = buffer;
            rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
            GistPageGetOpaque(rootpg.page)->flags = 0;

            /* Prepare a vector of all the downlinks */
            for (ptr = dist; ptr; ptr = ptr->next)
                ndownlinks++;
            downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
            for (i = 0, ptr = dist; ptr; ptr = ptr->next)
                downlinks[i++] = ptr->itup;

            rootpg.block.blkno = GIST_ROOT_BLKNO;
            rootpg.block.num = ndownlinks;
            rootpg.list = gistfillitupvec(downlinks, ndownlinks,
                                          &(rootpg.lenlist));
            rootpg.itup = NULL;

            rootpg.next = dist;
            dist = &rootpg;
        }
        else
        {
            /* Prepare split-info to be returned to caller */
            for (ptr = dist; ptr; ptr = ptr->next)
            {
                GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));

                si->buf = ptr->buffer;
                si->downlink = ptr->itup;
                *splitinfo = lappend(*splitinfo, si);
            }
        }

        /*
         * Fill all pages. All the pages are new, ie. freshly allocated empty
         * pages, or a temporary copy of the old page.
         */
        for (ptr = dist; ptr; ptr = ptr->next)
        {
            char       *data = (char *) (ptr->list);

            for (i = 0; i < ptr->block.num; i++)
            {
                IndexTuple  thistup = (IndexTuple) data;

                if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup),
                                i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
                    elog(ERROR, "failed to add item to index page in \"%s\"",
                         RelationGetRelationName(rel));

                /*
                 * If this is the first inserted/updated tuple, let the caller
                 * know which page it landed on.
                 */
                if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
                    *newblkno = ptr->block.blkno;

                data += IndexTupleSize(thistup);
            }

            /* Set up rightlinks */
            if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
                GistPageGetOpaque(ptr->page)->rightlink =
                    ptr->next->block.blkno;
            else
                GistPageGetOpaque(ptr->page)->rightlink = oldrlink;

            /*
             * Mark all but the right-most page with the follow-right
             * flag. It will be cleared as soon as the downlink is inserted
             * into the parent, but this ensures that if we error out before
             * that, the index is still consistent. (In buffering build mode,
             * any error will abort the index build anyway, so this is not
             * needed.)
             */
            if (ptr->next && !is_rootsplit && markfollowright)
                GistMarkFollowRight(ptr->page);
            else
                GistClearFollowRight(ptr->page);

            /*
             * Copy the NSN of the original page to all pages. The
             * F_FOLLOW_RIGHT flags ensure that scans will follow the
             * rightlinks until the downlinks are inserted.
             */
            GistPageSetNSN(ptr->page, oldnsn);
        }

        /*
         * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
         * insertion for that. NB: The number of pages and data segments
         * specified here must match the calculations in gistXLogSplit()!
         */
        if (!is_build && RelationNeedsWAL(rel))
            XLogEnsureRecordSpace(npage, 1 + npage * 2);

        START_CRIT_SECTION();

        /*
         * Must mark buffers dirty before XLogInsert, even though we'll still
         * be changing their opaque fields below.
         */
        for (ptr = dist; ptr; ptr = ptr->next)
            MarkBufferDirty(ptr->buffer);
        if (BufferIsValid(leftchildbuf))
            MarkBufferDirty(leftchildbuf);

        /*
         * The first page in the chain was a temporary working copy meant to
         * replace the old page. Copy it over the old page.
         */
        PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
        dist->page = BufferGetPage(dist->buffer);

        /*
         * Write the WAL record.
         *
         * If we're building a new index, however, we don't WAL-log changes
         * yet. The LSN-NSN interlock between parent and child requires that
         * LSNs never move backwards, so set the LSNs to a value that's
         * smaller than any real or fake unlogged LSN that might be generated
         * later. (There can't be any concurrent scans during index build, so
         * we don't need to be able to detect concurrent splits yet.)
         */
        if (is_build)
            recptr = GistBuildLSN;
        else
        {
            if (RelationNeedsWAL(rel))
                recptr = gistXLogSplit(is_leaf,
                                       dist, oldrlink, oldnsn, leftchildbuf,
                                       markfollowright);
            else
                recptr = gistGetFakeLSN(rel);
        }

        for (ptr = dist; ptr; ptr = ptr->next)
            PageSetLSN(ptr->page, recptr);

        /*
         * Return the new child buffers to the caller.
         *
         * If this was a root split, we've already inserted the downlink
         * pointers, in the form of a new root page. Therefore we can release
         * all the new buffers, and keep just the root page locked.
         */
        if (is_rootsplit)
        {
            for (ptr = dist->next; ptr; ptr = ptr->next)
                UnlockReleaseBuffer(ptr->buffer);
        }
    }
    else
    {
        /*
         * Enough space. We always get here if ntup==0.
         */
        START_CRIT_SECTION();

        /*
         * Delete old tuple if any, then insert new tuple(s) if any. If
         * possible, use the fast path of PageIndexTupleOverwrite.
         */
        if (OffsetNumberIsValid(oldoffnum))
        {
            if (ntup == 1)
            {
                /* One-for-one replacement, so use PageIndexTupleOverwrite */
                if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
                                             IndexTupleSize(*itup)))
                    elog(ERROR, "failed to add item to index page in \"%s\"",
                         RelationGetRelationName(rel));
            }
            else
            {
                /* Delete old, then append new tuple(s) to page */
                PageIndexTupleDelete(page, oldoffnum);
                gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
            }
        }
        else
        {
            /* Just append new tuples at the end of the page */
            gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
        }

        MarkBufferDirty(buffer);

        if (BufferIsValid(leftchildbuf))
            MarkBufferDirty(leftchildbuf);

        if (is_build)
            recptr = GistBuildLSN;
        else
        {
            if (RelationNeedsWAL(rel))
            {
                OffsetNumber ndeloffs = 0,
                            deloffs[1];

                if (OffsetNumberIsValid(oldoffnum))
                {
                    deloffs[0] = oldoffnum;
                    ndeloffs = 1;
                }

                recptr = gistXLogUpdate(buffer,
                                        deloffs, ndeloffs, itup, ntup,
                                        leftchildbuf);
            }
            else
                recptr = gistGetFakeLSN(rel);
        }
        PageSetLSN(page, recptr);

        if (newblkno)
            *newblkno = blkno;
    }

    /*
     * If we inserted the downlink for a child page, set NSN and clear
     * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
     * follow the rightlink if and only if they looked at the parent page
     * before we inserted the downlink.
     *
     * Note that we do this *after* writing the WAL record. That means that
     * the possible full page image in the WAL record does not include these
     * changes, and they must be replayed even if the page is restored from
     * the full page image. There's a chicken-and-egg problem: if we updated
     * the child pages first, we wouldn't know the recptr of the WAL record
     * we're about to write.
     */
    if (BufferIsValid(leftchildbuf))
    {
        Page        leftpg = BufferGetPage(leftchildbuf);

        GistPageSetNSN(leftpg, recptr);
        GistClearFollowRight(leftpg);

        PageSetLSN(leftpg, recptr);
    }

    END_CRIT_SECTION();

    return is_split;
}
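
/*
 * A minimal sketch of the caller-side contract for *splitinfo described
 * above (hypothetical caller; gistinserttuples() below is the real one):
 * after a non-root split, each list element carries a still-locked child
 * buffer and the downlink that remains to be inserted into the parent.
 *
 *		List	   *splitinfo;
 *		ListCell   *lc;
 *
 *		if (gistplacetopage(rel, freespace, giststate, buffer, itup, ntup,
 *							oldoffnum, NULL, leftchildbuf, &splitinfo,
 *							true, heapRel, is_build))
 *		{
 *			foreach(lc, splitinfo)
 *			{
 *				GISTPageSplitInfo *si = (GISTPageSplitInfo *) lfirst(lc);
 *
 *				... insert si->downlink into the parent, unlock si->buf ...
 *			}
 *		}
 */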

/*
 * Workhorse routine for doing insertion into a GiST index. Note that
 * this routine assumes it is invoked in a short-lived memory context,
 * so it does not bother releasing palloc'd allocations.
 */
void
gistdoinsert(Relation r, IndexTuple itup, Size freespace,
             GISTSTATE *giststate, Relation heapRel, bool is_build)
{
    ItemId      iid;
    IndexTuple  idxtuple;
    GISTInsertStack firststack;
    GISTInsertStack *stack;
    GISTInsertState state;
    bool        xlocked = false;

    memset(&state, 0, sizeof(GISTInsertState));
    state.freespace = freespace;
    state.r = r;
    state.heapRel = heapRel;
    state.is_build = is_build;

    /* Start from the root */
    firststack.blkno = GIST_ROOT_BLKNO;
    firststack.lsn = 0;
    firststack.retry_from_parent = false;
    firststack.parent = NULL;
    firststack.downlinkoffnum = InvalidOffsetNumber;
    state.stack = stack = &firststack;

    /*
     * Walk down along the path of smallest penalty, updating the parent
     * pointers with the key we're inserting as we go. If we crash in the
     * middle, the tree is consistent, although the possible parent updates
     * were a waste.
     */
    for (;;)
    {
        /*
         * If we split an internal page while descending the tree, we have to
         * retry at the parent. (Normally, the LSN-NSN interlock below would
         * also catch this and cause us to retry. But LSNs are not updated
         * during index build.)
         */
        while (stack->retry_from_parent)
        {
            if (xlocked)
                LockBuffer(stack->buffer, GIST_UNLOCK);
            xlocked = false;
            ReleaseBuffer(stack->buffer);
            state.stack = stack = stack->parent;
        }

        if (XLogRecPtrIsInvalid(stack->lsn))
            stack->buffer = ReadBuffer(state.r, stack->blkno);

        /*
         * Be optimistic and grab shared lock first. Swap it for an exclusive
         * lock later if we need to update the page.
         */
        if (!xlocked)
        {
            LockBuffer(stack->buffer, GIST_SHARE);
            gistcheckpage(state.r, stack->buffer);
        }

        stack->page = (Page) BufferGetPage(stack->buffer);
        stack->lsn = xlocked ?
            PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
        Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));

        /*
         * If this page was split but the downlink was never inserted to the
         * parent because the inserting backend crashed before doing that, fix
         * that now.
         */
        if (GistFollowRight(stack->page))
        {
            if (!xlocked)
            {
                LockBuffer(stack->buffer, GIST_UNLOCK);
                LockBuffer(stack->buffer, GIST_EXCLUSIVE);
                xlocked = true;
                /* someone might've completed the split when we unlocked */
                if (!GistFollowRight(stack->page))
                    continue;
            }
            gistfixsplit(&state, giststate);

            UnlockReleaseBuffer(stack->buffer);
            xlocked = false;
            state.stack = stack = stack->parent;
            continue;
        }

        if ((stack->blkno != GIST_ROOT_BLKNO &&
             stack->parent->lsn < GistPageGetNSN(stack->page)) ||
            GistPageIsDeleted(stack->page))
        {
            /*
             * Concurrent split or page deletion detected. There's no
             * guarantee that the downlink for this page is consistent with
             * the tuple we're inserting anymore, so go back to parent and
             * rechoose the best child.
             */
            UnlockReleaseBuffer(stack->buffer);
            xlocked = false;
            state.stack = stack = stack->parent;
            continue;
        }

        if (!GistPageIsLeaf(stack->page))
        {
            /*
             * This is an internal page so continue to walk down the tree.
             * Find the child node that has the minimum insertion penalty.
             */
            BlockNumber childblkno;
            IndexTuple  newtup;
            GISTInsertStack *item;
            OffsetNumber downlinkoffnum;

            downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
            iid = PageGetItemId(stack->page, downlinkoffnum);
            idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
            childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));

            /*
             * Check that it's not a leftover invalid tuple from pre-9.1
             */
            if (GistTupleIsInvalid(idxtuple))
                ereport(ERROR,
                        (errmsg("index \"%s\" contains an inner tuple marked as invalid",
                                RelationGetRelationName(r)),
                         errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
                         errhint("Please REINDEX it.")));

            /*
             * Check that the key representing the target child node is
             * consistent with the key we're inserting. Update it if it's not.
             */
            newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
            if (newtup)
            {
                /*
                 * Swap shared lock for an exclusive one. Beware, the page may
                 * change while we unlock/lock the page...
                 */
                if (!xlocked)
                {
                    LockBuffer(stack->buffer, GIST_UNLOCK);
                    LockBuffer(stack->buffer, GIST_EXCLUSIVE);
                    xlocked = true;
                    stack->page = (Page) BufferGetPage(stack->buffer);

                    if (PageGetLSN(stack->page) != stack->lsn)
                    {
                        /* the page was changed while we unlocked it, retry */
                        continue;
                    }
                }

                /*
                 * Update the tuple.
                 *
                 * We still hold the lock after gistinserttuple(), but it
                 * might have to split the page to make the updated tuple fit.
                 * In that case the updated tuple might migrate to the other
                 * half of the split, so we have to go back to the parent and
                 * descend back to the half that's a better fit for the new
                 * tuple.
                 */
                if (gistinserttuple(&state, stack, giststate, newtup,
                                    downlinkoffnum))
                {
                    /*
                     * If this was a root split, the root page continues to be
                     * the parent and the updated tuple went to one of the
                     * child pages, so we just need to retry from the root
                     * page.
                     */
                    if (stack->blkno != GIST_ROOT_BLKNO)
                    {
                        UnlockReleaseBuffer(stack->buffer);
                        xlocked = false;
                        state.stack = stack = stack->parent;
                    }
                    continue;
                }
            }
            LockBuffer(stack->buffer, GIST_UNLOCK);
            xlocked = false;

            /* descend to the chosen child */
            item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
            item->blkno = childblkno;
            item->parent = stack;
            item->downlinkoffnum = downlinkoffnum;
            state.stack = stack = item;
        }
        else
        {
            /*
             * Leaf page. Insert the new key. We've already updated all the
             * parents on the way down, but we might have to split the page if
             * it doesn't fit. gistinserttuple() will take care of that.
             */

            /*
             * Swap shared lock for an exclusive one. Be careful, the page may
             * change while we unlock/lock the page...
             */
            if (!xlocked)
            {
                LockBuffer(stack->buffer, GIST_UNLOCK);
                LockBuffer(stack->buffer, GIST_EXCLUSIVE);
                xlocked = true;
                stack->page = (Page) BufferGetPage(stack->buffer);
                stack->lsn = PageGetLSN(stack->page);

                if (stack->blkno == GIST_ROOT_BLKNO)
                {
                    /*
                     * the only page that can become inner instead of leaf is
                     * the root page, so for root we should recheck it
                     */
                    if (!GistPageIsLeaf(stack->page))
                    {
                        /*
                         * very rare situation: while we had the page
                         * unlocked, the one-page index grew beyond a single
                         * page
                         */
                        LockBuffer(stack->buffer, GIST_UNLOCK);
                        xlocked = false;
                        continue;
                    }

                    /*
                     * we don't need to check root split, because checking
                     * leaf/inner is enough to recognize split for root
                     */
                }
                else if (GistFollowRight(stack->page) ||
                         stack->parent->lsn < GistPageGetNSN(stack->page) ||
                         GistPageIsDeleted(stack->page))
                {
                    /*
                     * The page was split or deleted while we momentarily
                     * unlocked the page. Go back to parent.
                     */
                    UnlockReleaseBuffer(stack->buffer);
                    xlocked = false;
                    state.stack = stack = stack->parent;
                    continue;
                }
            }

            /* now state.stack->(page, buffer and blkno) points to leaf page */

            gistinserttuple(&state, stack, giststate, itup,
                            InvalidOffsetNumber);
            LockBuffer(stack->buffer, GIST_UNLOCK);

            /* Release any pins we might still hold before exiting */
            for (; stack; stack = stack->parent)
                ReleaseBuffer(stack->buffer);
            break;
        }
    }
}
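
/*
 * A compressed sketch of the LSN-NSN interlock used during the descent in
 * gistdoinsert() (illustration only): a split stamps the child page with an
 * NSN taken from the split's WAL position, so a descending backend can
 * compare the parent-page LSN it saw against the child's NSN:
 *
 *		if (stack->parent->lsn < GistPageGetNSN(stack->page))
 *		{
 *			// the child was split after we read its downlink in the
 *			// parent, so go back to the parent and rechoose the child
 *		}
 */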

/*
 * Traverse the tree to find path from root page to specified "child" block.
 *
 * returns a new insertion stack, starting from the parent of "child", up
 * to the root. *downlinkoffnum is set to the offset of the downlink in the
 * direct parent of child.
 *
 * To prevent deadlocks, this should lock only one page at a time.
 */
static GISTInsertStack *
gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
{
    Page        page;
    Buffer      buffer;
    OffsetNumber i,
                maxoff;
    ItemId      iid;
    IndexTuple  idxtuple;
    List       *fifo;
    GISTInsertStack *top,
               *ptr;
    BlockNumber blkno;

    top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
    top->blkno = GIST_ROOT_BLKNO;
    top->downlinkoffnum = InvalidOffsetNumber;

    fifo = list_make1(top);
    while (fifo != NIL)
    {
        /* Get next page to visit */
        top = linitial(fifo);
        fifo = list_delete_first(fifo);

        buffer = ReadBuffer(r, top->blkno);
        LockBuffer(buffer, GIST_SHARE);
        gistcheckpage(r, buffer);
        page = (Page) BufferGetPage(buffer);

        if (GistPageIsLeaf(page))
        {
            /*
             * Because we scan the index top-down, all the rest of the pages
             * in the queue must be leaf pages as well.
             */
            UnlockReleaseBuffer(buffer);
            break;
        }

        /* currently, internal pages are never deleted */
        Assert(!GistPageIsDeleted(page));

        top->lsn = BufferGetLSNAtomic(buffer);

        /*
         * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
         * downlink. This should not normally happen.
         */
        if (GistFollowRight(page))
            elog(ERROR, "concurrent GiST page split was incomplete");

        if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
            GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
        {
            /*
             * Page was split while we looked elsewhere. We didn't see the
             * downlink to the right page when we scanned the parent, so add
             * it to the queue now.
             *
             * Put the right page ahead of the queue, so that we visit it
             * next. That's important, because if this is the lowest internal
             * level, just above leaves, we might already have queued up some
             * leaf pages, and we assume that there can't be any non-leaf
             * pages behind leaf pages.
             */
            ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
            ptr->blkno = GistPageGetOpaque(page)->rightlink;
            ptr->downlinkoffnum = InvalidOffsetNumber;
            ptr->parent = top->parent;

            fifo = lcons(ptr, fifo);
        }

        maxoff = PageGetMaxOffsetNumber(page);

        for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
        {
            iid = PageGetItemId(page, i);
            idxtuple = (IndexTuple) PageGetItem(page, iid);
            blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
            if (blkno == child)
            {
                /* Found it! */
                UnlockReleaseBuffer(buffer);
                *downlinkoffnum = i;
                return top;
            }
            else
            {
                /* Append this child to the list of pages to visit later */
                ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
                ptr->blkno = blkno;
                ptr->downlinkoffnum = i;
                ptr->parent = top;

                fifo = lappend(fifo, ptr);
            }
        }

        UnlockReleaseBuffer(buffer);
    }

    elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
         RelationGetRelationName(r), child);
    return NULL;                /* keep compiler quiet */
}

/*
 * Updates the stack so that child->parent is the correct parent of the
 * child. child->parent must be exclusively locked on entry, and will
 * remain so at exit, but it might not be the same page anymore.
 */
static void
gistFindCorrectParent(Relation r, GISTInsertStack *child)
{
    GISTInsertStack *parent = child->parent;

    gistcheckpage(r, parent->buffer);
    parent->page = (Page) BufferGetPage(parent->buffer);

    /* here we don't need to distinguish between split and page update */
    if (child->downlinkoffnum == InvalidOffsetNumber ||
        parent->lsn != PageGetLSN(parent->page))
    {
        /* parent has changed, chase the rightlinks until the child is found */
        OffsetNumber i,
                    maxoff;
        ItemId      iid;
        IndexTuple  idxtuple;
        GISTInsertStack *ptr;

        while (true)
        {
            maxoff = PageGetMaxOffsetNumber(parent->page);
            for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
            {
                iid = PageGetItemId(parent->page, i);
                idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
                if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
                {
                    /* found it! */
                    child->downlinkoffnum = i;
                    return;
                }
            }

            parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
            UnlockReleaseBuffer(parent->buffer);
            if (parent->blkno == InvalidBlockNumber)
            {
                /*
                 * End of chain and still didn't find the parent. It's a very
                 * rare situation that happens when the root was split.
                 */
                break;
            }
            parent->buffer = ReadBuffer(r, parent->blkno);
            LockBuffer(parent->buffer, GIST_EXCLUSIVE);
            gistcheckpage(r, parent->buffer);
            parent->page = (Page) BufferGetPage(parent->buffer);
        }

        /*
         * Awful: we need to search the whole tree to find the parent. But
         * before that, release all the old parent buffers.
         */

        ptr = child->parent->parent;    /* child->parent already released
                                         * above */
        while (ptr)
        {
            ReleaseBuffer(ptr->buffer);
            ptr = ptr->parent;
        }

        /* ok, find new path */
        ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);

        /* read all buffers as expected by caller */
        /* note we don't lock them or gistcheckpage them here! */
        while (ptr)
        {
            ptr->buffer = ReadBuffer(r, ptr->blkno);
            ptr->page = (Page) BufferGetPage(ptr->buffer);
            ptr = ptr->parent;
        }

        /* install new chain of parents to stack */
        child->parent = parent;

        /* make recursive call to normal processing */
        LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
        gistFindCorrectParent(r, child);
    }

    return;
}

/*
 * Form a downlink pointer for the page in 'buf'.
 */
static IndexTuple
gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
                 GISTInsertStack *stack)
{
    Page        page = BufferGetPage(buf);
    OffsetNumber maxoff;
    OffsetNumber offset;
    IndexTuple  downlink = NULL;

    maxoff = PageGetMaxOffsetNumber(page);
    for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
    {
        IndexTuple  ituple = (IndexTuple)
        PageGetItem(page, PageGetItemId(page, offset));

        if (downlink == NULL)
            downlink = CopyIndexTuple(ituple);
        else
        {
            IndexTuple  newdownlink;

            newdownlink = gistgetadjusted(rel, downlink, ituple,
                                          giststate);
            if (newdownlink)
                downlink = newdownlink;
        }
    }

    /*
     * If the page is completely empty, we can't form a meaningful downlink
     * for it. But we have to insert a downlink for the page. Any key will do,
     * as long as it's consistent with the downlink of the parent page, so
     * that we can legally insert it to the parent. A minimal one that matches
     * as few scans as possible would be best, to keep scans from doing
     * useless work, but we don't know how to construct that. So we just use
     * the downlink of the original page that was split - that's as far from
     * optimal as it can get but will do.
     */
    if (!downlink)
    {
        ItemId      iid;

        LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
        gistFindCorrectParent(rel, stack);
        iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
        downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
        downlink = CopyIndexTuple(downlink);
        LockBuffer(stack->parent->buffer, GIST_UNLOCK);
    }

    ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
    GistTupleSetValid(downlink);

    return downlink;
}


/*
 * Complete the incomplete split of state->stack->page.
 */
static void
gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
{
    GISTInsertStack *stack = state->stack;
    Buffer      buf;
    Page        page;
    List       *splitinfo = NIL;

    elog(LOG, "fixing incomplete split in index \"%s\", block %u",
         RelationGetRelationName(state->r), stack->blkno);

    Assert(GistFollowRight(stack->page));
    Assert(OffsetNumberIsValid(stack->downlinkoffnum));

    buf = stack->buffer;

    /*
     * Read the chain of split pages, following the rightlinks. Construct a
     * downlink tuple for each page.
     */
    for (;;)
    {
        GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
        IndexTuple  downlink;

        page = BufferGetPage(buf);

        /* Form the new downlink tuples to insert to parent */
        downlink = gistformdownlink(state->r, buf, giststate, stack);

        si->buf = buf;
        si->downlink = downlink;

        splitinfo = lappend(splitinfo, si);

        if (GistFollowRight(page))
        {
            /* lock next page */
            buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
            LockBuffer(buf, GIST_EXCLUSIVE);
        }
        else
            break;
    }

    /* Insert the downlinks */
    gistfinishsplit(state, stack, giststate, splitinfo, false);
}

/*
 * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
 * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
 * 'stack' represents the path from the root to the page being updated.
 *
 * The caller must hold an exclusive lock on stack->buffer. The lock is still
 * held on return, but the page might not contain the inserted tuple if the
 * page was split. The function returns true if the page was split, false
 * otherwise.
 */
static bool
gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
                GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
{
    return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
                            InvalidBuffer, InvalidBuffer, false, false);
}

/* ----------------
 * An extended workhorse version of gistinserttuple(). This version allows
 * inserting multiple tuples, or replacing a single tuple with multiple tuples.
 * This is used to recursively update the downlinks in the parent when a page
 * is split.
 *
 * If leftchild and rightchild are valid, we're inserting/replacing the
 * downlink for rightchild, and leftchild is its left sibling. We clear the
 * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
 * insertion of the downlink.
 *
 * To avoid holding locks for longer than necessary, when recursing up the
 * tree to update the parents, the locking is a bit peculiar here. On entry,
 * the caller must hold an exclusive lock on stack->buffer, as well as
 * leftchild and rightchild if given. On return:
 *
 * - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
 *   always kept pinned, however.
 * - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
 *   is kept pinned.
 * - Lock and pin on 'rightchild' are always released.
 *
 * Returns 'true' if the page had to be split. Note that if the page was
 * split, the inserted/updated tuples might've been inserted to a right
 * sibling of stack->buffer instead of stack->buffer itself.
 */
static bool
gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
                 GISTSTATE *giststate,
                 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
                 Buffer leftchild, Buffer rightchild,
                 bool unlockbuf, bool unlockleftchild)
{
    List       *splitinfo;
    bool        is_split;

    /*
     * Check for any rw conflicts (in serializable isolation level) just
     * before we intend to modify the page
     */
    CheckForSerializableConflictIn(state->r, NULL, stack->buffer);

    /* Insert the tuple(s) to the page, splitting the page if necessary */
    is_split = gistplacetopage(state->r, state->freespace, giststate,
                               stack->buffer,
                               tuples, ntup,
                               oldoffnum, NULL,
                               leftchild,
                               &splitinfo,
                               true,
                               state->heapRel,
                               state->is_build);

    /*
     * Before recursing up in case the page was split, release locks on the
     * child pages. We don't need to keep them locked when updating the
     * parent.
     */
    if (BufferIsValid(rightchild))
        UnlockReleaseBuffer(rightchild);
    if (BufferIsValid(leftchild) && unlockleftchild)
        LockBuffer(leftchild, GIST_UNLOCK);

    /*
     * If we had to split, insert/update the downlinks in the parent. If the
     * caller requested us to release the lock on stack->buffer, tell
     * gistfinishsplit() to do that as soon as it's safe to do so. If we
     * didn't have to split, release it ourselves.
     */
    if (splitinfo)
        gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
    else if (unlockbuf)
        LockBuffer(stack->buffer, GIST_UNLOCK);

    return is_split;
}

/*
 * Finish an incomplete split by inserting/updating the downlinks in parent
 * page. 'splitinfo' contains all the child pages involved in the split,
 * from left-to-right.
 *
 * On entry, the caller must hold a lock on stack->buffer and all the child
 * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
 * released on return. The child pages are always unlocked and unpinned.
 */
static void
gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
                GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
{
    ListCell   *lc;
    List       *reversed;
    GISTPageSplitInfo *right;
    GISTPageSplitInfo *left;
    IndexTuple  tuples[2];

    /* A split always contains at least two halves */
    Assert(list_length(splitinfo) >= 2);

    /*
     * We need to insert downlinks for each new page, and update the downlink
     * for the original (leftmost) page in the split. Begin at the rightmost
     * page, inserting one downlink at a time until there are only two pages
     * left. Finally insert the downlink for the last new page and update the
     * downlink for the original page as one operation.
     */
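
    /*
     * For example (sketch only): if the page split into four halves P1..P4,
     * with P1 being the original page, we insert the downlink for P4 first,
     * then for P3, and finally insert P2's downlink while simultaneously
     * replacing the original downlink so that it describes only P1.
     */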

    /* for convenience, create a copy of the list in reverse order */
    reversed = NIL;
    foreach(lc, splitinfo)
    {
        reversed = lcons(lfirst(lc), reversed);
    }

    LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
    gistFindCorrectParent(state->r, stack);

    /*
     * insert downlinks for the siblings from right to left, until there are
     * only two siblings left.
     */
    while (list_length(reversed) > 2)
    {
        right = (GISTPageSplitInfo *) linitial(reversed);
        left = (GISTPageSplitInfo *) lsecond(reversed);

        if (gistinserttuples(state, stack->parent, giststate,
                             &right->downlink, 1,
                             InvalidOffsetNumber,
                             left->buf, right->buf, false, false))
        {
            /*
             * If the parent page was split, need to relocate the original
             * parent pointer.
             */
            gistFindCorrectParent(state->r, stack);
        }
        /* gistinserttuples() released the lock on right->buf. */
        reversed = list_delete_first(reversed);
    }

    right = (GISTPageSplitInfo *) linitial(reversed);
    left = (GISTPageSplitInfo *) lsecond(reversed);

    /*
     * Finally insert downlink for the remaining right page and update the
     * downlink for the original page to not contain the tuples that were
     * moved to the new pages.
     */
    tuples[0] = left->downlink;
    tuples[1] = right->downlink;
    gistinserttuples(state, stack->parent, giststate,
                     tuples, 2,
                     stack->downlinkoffnum,
                     left->buf, right->buf,
                     true,      /* Unlock parent */
                     unlockbuf  /* Unlock stack->buffer if caller wants that */
        );
    Assert(left->buf == stack->buffer);

    /*
     * If we split the page because we had to adjust the downlink on an
     * internal page, while descending the tree for inserting a new tuple,
     * then this might no longer be the correct page for the new tuple. The
     * downlink to this page might not cover the new tuple anymore; it might
     * need to go to the newly-created right sibling instead. Tell the caller
     * to walk back up the stack, to re-check at the parent which page to
     * insert to.
     *
     * Normally, the LSN-NSN interlock during the tree descent would also
     * detect that a concurrent split happened (by ourselves), and cause us to
     * retry at the parent. But that mechanism doesn't work during index
     * build, because we don't do WAL-logging, and don't update LSNs, during
     * index build.
     */
    stack->retry_from_parent = true;
}

/*
 * gistSplit -- split a page in the tree and fill the struct used for XLOG
 * and for writing the real buffers. The function is recursive, i.e. it
 * will split pages until the keys fit on every page.
 */
SplitedPageLayout *
gistSplit(Relation r,
          Page page,
          IndexTuple *itup,     /* contains compressed entry */
          int len,
          GISTSTATE *giststate)
{
    IndexTuple *lvectup,
               *rvectup;
    GistSplitVector v;
    int         i;
    SplitedPageLayout *res = NULL;

    /* this should never recurse very deeply, but better safe than sorry */
    check_stack_depth();

    /* there's no point in splitting an empty page */
    Assert(len > 0);

    /*
     * If a single tuple doesn't fit on a page, no amount of splitting will
     * help.
     */
    if (len == 1)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
                        IndexTupleSize(itup[0]), GiSTPageSize,
                        RelationGetRelationName(r))));

    memset(v.spl_lisnull, true,
           sizeof(bool) * giststate->nonLeafTupdesc->natts);
    memset(v.spl_risnull, true,
           sizeof(bool) * giststate->nonLeafTupdesc->natts);
    gistSplitByKey(r, page, itup, len, giststate, &v, 0);

    /* form left and right vector */
    lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
    rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));

    for (i = 0; i < v.splitVector.spl_nleft; i++)
        lvectup[i] = itup[v.splitVector.spl_left[i] - 1];

    for (i = 0; i < v.splitVector.spl_nright; i++)
        rvectup[i] = itup[v.splitVector.spl_right[i] - 1];

    /* finalize splitting (may need another split) */
    if (!gistfitpage(rvectup, v.splitVector.spl_nright))
    {
        res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
    }
    else
    {
        ROTATEDIST(res);
        res->block.num = v.splitVector.spl_nright;
        res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
        res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
    }

    if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
    {
        SplitedPageLayout *resptr,
                   *subres;

        resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);

        /* install on list's tail */
        while (resptr->next)
            resptr = resptr->next;

        resptr->next = res;
        res = subres;
    }
    else
    {
        ROTATEDIST(res);
        res->block.num = v.splitVector.spl_nleft;
        res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
        res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
    }

    return res;
}
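
/*
 * A small sketch of how a caller consumes the chain that gistSplit()
 * returns (illustration only): each SplitedPageLayout element describes one
 * final page, with ptr->list holding that page's tuples and ptr->itup the
 * downlink that must eventually point to it:
 *
 *		SplitedPageLayout *dist = gistSplit(rel, page, itvec, tlen, giststate);
 *		SplitedPageLayout *ptr;
 *		int			npage = 0;
 *
 *		for (ptr = dist; ptr; ptr = ptr->next)
 *			npage++;			// one chain element per final page
 */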

/*
 * Create a GISTSTATE and fill it with information about the index
 */
GISTSTATE *
initGISTstate(Relation index)
{
    GISTSTATE  *giststate;
    MemoryContext scanCxt;
    MemoryContext oldCxt;
    int         i;

    /* safety check to protect fixed-size arrays in GISTSTATE */
    if (index->rd_att->natts > INDEX_MAX_KEYS)
        elog(ERROR, "numberOfAttributes %d > %d",
             index->rd_att->natts, INDEX_MAX_KEYS);

    /* Create the memory context that will hold the GISTSTATE */
    scanCxt = AllocSetContextCreate(CurrentMemoryContext,
                                    "GiST scan context",
                                    ALLOCSET_DEFAULT_SIZES);
    oldCxt = MemoryContextSwitchTo(scanCxt);

    /* Create and fill in the GISTSTATE */
    giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));

    giststate->scanCxt = scanCxt;
    giststate->tempCxt = scanCxt;   /* caller must change this if needed */
    giststate->leafTupdesc = index->rd_att;

    /*
     * The truncated tupdesc for non-leaf index tuples, which doesn't contain
     * the INCLUDE attributes.
     *
     * It is used to form tuples during tuple adjustment and page split.
     * B-tree creates shortened tuple descriptor for every truncated tuple,
     * because it is doing this less often: it does not have to form truncated
     * tuples during page split. Also, B-tree is not adjusting tuples on
     * internal pages the way GiST does.
     */
    giststate->nonLeafTupdesc = CreateTupleDescCopyConstr(index->rd_att);
    giststate->nonLeafTupdesc->natts =
        IndexRelationGetNumberOfKeyAttributes(index);

    for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(index); i++)
    {
        fmgr_info_copy(&(giststate->consistentFn[i]),
                       index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
                       scanCxt);
        fmgr_info_copy(&(giststate->unionFn[i]),
                       index_getprocinfo(index, i + 1, GIST_UNION_PROC),
                       scanCxt);

        /* opclasses are not required to provide a Compress method */
        if (OidIsValid(index_getprocid(index, i + 1, GIST_COMPRESS_PROC)))
            fmgr_info_copy(&(giststate->compressFn[i]),
                           index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
                           scanCxt);
        else
            giststate->compressFn[i].fn_oid = InvalidOid;

        /* opclasses are not required to provide a Decompress method */
        if (OidIsValid(index_getprocid(index, i + 1, GIST_DECOMPRESS_PROC)))
            fmgr_info_copy(&(giststate->decompressFn[i]),
                           index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
                           scanCxt);
        else
            giststate->decompressFn[i].fn_oid = InvalidOid;

        fmgr_info_copy(&(giststate->penaltyFn[i]),
                       index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
                       scanCxt);
        fmgr_info_copy(&(giststate->picksplitFn[i]),
                       index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
                       scanCxt);
        fmgr_info_copy(&(giststate->equalFn[i]),
                       index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
                       scanCxt);

        /* opclasses are not required to provide a Distance method */
        if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
            fmgr_info_copy(&(giststate->distanceFn[i]),
                           index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
                           scanCxt);
        else
            giststate->distanceFn[i].fn_oid = InvalidOid;

        /* opclasses are not required to provide a Fetch method */
        if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
            fmgr_info_copy(&(giststate->fetchFn[i]),
                           index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
                           scanCxt);
        else
            giststate->fetchFn[i].fn_oid = InvalidOid;

        /*
         * If the index column has a specified collation, we should honor that
         * while doing comparisons. However, we may have a collatable storage
         * type for a noncollatable indexed data type. If there's no index
         * collation then specify default collation in case the support
         * functions need collation. This is harmless if the support
         * functions don't care about collation, so we just do it
         * unconditionally. (We could alternatively call get_typcollation,
         * but that seems like expensive overkill --- there aren't going to be
         * any cases where a GiST storage type has a nondefault collation.)
         */
        if (OidIsValid(index->rd_indcollation[i]))
            giststate->supportCollation[i] = index->rd_indcollation[i];
        else
            giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
    }

    /* No opclass information for INCLUDE attributes */
    for (; i < index->rd_att->natts; i++)
    {
        giststate->consistentFn[i].fn_oid = InvalidOid;
        giststate->unionFn[i].fn_oid = InvalidOid;
        giststate->compressFn[i].fn_oid = InvalidOid;
        giststate->decompressFn[i].fn_oid = InvalidOid;
        giststate->penaltyFn[i].fn_oid = InvalidOid;
        giststate->picksplitFn[i].fn_oid = InvalidOid;
        giststate->equalFn[i].fn_oid = InvalidOid;
        giststate->distanceFn[i].fn_oid = InvalidOid;
        giststate->fetchFn[i].fn_oid = InvalidOid;
        giststate->supportCollation[i] = InvalidOid;
    }

    MemoryContextSwitchTo(oldCxt);

    return giststate;
}
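
/*
 * A brief sketch of how the InvalidOid convention above is consumed by
 * callers (hypothetical example): an optional support-function slot left as
 * InvalidOid means the opclass did not provide that method, so callers must
 * test before invoking it:
 *
 *		if (OidIsValid(giststate->fetchFn[attno].fn_oid))
 *			... the opclass can reconstruct original data for this column ...
 */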

void
freeGISTstate(GISTSTATE *giststate)
{
    /* It's sufficient to delete the scanCxt */
    MemoryContextDelete(giststate->scanCxt);
}

/*
 * gistprunepage() -- try to remove LP_DEAD items from the given page.
 * Function assumes that buffer is exclusively locked.
 */
static void
gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel)
{
    OffsetNumber deletable[MaxIndexTuplesPerPage];
    int         ndeletable = 0;
    OffsetNumber offnum,
                maxoff;
    TransactionId latestRemovedXid = InvalidTransactionId;

    Assert(GistPageIsLeaf(page));

    /*
     * Scan over all items to see which ones need to be deleted according to
     * LP_DEAD flags.
     */
    maxoff = PageGetMaxOffsetNumber(page);
    for (offnum = FirstOffsetNumber;
         offnum <= maxoff;
         offnum = OffsetNumberNext(offnum))
    {
        ItemId      itemId = PageGetItemId(page, offnum);

        if (ItemIdIsDead(itemId))
            deletable[ndeletable++] = offnum;
    }

    if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
        latestRemovedXid =
            index_compute_xid_horizon_for_tuples(rel, heapRel, buffer,
                                                 deletable, ndeletable);

    if (ndeletable > 0)
    {
        START_CRIT_SECTION();

        PageIndexMultiDelete(page, deletable, ndeletable);

        /*
         * Mark the page as not containing any LP_DEAD items. This is not
         * necessarily true (there might be some that have recently been
         * marked, but weren't included in our target-item list), but it will
         * almost always be true and it doesn't seem worth an additional page
         * scan to check it. Remember that F_HAS_GARBAGE is only a hint
         * anyway.
         */
        GistClearPageHasGarbage(page);

        MarkBufferDirty(buffer);

        /* XLOG stuff */
        if (RelationNeedsWAL(rel))
        {
            XLogRecPtr  recptr;

            recptr = gistXLogDelete(buffer,
                                    deletable, ndeletable,
                                    latestRemovedXid);

            PageSetLSN(page, recptr);
        }
        else
            PageSetLSN(page, gistGetFakeLSN(rel));

        END_CRIT_SECTION();
    }

    /*
     * Note: if we didn't find any LP_DEAD items, then the page's
     * F_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
     * separate write to clear it, however. We will clear it when we split
     * the page.
     */
}