/*-------------------------------------------------------------------------
 *
 * spgvacuum.c
 *	  vacuum for SP-GiST
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/spgist/spgvacuum.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "catalog/storage_xlog.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/snapmgr.h"


/* Entry in pending-list of TIDs we need to revisit */
typedef struct spgVacPendingItem
{
	ItemPointerData tid;		/* redirection target to visit */
	bool		done;			/* have we dealt with this? */
	struct spgVacPendingItem *next; /* list link */
} spgVacPendingItem;

/* Local state for vacuum operations */
typedef struct spgBulkDeleteState
{
	/* Parameters passed in to spgvacuumscan */
	IndexVacuumInfo *info;
	IndexBulkDeleteResult *stats;
	IndexBulkDeleteCallback callback;
	void	   *callback_state;

	/* Additional working state */
	SpGistState spgstate;		/* for SPGiST operations that need one */
	spgVacPendingItem *pendingList; /* TIDs we need to (re)visit */
	TransactionId myXmin;		/* for detecting newly-added redirects */
	BlockNumber lastFilledBlock;	/* last non-deletable block */
} spgBulkDeleteState;


/*
 * Add TID to pendingList, but only if not already present.
 *
 * Note that new items are always appended at the end of the list; this
 * ensures that scans of the list don't miss items added during the scan.
 */
static void
spgAddPendingTID(spgBulkDeleteState *bds, ItemPointer tid)
{
	spgVacPendingItem *pitem;
	spgVacPendingItem **listLink;

	/* search the list for pre-existing entry */
	listLink = &bds->pendingList;
	while (*listLink != NULL)
	{
		pitem = *listLink;
		if (ItemPointerEquals(tid, &pitem->tid))
			return;				/* already in list, do nothing */
		listLink = &pitem->next;
	}
	/* not there, so append new entry */
	pitem = (spgVacPendingItem *) palloc(sizeof(spgVacPendingItem));
	pitem->tid = *tid;
	pitem->done = false;
	pitem->next = NULL;
	*listLink = pitem;
}

/*
 * Clear pendingList
 */
static void
spgClearPendingList(spgBulkDeleteState *bds)
{
	spgVacPendingItem *pitem;
	spgVacPendingItem *nitem;

	for (pitem = bds->pendingList; pitem != NULL; pitem = nitem)
	{
		nitem = pitem->next;
		/* All items in list should have been dealt with */
		Assert(pitem->done);
		pfree(pitem);
	}
	bds->pendingList = NULL;
}

/*
 * Vacuum a regular (non-root) leaf page
 *
 * We must delete tuples that are targeted for deletion by the VACUUM,
 * but not move any tuples that are referenced by outside links; we assume
 * those are the ones that are heads of chains.
 *
 * If we find a REDIRECT that was made by a concurrently-running transaction,
 * we must add its target TID to pendingList.  (We don't try to visit the
 * target immediately, first because we don't want VACUUM locking more than
 * one buffer at a time, and second because the duplicate-filtering logic
 * in spgAddPendingTID is useful to ensure we can't get caught in an infinite
 * loop in the face of continuous concurrent insertions.)
 *
 * If forPending is true, we are examining the page as a consequence of
 * chasing a redirect link, not as part of the normal sequential scan.
 * We still vacuum the page normally, but we don't increment the stats
 * about live tuples; else we'd double-count those tuples, since the page
 * has been or will be visited in the sequential scan as well.
 */
static void
vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
			   bool forPending)
{
	Page		page = BufferGetPage(buffer);
	spgxlogVacuumLeaf xlrec;
	OffsetNumber toDead[MaxIndexTuplesPerPage];
	OffsetNumber toPlaceholder[MaxIndexTuplesPerPage];
	OffsetNumber moveSrc[MaxIndexTuplesPerPage];
	OffsetNumber moveDest[MaxIndexTuplesPerPage];
	OffsetNumber chainSrc[MaxIndexTuplesPerPage];
	OffsetNumber chainDest[MaxIndexTuplesPerPage];
	OffsetNumber predecessor[MaxIndexTuplesPerPage + 1];
	bool		deletable[MaxIndexTuplesPerPage + 1];
	int			nDeletable;
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page);

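	/* predecessor[] and deletable[] are indexed by 1-based offset number */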
	memset(predecessor, 0, sizeof(predecessor));
	memset(deletable, 0, sizeof(deletable));
	nDeletable = 0;

	/* Scan page, identify tuples to delete, accumulate stats */
	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, i));
		if (lt->tupstate == SPGIST_LIVE)
		{
			Assert(ItemPointerIsValid(&lt->heapPtr));

			if (bds->callback(&lt->heapPtr, bds->callback_state))
			{
				bds->stats->tuples_removed += 1;
				deletable[i] = true;
				nDeletable++;
			}
			else
			{
				if (!forPending)
					bds->stats->num_index_tuples += 1;
			}

			/* Form predecessor map, too */
			if (lt->nextOffset != InvalidOffsetNumber)
			{
				/* paranoia about corrupted chain links */
				if (lt->nextOffset < FirstOffsetNumber ||
					lt->nextOffset > max ||
					predecessor[lt->nextOffset] != InvalidOffsetNumber)
					elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
						 BufferGetBlockNumber(buffer),
						 RelationGetRelationName(index));
				predecessor[lt->nextOffset] = i;
			}
		}
		else if (lt->tupstate == SPGIST_REDIRECT)
		{
			SpGistDeadTuple dt = (SpGistDeadTuple) lt;

			Assert(dt->nextOffset == InvalidOffsetNumber);
			Assert(ItemPointerIsValid(&dt->pointer));

			/*
			 * Add target TID to pending list if the redirection could have
			 * happened since VACUUM started.
			 *
			 * Note: we could make a tighter test by seeing if the xid is
			 * "running" according to the active snapshot; but snapmgr.c
			 * doesn't currently export a suitable API, and it's not entirely
			 * clear that a tighter test is worth the cycles anyway.
			 */
			if (TransactionIdFollowsOrEquals(dt->xid, bds->myXmin))
				spgAddPendingTID(bds, &dt->pointer);
		}
		else
		{
			Assert(lt->nextOffset == InvalidOffsetNumber);
		}
	}

	if (nDeletable == 0)
		return;					/* nothing more to do */

	/*----------
	 * Figure out exactly what we have to do.  We do this separately from
	 * actually modifying the page, mainly so that we have a representation
	 * that can be dumped into WAL and then the replay code can do exactly
	 * the same thing.  The output of this step consists of six arrays
	 * describing four kinds of operations, to be performed in this order:
	 *
	 * toDead[]: tuple numbers to be replaced with DEAD tuples
	 * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples
	 * moveSrc[]: tuple numbers that need to be relocated to another offset
	 * (replacing the tuple there) and then replaced with PLACEHOLDER tuples
	 * moveDest[]: new locations for moveSrc tuples
	 * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates
	 * chainDest[]: new values of nextOffset for chainSrc members
	 *
	 * It's easiest to figure out what we have to do by processing tuple
	 * chains, so we iterate over all the tuples (not just the deletable
	 * ones!) to identify chain heads, then chase down each chain and make
	 * work item entries for deletable tuples within the chain.
	 *----------
	 */
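	/*----------
	 * For illustration (a hypothetical example, not part of the original
	 * code): take the chain 2 -> 5 -> 3 -> 8, with offsets 2 and 3
	 * deletable.  The loop below then produces
	 *
	 *	moveSrc[0] = 5, moveDest[0] = 2		(first live tuple moves to head)
	 *	toPlaceholder[0] = 3				(interior deletable tuple)
	 *	chainSrc[0] = 2, chainDest[0] = 8	(re-link across the gap)
	 *
	 * so that afterwards offset 2 holds the formerly-second tuple linking
	 * straight to 8, while offsets 3 and 5 become placeholders.
	 *----------
	 */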
	xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0;

	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple head;
		bool		interveningDeletable;
		OffsetNumber prevLive;
		OffsetNumber j;

		head = (SpGistLeafTuple) PageGetItem(page,
											 PageGetItemId(page, i));
		if (head->tupstate != SPGIST_LIVE)
			continue;			/* can't be a chain member */
		if (predecessor[i] != 0)
			continue;			/* not a chain head */

		/* initialize ... */
		interveningDeletable = false;
		prevLive = deletable[i] ? InvalidOffsetNumber : i;

		/* scan down the chain ... */
		j = head->nextOffset;
		while (j != InvalidOffsetNumber)
		{
			SpGistLeafTuple lt;

			lt = (SpGistLeafTuple) PageGetItem(page,
											   PageGetItemId(page, j));
			if (lt->tupstate != SPGIST_LIVE)
			{
				/* all tuples in chain should be live */
				elog(ERROR, "unexpected SPGiST tuple state: %d",
					 lt->tupstate);
			}

			if (deletable[j])
			{
				/* This tuple should be replaced by a placeholder */
				toPlaceholder[xlrec.nPlaceholder] = j;
				xlrec.nPlaceholder++;
				/* previous live tuple's chain link will need an update */
				interveningDeletable = true;
			}
			else if (prevLive == InvalidOffsetNumber)
			{
				/*
				 * This is the first live tuple in the chain.  It has to move
				 * to the head position.
				 */
				moveSrc[xlrec.nMove] = j;
				moveDest[xlrec.nMove] = i;
				xlrec.nMove++;
				/* Chain updates will be applied after the move */
				prevLive = i;
				interveningDeletable = false;
			}
			else
			{
				/*
				 * Second or later live tuple.  Arrange to re-chain it to the
				 * previous live one, if there was a gap.
				 */
				if (interveningDeletable)
				{
					chainSrc[xlrec.nChain] = prevLive;
					chainDest[xlrec.nChain] = j;
					xlrec.nChain++;
				}
				prevLive = j;
				interveningDeletable = false;
			}

			j = lt->nextOffset;
		}

		if (prevLive == InvalidOffsetNumber)
		{
			/* The chain is entirely removable, so we need a DEAD tuple */
			toDead[xlrec.nDead] = i;
			xlrec.nDead++;
		}
		else if (interveningDeletable)
		{
			/* One or more deletions at end of chain, so close it off */
			chainSrc[xlrec.nChain] = prevLive;
			chainDest[xlrec.nChain] = InvalidOffsetNumber;
			xlrec.nChain++;
		}
	}

	/* sanity check ... */
	if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove)
		elog(ERROR, "inconsistent counts of deletable tuples");

	/* Do the updates */
	START_CRIT_SECTION();

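	/*
	 * Entirely-dead chains get a DEAD tuple, not a PLACEHOLDER, at the head
	 * position: a downlink from a parent inner tuple may still point at that
	 * offset, so its TID cannot be recycled yet.
	 */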
	spgPageIndexMultiDelete(&bds->spgstate, page,
							toDead, xlrec.nDead,
							SPGIST_DEAD, SPGIST_DEAD,
							InvalidBlockNumber, InvalidOffsetNumber);

	spgPageIndexMultiDelete(&bds->spgstate, page,
							toPlaceholder, xlrec.nPlaceholder,
							SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
							InvalidBlockNumber, InvalidOffsetNumber);

	/*
	 * We implement the move step by swapping the line pointers of the source
	 * and target tuples, then replacing the newly-source tuples with
	 * placeholders.  This is perhaps unduly friendly with the page data
	 * representation, but it's fast and doesn't risk page overflow when a
	 * tuple to be relocated is large.
	 */
	for (i = 0; i < xlrec.nMove; i++)
	{
		ItemId		idSrc = PageGetItemId(page, moveSrc[i]);
		ItemId		idDest = PageGetItemId(page, moveDest[i]);
		ItemIdData	tmp;

		tmp = *idSrc;
		*idSrc = *idDest;
		*idDest = tmp;
	}

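	/*
	 * The swap leaves each old (deletable) chain head at the corresponding
	 * moveSrc offset; now replace those with placeholders.
	 */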
	spgPageIndexMultiDelete(&bds->spgstate, page,
							moveSrc, xlrec.nMove,
							SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
							InvalidBlockNumber, InvalidOffsetNumber);

	for (i = 0; i < xlrec.nChain; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, chainSrc[i]));
		Assert(lt->tupstate == SPGIST_LIVE);
		lt->nextOffset = chainDest[i];
	}

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();

		STORE_STATE(&bds->spgstate, xlrec.stateSrc);

		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumLeaf);
		/* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
		XLogRegisterData((char *) toDead, sizeof(OffsetNumber) * xlrec.nDead);
		XLogRegisterData((char *) toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder);
		XLogRegisterData((char *) moveSrc, sizeof(OffsetNumber) * xlrec.nMove);
		XLogRegisterData((char *) moveDest, sizeof(OffsetNumber) * xlrec.nMove);
		XLogRegisterData((char *) chainSrc, sizeof(OffsetNumber) * xlrec.nChain);
		XLogRegisterData((char *) chainDest, sizeof(OffsetNumber) * xlrec.nChain);

		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();
}

/*
 * Vacuum a root page when it is also a leaf
 *
 * On the root, we just delete any dead leaf tuples; no fancy business
 */
static void
vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
{
	Page		page = BufferGetPage(buffer);
	spgxlogVacuumRoot xlrec;
	OffsetNumber toDelete[MaxIndexTuplesPerPage];
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page);

	xlrec.nDelete = 0;

	/* Scan page, identify tuples to delete, accumulate stats */
	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, i));
		if (lt->tupstate == SPGIST_LIVE)
		{
			Assert(ItemPointerIsValid(&lt->heapPtr));

			if (bds->callback(&lt->heapPtr, bds->callback_state))
			{
				bds->stats->tuples_removed += 1;
				toDelete[xlrec.nDelete] = i;
				xlrec.nDelete++;
			}
			else
			{
				bds->stats->num_index_tuples += 1;
			}
		}
		else
		{
			/* all tuples on root should be live */
			elog(ERROR, "unexpected SPGiST tuple state: %d",
				 lt->tupstate);
		}
	}

	if (xlrec.nDelete == 0)
		return;					/* nothing more to do */

	/* Do the update */
	START_CRIT_SECTION();

	/* The tuple numbers are in order, so we can use PageIndexMultiDelete */
	PageIndexMultiDelete(page, toDelete, xlrec.nDelete);

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();

		/* Prepare WAL record */
		STORE_STATE(&bds->spgstate, xlrec.stateSrc);

		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRoot);
		/* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
		XLogRegisterData((char *) toDelete,
						 sizeof(OffsetNumber) * xlrec.nDelete);

		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();
}

/*
 * Clean up redirect and placeholder tuples on the given page
 *
 * Redirect tuples can be marked placeholder once they're old enough.
 * Placeholder tuples can be removed if it won't change the offsets of
 * non-placeholder ones.
 *
 * Unlike the routines above, this works on both leaf and inner pages.
 */
static void
vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
{
	Page		page = BufferGetPage(buffer);
	SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page),
				firstPlaceholder = InvalidOffsetNumber;
	bool		hasNonPlaceholder = false;
	bool		hasUpdate = false;
	OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage];
	OffsetNumber itemnos[MaxIndexTuplesPerPage];
	spgxlogVacuumRedirect xlrec;

	xlrec.nToPlaceholder = 0;
	xlrec.newestRedirectXid = InvalidTransactionId;

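	/*
	 * newestRedirectXid is carried in the WAL record so that replay on a hot
	 * standby can resolve recovery conflicts before the redirected-to TIDs
	 * are reclaimed (see spgRedoVacuumRedirect).
	 */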
	START_CRIT_SECTION();

	/*
	 * Scan backwards to convert old redirection tuples to placeholder tuples,
	 * and identify location of last non-placeholder tuple while at it.
	 */
	for (i = max;
		 i >= FirstOffsetNumber &&
		 (opaque->nRedirection > 0 || !hasNonPlaceholder);
		 i--)
	{
		SpGistDeadTuple dt;

		dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i));

		if (dt->tupstate == SPGIST_REDIRECT &&
			TransactionIdPrecedes(dt->xid, RecentGlobalXmin))
		{
			dt->tupstate = SPGIST_PLACEHOLDER;
			Assert(opaque->nRedirection > 0);
			opaque->nRedirection--;
			opaque->nPlaceholder++;

			/* remember newest XID among the removed redirects */
			if (!TransactionIdIsValid(xlrec.newestRedirectXid) ||
				TransactionIdPrecedes(xlrec.newestRedirectXid, dt->xid))
				xlrec.newestRedirectXid = dt->xid;

			ItemPointerSetInvalid(&dt->pointer);

			itemToPlaceholder[xlrec.nToPlaceholder] = i;
			xlrec.nToPlaceholder++;

			hasUpdate = true;
		}

		if (dt->tupstate == SPGIST_PLACEHOLDER)
		{
			if (!hasNonPlaceholder)
				firstPlaceholder = i;
		}
		else
		{
			hasNonPlaceholder = true;
		}
	}

	/*
	 * Any placeholder tuples at the end of page can safely be removed.  We
	 * can't remove ones before the last non-placeholder, though, because we
	 * can't alter the offset numbers of non-placeholder tuples.
	 */
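	/* E.g., if max = 10 and firstPlaceholder = 8, offsets 8..10 are removed. */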
	if (firstPlaceholder != InvalidOffsetNumber)
	{
		/*
		 * We do not include this array in the WAL record, because it is
		 * easy to recreate at replay time.
		 */
		for (i = firstPlaceholder; i <= max; i++)
			itemnos[i - firstPlaceholder] = i;

		i = max - firstPlaceholder + 1;
		Assert(opaque->nPlaceholder >= i);
		opaque->nPlaceholder -= i;

		/* The array is surely sorted, so can use PageIndexMultiDelete */
		PageIndexMultiDelete(page, itemnos, i);

		hasUpdate = true;
	}

	xlrec.firstPlaceholder = firstPlaceholder;

	if (hasUpdate)
		MarkBufferDirty(buffer);

	if (hasUpdate && RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();

		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRedirect);
		XLogRegisterData((char *) itemToPlaceholder,
						 sizeof(OffsetNumber) * xlrec.nToPlaceholder);

		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT);

		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();
}

/*
 * Process one page during a bulkdelete scan
 */
static void
spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
{
	Relation	index = bds->info->index;
	Buffer		buffer;
	Page		page;

	/* call vacuum_delay_point while not holding any buffer lock */
	vacuum_delay_point();

	buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
								RBM_NORMAL, bds->info->strategy);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
	page = (Page) BufferGetPage(buffer);

	if (PageIsNew(page))
	{
		/*
		 * We found an all-zero page, which could happen if the database
		 * crashed just after extending the file.  Recycle it.
		 */
	}
	else if (PageIsEmpty(page))
	{
		/* nothing to do */
	}
	else if (SpGistPageIsLeaf(page))
	{
		if (SpGistBlockIsRoot(blkno))
		{
			vacuumLeafRoot(bds, index, buffer);
			/* no need for vacuumRedirectAndPlaceholder */
		}
		else
		{
			vacuumLeafPage(bds, index, buffer, false);
			vacuumRedirectAndPlaceholder(index, buffer);
		}
	}
	else
	{
		/* inner page */
		vacuumRedirectAndPlaceholder(index, buffer);
	}

	/*
	 * The root pages must never be deleted, nor marked as available in FSM,
	 * because we don't want them ever returned by a search for a place to
	 * put a new tuple.  Otherwise, check for empty page, and make sure the
	 * FSM knows about it.
	 */
	if (!SpGistBlockIsRoot(blkno))
	{
		if (PageIsNew(page) || PageIsEmpty(page))
		{
			RecordFreeIndexPage(index, blkno);
			bds->stats->pages_deleted++;
		}
		else
		{
			SpGistSetLastUsedPage(index, buffer);
			bds->lastFilledBlock = blkno;
		}
	}

	UnlockReleaseBuffer(buffer);
}

/*
 * Process the pending-TID list between pages of the main scan
 */
static void
spgprocesspending(spgBulkDeleteState *bds)
{
	Relation	index = bds->info->index;
	spgVacPendingItem *pitem;
	spgVacPendingItem *nitem;
	BlockNumber blkno;
	Buffer		buffer;
	Page		page;

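	/*
	 * Note: vacuumLeafPage can append more entries while we walk the list,
	 * but since new items always go at the tail, this single pass visits
	 * them all.
	 */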
	for (pitem = bds->pendingList; pitem != NULL; pitem = pitem->next)
	{
		if (pitem->done)
			continue;			/* ignore already-done items */

		/* call vacuum_delay_point while not holding any buffer lock */
		vacuum_delay_point();

		/* examine the referenced page */
		blkno = ItemPointerGetBlockNumber(&pitem->tid);
		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
									RBM_NORMAL, bds->info->strategy);
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		page = (Page) BufferGetPage(buffer);

		if (PageIsNew(page) || SpGistPageIsDeleted(page))
		{
			/* Probably shouldn't happen, but ignore it */
		}
		else if (SpGistPageIsLeaf(page))
		{
			if (SpGistBlockIsRoot(blkno))
			{
				/* this should definitely not happen */
				elog(ERROR, "redirection leads to root page of index \"%s\"",
					 RelationGetRelationName(index));
			}

			/* deal with any deletable tuples */
			vacuumLeafPage(bds, index, buffer, true);
			/* might as well do this while we are here */
			vacuumRedirectAndPlaceholder(index, buffer);

			SpGistSetLastUsedPage(index, buffer);

			/*
			 * We can mark as done not only this item, but any later ones
			 * pointing at the same page, since we vacuumed the whole page.
			 */
			pitem->done = true;
			for (nitem = pitem->next; nitem != NULL; nitem = nitem->next)
			{
				if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
					nitem->done = true;
			}
		}
		else
		{
			/*
			 * On an inner page, visit the referenced inner tuple and add all
			 * its downlinks to the pending list.  We might have pending items
			 * for more than one inner tuple on the same page (in fact this is
			 * pretty likely given the way space allocation works), so get
			 * them all while we are here.
			 */
			for (nitem = pitem; nitem != NULL; nitem = nitem->next)
			{
				if (nitem->done)
					continue;
				if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
				{
					OffsetNumber offset;
					SpGistInnerTuple innerTuple;

					offset = ItemPointerGetOffsetNumber(&nitem->tid);
					innerTuple = (SpGistInnerTuple) PageGetItem(page,
																PageGetItemId(page, offset));
					if (innerTuple->tupstate == SPGIST_LIVE)
					{
						SpGistNodeTuple node;
						int			i;

						SGITITERATE(innerTuple, i, node)
						{
							if (ItemPointerIsValid(&node->t_tid))
								spgAddPendingTID(bds, &node->t_tid);
						}
					}
					else if (innerTuple->tupstate == SPGIST_REDIRECT)
					{
						/* transfer attention to redirect point */
						spgAddPendingTID(bds,
										 &((SpGistDeadTuple) innerTuple)->pointer);
					}
					else
						elog(ERROR, "unexpected SPGiST tuple state: %d",
							 innerTuple->tupstate);

					nitem->done = true;
				}
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	spgClearPendingList(bds);
}

/*
 * Perform a bulkdelete scan
 */
static void
spgvacuumscan(spgBulkDeleteState *bds)
{
	Relation	index = bds->info->index;
	bool		needLock;
	BlockNumber num_pages,
				blkno;

	/* Finish setting up spgBulkDeleteState */
	initSpGistState(&bds->spgstate, index);
	bds->pendingList = NULL;
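	/*
	 * Remember our snapshot's xmin: any redirect whose xid follows-or-equals
	 * it could have been created after this VACUUM started, so its target
	 * must be revisited via the pending list (see vacuumLeafPage).
	 */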
	bds->myXmin = GetActiveSnapshot()->xmin;
	bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO;

	/*
	 * Reset counts that will be incremented during the scan; needed in case
	 * of multiple scans during a single VACUUM command
	 */
	bds->stats->estimated_count = false;
	bds->stats->num_index_tuples = 0;
	bds->stats->pages_deleted = 0;

	/* We can skip locking for new or temp relations */
	needLock = !RELATION_IS_LOCAL(index);

	/*
	 * The outer loop iterates over all index pages except the metapage, in
	 * physical order (we hope the kernel will cooperate in providing
	 * read-ahead for speed).  It is critical that we visit all leaf pages,
	 * including ones added after we start the scan, else we might fail to
	 * delete some deletable tuples.  See more extensive comments about this
	 * in btvacuumscan().
	 */
	blkno = SPGIST_METAPAGE_BLKNO + 1;
	for (;;)
	{
		/* Get the current relation length */
		if (needLock)
			LockRelationForExtension(index, ExclusiveLock);
		num_pages = RelationGetNumberOfBlocks(index);
		if (needLock)
			UnlockRelationForExtension(index, ExclusiveLock);

		/* Quit if we've scanned the whole relation */
		if (blkno >= num_pages)
			break;
		/* Iterate over pages, then loop back to recheck length */
		for (; blkno < num_pages; blkno++)
		{
			spgvacuumpage(bds, blkno);
			/* empty the pending-list after each page */
			if (bds->pendingList != NULL)
				spgprocesspending(bds);
		}
	}

	/* Propagate local lastUsedPage cache to metablock */
	SpGistUpdateMetaPage(index);

	/*
	 * If we found any empty pages (and recorded them in the FSM), then
	 * forcibly update the upper-level FSM pages to ensure that searchers can
	 * find them.  It's possible that the pages were also found during
	 * previous scans and so this is a waste of time, but it's cheap enough
	 * relative to scanning the index that it shouldn't matter much, and
	 * making sure that free pages are available sooner not later seems
	 * worthwhile.
	 *
	 * Note that if no empty pages exist, we don't bother vacuuming the FSM
	 * at all.
	 */
	if (bds->stats->pages_deleted > 0)
		IndexFreeSpaceMapVacuum(index);

	/*
	 * Truncate index if possible
	 *
	 * XXX disabled because it's unsafe due to possible concurrent inserts.
	 * We'd have to rescan the pages to make sure they're still empty, and it
	 * doesn't seem worth it.  Note that btree doesn't do this either.
	 *
	 * Another reason not to truncate is that it could invalidate the cached
	 * pages-with-freespace pointers in the metapage and other backends'
	 * relation caches, that is leave them pointing to nonexistent pages.
	 * Adding RelationGetNumberOfBlocks calls to protect the places that use
	 * those pointers would be unduly expensive.
	 */
#ifdef NOT_USED
	if (num_pages > bds->lastFilledBlock + 1)
	{
		BlockNumber lastBlock = num_pages - 1;

		num_pages = bds->lastFilledBlock + 1;
		RelationTruncate(index, num_pages);
		bds->stats->pages_removed += lastBlock - bds->lastFilledBlock;
		bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock;
	}
#endif

	/* Report final stats */
	bds->stats->num_pages = num_pages;
	bds->stats->pages_free = bds->stats->pages_deleted;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
spgbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			  IndexBulkDeleteCallback callback, void *callback_state)
{
	spgBulkDeleteState bds;

	/* allocate stats if first time through, else re-use existing struct */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	bds.info = info;
	bds.stats = stats;
	bds.callback = callback;
	bds.callback_state = callback_state;

	spgvacuumscan(&bds);

	return stats;
}

/* Dummy callback to delete no tuples during spgvacuumcleanup */
static bool
dummy_callback(ItemPointer itemptr, void *state)
{
	return false;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
spgvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	spgBulkDeleteState bds;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	/*
	 * We don't need to scan the index if there was a preceding bulkdelete
	 * pass.  Otherwise, make a pass that won't delete any live tuples, but
	 * might still accomplish useful stuff with redirect/placeholder cleanup
	 * and/or FSM housekeeping, and in any case will provide stats.
	 */
	if (stats == NULL)
	{
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
		bds.info = info;
		bds.stats = stats;
		bds.callback = dummy_callback;
		bds.callback_state = NULL;

		spgvacuumscan(&bds);
	}

	/*
	 * It's quite possible for us to be fooled by concurrent tuple moves into
	 * double-counting some index tuples, so disbelieve any total that
	 * exceeds the underlying heap's count ... if we know that accurately.
	 * Otherwise this might just make matters worse.
	 */
	if (!info->estimated_count)
	{
		if (stats->num_index_tuples > info->num_heap_tuples)
			stats->num_index_tuples = info->num_heap_tuples;
	}

	return stats;
}