/*-------------------------------------------------------------------------
 *
 * bufpage.c
 *	  POSTGRES standard buffer page code.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/page/bufpage.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/itup.h"
#include "access/xlog.h"
#include "pgstat.h"
#include "storage/checksum.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"


/* GUC variable */
bool		ignore_checksum_failure = false;


/* ----------------------------------------------------------------
 *						Page support functions
 * ----------------------------------------------------------------
 */

/*
 * PageInit
 *		Initializes the contents of a page.
 *		Note that we don't calculate an initial checksum here; that's not done
 *		until it's time to write.
 */
void
PageInit(Page page, Size pageSize, Size specialSize)
{
	PageHeader	p = (PageHeader) page;

	specialSize = MAXALIGN(specialSize);

	Assert(pageSize == BLCKSZ);
	Assert(pageSize > specialSize + SizeOfPageHeaderData);

	/* Make sure all fields of page are zero, as well as unused space */
	MemSet(p, 0, pageSize);

	p->pd_flags = 0;
	p->pd_lower = SizeOfPageHeaderData;
	p->pd_upper = pageSize - specialSize;
	p->pd_special = pageSize - specialSize;
	PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
	/* p->pd_prune_xid = InvalidTransactionId;		done by above MemSet */
}
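
/*
 * Illustrative usage sketch (not part of the original file): an index AM
 * typically calls PageInit right after reading or allocating a buffer,
 * passing the size of its special space.  The opaque struct below is a
 * hypothetical example, not a real PostgreSQL type:
 *
 *		Page	page = BufferGetPage(buf);
 *
 *		PageInit(page, BufferGetPageSize(buf), sizeof(ExampleOpaqueData));
 *
 * Afterward pd_lower points just past the page header, while pd_upper and
 * pd_special both point at BLCKSZ - MAXALIGN(sizeof(ExampleOpaqueData));
 * everything, including the special area, has been zeroed.
 */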


/*
 * PageIsVerified
 *		Check that the page header and checksum (if any) appear valid.
 *
 * This is called when a page has just been read in from disk.  The idea is
 * to cheaply detect trashed pages before we go nuts following bogus line
 * pointers, testing invalid transaction identifiers, etc.
 *
 * It turns out to be necessary to allow zeroed pages here too.  Even though
 * this routine is *not* called when deliberately adding a page to a relation,
 * there are scenarios in which a zeroed page might be found in a table.
 * (Example: a backend extends a relation, then crashes before it can write
 * any WAL entry about the new page.  The kernel will already have the
 * zeroed page in the file, and it will stay that way after restart.)  So we
 * allow zeroed pages here, and are careful that the page access macros
 * treat such a page as empty and without free space.  Eventually, VACUUM
 * will clean up such a page and make it usable.
 */
bool
PageIsVerified(Page page, BlockNumber blkno)
{
	PageHeader	p = (PageHeader) page;
	size_t	   *pagebytes;
	int			i;
	bool		checksum_failure = false;
	bool		header_sane = false;
	bool		all_zeroes = false;
	uint16		checksum = 0;

	/*
	 * Don't verify page data unless the page passes basic non-zero test
	 */
	if (!PageIsNew(page))
	{
		if (DataChecksumsEnabled())
		{
			checksum = pg_checksum_page((char *) page, blkno);

			if (checksum != p->pd_checksum)
				checksum_failure = true;
		}

		/*
		 * The following checks don't prove the header is correct, only that
		 * it looks sane enough to allow into the buffer pool.  Later usage of
		 * the block can still reveal problems, which is why we offer the
		 * checksum option.
		 */
		if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
			p->pd_lower <= p->pd_upper &&
			p->pd_upper <= p->pd_special &&
			p->pd_special <= BLCKSZ &&
			p->pd_special == MAXALIGN(p->pd_special))
			header_sane = true;

		if (header_sane && !checksum_failure)
			return true;
	}

	/*
	 * Check all-zeroes case.  Luckily BLCKSZ is guaranteed to always be a
	 * multiple of size_t - and it's much faster to compare memory using the
	 * native word size.
	 */
	StaticAssertStmt(BLCKSZ == (BLCKSZ / sizeof(size_t)) * sizeof(size_t),
					 "BLCKSZ has to be a multiple of sizeof(size_t)");

	all_zeroes = true;
	pagebytes = (size_t *) page;
	for (i = 0; i < (BLCKSZ / sizeof(size_t)); i++)
	{
		if (pagebytes[i] != 0)
		{
			all_zeroes = false;
			break;
		}
	}

	if (all_zeroes)
		return true;

	/*
	 * Throw a WARNING if the checksum fails, but only after we've checked for
	 * the all-zeroes case.
	 */
	if (checksum_failure)
	{
		ereport(WARNING,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("page verification failed, calculated checksum %u but expected %u",
						checksum, p->pd_checksum)));

		pgstat_report_checksum_failure();

		if (header_sane && ignore_checksum_failure)
			return true;
	}

	return false;
}
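
/*
 * Illustrative usage sketch (not part of the original file): the buffer
 * manager's read path calls this right after smgrread(); a hypothetical,
 * simplified fragment of such a caller:
 *
 *		smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
 *
 *		if (!PageIsVerified((Page) bufBlock, blockNum))
 *			ereport(ERROR,
 *					(errcode(ERRCODE_DATA_CORRUPTED),
 *					 errmsg("invalid page in block %u", blockNum)));
 *
 * A real caller may instead zero out the damaged page when the
 * zero_damaged_pages GUC is set.
 */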


/*
 *	PageAddItemExtended
 *
 *	Add an item to a page.  Return value is the offset at which it was
 *	inserted, or InvalidOffsetNumber if the item is not inserted for any
 *	reason.  A WARNING is issued indicating the reason for the refusal.
 *
 *	offsetNumber must be either InvalidOffsetNumber to specify finding a
 *	free line pointer, or a value between FirstOffsetNumber and one past
 *	the last existing item, to specify using that particular line pointer.
 *
 *	If offsetNumber is valid and flag PAI_OVERWRITE is set, we just store
 *	the item at the specified offsetNumber, which must be either a
 *	currently-unused line pointer, or one past the last existing item.
 *
 *	If offsetNumber is valid and flag PAI_OVERWRITE is not set, insert
 *	the item at the specified offsetNumber, moving existing items later
 *	in the array to make room.
 *
 *	If offsetNumber is not valid, then assign a slot by finding the first
 *	one that is both unused and deallocated.
 *
 *	If flag PAI_IS_HEAP is set, we enforce that there can't be more than
 *	MaxHeapTuplesPerPage line pointers on the page.
 *
 *	!!! EREPORT(ERROR) IS DISALLOWED HERE !!!
 */
OffsetNumber
PageAddItemExtended(Page page,
					Item item,
					Size size,
					OffsetNumber offsetNumber,
					int flags)
{
	PageHeader	phdr = (PageHeader) page;
	Size		alignedSize;
	int			lower;
	int			upper;
	ItemId		itemId;
	OffsetNumber limit;
	bool		needshuffle = false;

	/*
	 * Be wary about corrupted page pointers
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	/*
	 * Select offsetNumber to place the new item at
	 */
	limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));

	/* was offsetNumber passed in? */
	if (OffsetNumberIsValid(offsetNumber))
	{
		/* yes, check it */
		if ((flags & PAI_OVERWRITE) != 0)
		{
			if (offsetNumber < limit)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId))
				{
					elog(WARNING, "will not overwrite a used ItemId");
					return InvalidOffsetNumber;
				}
			}
		}
		else
		{
			if (offsetNumber < limit)
				needshuffle = true; /* need to move existing linp's */
		}
	}
	else
	{
		/* offsetNumber was not passed in, so find a free slot */
		/* if no free slot, we'll put it at limit (1st open slot) */
		if (PageHasFreeLinePointers(phdr))
		{
			/*
			 * Look for "recyclable" (unused) ItemId.  We check for no storage
			 * as well, just to be paranoid --- unused items should never have
			 * storage.
			 */
			for (offsetNumber = 1; offsetNumber < limit; offsetNumber++)
			{
				itemId = PageGetItemId(phdr, offsetNumber);
				if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
					break;
			}
			if (offsetNumber >= limit)
			{
				/* the hint is wrong, so reset it */
				PageClearHasFreeLinePointers(phdr);
			}
		}
		else
		{
			/* don't bother searching if hint says there's no free slot */
			offsetNumber = limit;
		}
	}

	/* Reject placing items beyond the first unused line pointer */
	if (offsetNumber > limit)
	{
		elog(WARNING, "specified item offset is too large");
		return InvalidOffsetNumber;
	}

	/* Reject placing items beyond heap boundary, if heap */
	if ((flags & PAI_IS_HEAP) != 0 && offsetNumber > MaxHeapTuplesPerPage)
	{
		elog(WARNING, "can't put more than MaxHeapTuplesPerPage items in a heap page");
		return InvalidOffsetNumber;
	}

	/*
	 * Compute new lower and upper pointers for page, see if it'll fit.
	 *
	 * Note: do arithmetic as signed ints, to avoid mistakes if, say,
	 * alignedSize > pd_upper.
	 */
	if (offsetNumber == limit || needshuffle)
		lower = phdr->pd_lower + sizeof(ItemIdData);
	else
		lower = phdr->pd_lower;

	alignedSize = MAXALIGN(size);

	upper = (int) phdr->pd_upper - (int) alignedSize;

	if (lower > upper)
		return InvalidOffsetNumber;

	/*
	 * OK to insert the item.  First, shuffle the existing pointers if needed.
	 */
	itemId = PageGetItemId(phdr, offsetNumber);

	if (needshuffle)
		memmove(itemId + 1, itemId,
				(limit - offsetNumber) * sizeof(ItemIdData));

	/* set the line pointer */
	ItemIdSetNormal(itemId, upper, size);

	/*
	 * Items normally contain no uninitialized bytes.  Core bufpage consumers
	 * conform, but this is not a necessary coding rule; a new index AM could
	 * opt to depart from it.  However, data type input functions and other
	 * C-language functions that synthesize datums should initialize all
	 * bytes; datumIsEqual() relies on this.  Testing here, along with the
	 * similar check in printtup(), helps to catch such mistakes.
	 *
	 * Values of the "name" type retrieved via index-only scans may contain
	 * uninitialized bytes; see comment in btrescan().  Valgrind will report
	 * this as an error, but it is safe to ignore.
	 */
	VALGRIND_CHECK_MEM_IS_DEFINED(item, size);

	/* copy the item's data onto the page */
	memcpy((char *) page + upper, item, size);

	/* adjust page header */
	phdr->pd_lower = (LocationIndex) lower;
	phdr->pd_upper = (LocationIndex) upper;

	return offsetNumber;
}
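
/*
 * Illustrative usage sketch (not part of the original file): callers
 * normally go through the PageAddItem() macro in bufpage.h, which maps its
 * boolean arguments onto the PAI_* flags.  A hypothetical heap-style
 * insertion that lets the page choose a free line pointer:
 *
 *		OffsetNumber offnum;
 *
 *		offnum = PageAddItem(page, (Item) tuple->t_data, tuple->t_len,
 *							 InvalidOffsetNumber, false, true);
 *		if (offnum == InvalidOffsetNumber)
 *			elog(PANIC, "failed to add tuple to page");
 *
 * Because ereport(ERROR) is disallowed here, failure is reported through
 * the return value and the caller must check it.
 */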


/*
 * PageGetTempPage
 *		Get a temporary page in local memory for special processing.
 *		The returned page is not initialized at all; caller must do that.
 */
Page
PageGetTempPage(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	return temp;
}

/*
 * PageGetTempPageCopy
 *		Get a temporary page in local memory for special processing.
 *		The page is initialized by copying the contents of the given page.
 */
Page
PageGetTempPageCopy(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	memcpy(temp, page, pageSize);

	return temp;
}

/*
 * PageGetTempPageCopySpecial
 *		Get a temporary page in local memory for special processing.
 *		The page is PageInit'd with the same special-space size as the
 *		given page, and the special space is copied from the given page.
 */
Page
PageGetTempPageCopySpecial(Page page)
{
	Size		pageSize;
	Page		temp;

	pageSize = PageGetPageSize(page);
	temp = (Page) palloc(pageSize);

	PageInit(temp, pageSize, PageGetSpecialSize(page));
	memcpy(PageGetSpecialPointer(temp),
		   PageGetSpecialPointer(page),
		   PageGetSpecialSize(page));

	return temp;
}

/*
 * PageRestoreTempPage
 *		Copy temporary page back to permanent page after special processing
 *		and release the temporary page.
 */
void
PageRestoreTempPage(Page tempPage, Page oldPage)
{
	Size		pageSize;

	pageSize = PageGetPageSize(tempPage);
	memcpy((char *) oldPage, (char *) tempPage, pageSize);

	pfree(tempPage);
}
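
/*
 * Illustrative usage sketch (not part of the original file): the usual
 * pattern is to rebuild a page in local memory and swap it back in one
 * step.  A hypothetical fragment, with WAL-logging elided:
 *
 *		Page	tmp = PageGetTempPageCopySpecial(page);
 *
 *		... PageAddItem() the tuples worth keeping into tmp ...
 *
 *		PageRestoreTempPage(tmp, page);
 *		MarkBufferDirty(buf);
 *
 * Since the final memcpy happens while the caller still holds the buffer
 * lock, other backends never see a half-rebuilt page.
 */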

/*
 * sorting support for PageRepairFragmentation and PageIndexMultiDelete
 */
typedef struct itemIdSortData
{
	uint16		offsetindex;	/* linp array index */
	int16		itemoff;		/* page offset of item data */
	uint16		alignedlen;		/* MAXALIGN(item data len) */
} itemIdSortData;
typedef itemIdSortData *itemIdSort;

static int
itemoffcompare(const void *itemidp1, const void *itemidp2)
{
	/* Sort in decreasing itemoff order */
	return ((itemIdSort) itemidp2)->itemoff -
		((itemIdSort) itemidp1)->itemoff;
}

/*
 * After removing or marking some line pointers unused, move the tuples to
 * remove the gaps caused by the removed items.
 */
static void
compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		upper;
	int			i;

	/* sort itemIdSortData array into decreasing itemoff order */
	qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
		  itemoffcompare);

	upper = phdr->pd_special;
	for (i = 0; i < nitems; i++)
	{
		itemIdSort	itemidptr = &itemidbase[i];
		ItemId		lp;

		lp = PageGetItemId(page, itemidptr->offsetindex + 1);
		upper -= itemidptr->alignedlen;
		memmove((char *) page + upper,
				(char *) page + itemidptr->itemoff,
				itemidptr->alignedlen);
		lp->lp_off = upper;
	}

	phdr->pd_upper = upper;
}
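
/*
 * Worked example (illustrative, not part of the original file): suppose
 * pd_special = 8192 and the survivors, in decreasing itemoff order after
 * the qsort, are A (itemoff 8000, alignedlen 192) and B (itemoff 7500,
 * alignedlen 200).  The loop places A at 8192 - 192 = 8000 (a no-op move)
 * and B at 8000 - 200 = 7800, leaving pd_upper = 7800 with all free space
 * coalesced below it.  Processing in decreasing itemoff order guarantees
 * that each destination lies at or above the tuple's old position, so no
 * memmove can clobber a tuple that hasn't been relocated yet.
 */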

/*
 * PageRepairFragmentation
 *
 * Frees fragmented space on a page.
 * It doesn't remove unused line pointers! Please don't change this.
 *
 * This routine is usable for heap pages only, but see PageIndexMultiDelete.
 *
 * As a side effect, the page's PD_HAS_FREE_LINES hint bit is updated.
 */
void
PageRepairFragmentation(Page page)
{
	Offset		pd_lower = ((PageHeader) page)->pd_lower;
	Offset		pd_upper = ((PageHeader) page)->pd_upper;
	Offset		pd_special = ((PageHeader) page)->pd_special;
	itemIdSortData itemidbase[MaxHeapTuplesPerPage];
	itemIdSort	itemidptr;
	ItemId		lp;
	int			nline,
				nstorage,
				nunused;
	int			i;
	Size		totallen;

	/*
	 * It's worth the trouble to be more paranoid here than in most places,
	 * because we are about to reshuffle data in (what is usually) a shared
	 * disk buffer.  If we aren't careful then corrupted pointers, lengths,
	 * etc could cause us to clobber adjacent disk buffers, spreading the data
	 * loss further.  So, check everything.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Run through the line pointer array and collect data about live items.
	 */
	nline = PageGetMaxOffsetNumber(page);
	itemidptr = itemidbase;
	nunused = totallen = 0;
	for (i = FirstOffsetNumber; i <= nline; i++)
	{
		lp = PageGetItemId(page, i);
		if (ItemIdIsUsed(lp))
		{
			if (ItemIdHasStorage(lp))
			{
				itemidptr->offsetindex = i - 1;
				itemidptr->itemoff = ItemIdGetOffset(lp);
				if (unlikely(itemidptr->itemoff < (int) pd_upper ||
							 itemidptr->itemoff >= (int) pd_special))
					ereport(ERROR,
							(errcode(ERRCODE_DATA_CORRUPTED),
							 errmsg("corrupted line pointer: %u",
									itemidptr->itemoff)));
				itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
				totallen += itemidptr->alignedlen;
				itemidptr++;
			}
		}
		else
		{
			/* Unused entries should have lp_len = 0, but make sure */
			ItemIdSetUnused(lp);
			nunused++;
		}
	}

	nstorage = itemidptr - itemidbase;
	if (nstorage == 0)
	{
		/* Page is completely empty, so just reset it quickly */
		((PageHeader) page)->pd_upper = pd_special;
	}
	else
	{
		/* Need to compact the page the hard way */
		if (totallen > (Size) (pd_special - pd_lower))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted item lengths: total %u, available space %u",
							(unsigned int) totallen, pd_special - pd_lower)));

		compactify_tuples(itemidbase, nstorage, page);
	}

	/* Set hint bit for PageAddItem */
	if (nunused > 0)
		PageSetHasFreeLinePointers(page);
	else
		PageClearHasFreeLinePointers(page);
}
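
/*
 * Illustrative usage sketch (not part of the original file): heap pruning
 * is the typical caller.  A hypothetical fragment (pruning actually
 * requires a buffer cleanup lock, a superset of the exclusive lock shown):
 *
 *		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *		... mark pruned line pointers dead/unused ...
 *		PageRepairFragmentation(BufferGetPage(buf));
 *		MarkBufferDirty(buf);
 *		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 *
 * Only the tuple data moves; line pointers stay put, because heap TIDs
 * recorded elsewhere (notably in indexes) must remain valid.
 */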

/*
 * PageGetFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for a new line pointer.
 *
 * Note: this should usually only be used on index pages.  Use
 * PageGetHeapFreeSpace on heap pages.
 */
Size
PageGetFreeSpace(Page page)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < (int) sizeof(ItemIdData))
		return 0;
	space -= sizeof(ItemIdData);

	return (Size) space;
}
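
/*
 * Worked example (illustrative, not part of the original file): with the
 * default BLCKSZ of 8192, a freshly PageInit'd page with no special space
 * has pd_lower = 24 (SizeOfPageHeaderData) and pd_upper = 8192, so this
 * returns 8192 - 24 - 4 = 8164; the 4 bytes are the ItemIdData that the
 * next PageAddItem will consume.
 */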

/*
 * PageGetFreeSpaceForMultipleTuples
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for multiple new line pointers.
 *
 * Note: this should usually only be used on index pages.  Use
 * PageGetHeapFreeSpace on heap pages.
 */
Size
PageGetFreeSpaceForMultipleTuples(Page page, int ntups)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < (int) (ntups * sizeof(ItemIdData)))
		return 0;
	space -= ntups * sizeof(ItemIdData);

	return (Size) space;
}

/*
 * PageGetExactFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		without any consideration for adding/removing line pointers.
 */
Size
PageGetExactFreeSpace(Page page)
{
	int			space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
		(int) ((PageHeader) page)->pd_lower;

	if (space < 0)
		return 0;

	return (Size) space;
}


/*
 * PageGetHeapFreeSpace
 *		Returns the size of the free (allocatable) space on a page,
 *		reduced by the space needed for a new line pointer.
 *
 * The difference between this and PageGetFreeSpace is that this will return
 * zero if there are already MaxHeapTuplesPerPage line pointers in the page
 * and none are free.  We use this to enforce that no more than
 * MaxHeapTuplesPerPage line pointers are created on a heap page.  (Although
 * no more tuples than that could fit anyway, in the presence of redirected
 * or dead line pointers it'd be possible to have too many line pointers.
 * To avoid breaking code that assumes MaxHeapTuplesPerPage is a hard limit
 * on the number of line pointers, we make this extra check.)
 */
Size
PageGetHeapFreeSpace(Page page)
{
	Size		space;

	space = PageGetFreeSpace(page);
	if (space > 0)
	{
		OffsetNumber offnum,
					nline;

		/*
		 * Are there already MaxHeapTuplesPerPage line pointers in the page?
		 */
		nline = PageGetMaxOffsetNumber(page);
		if (nline >= MaxHeapTuplesPerPage)
		{
			if (PageHasFreeLinePointers((PageHeader) page))
			{
				/*
				 * Since this is just a hint, we must confirm that there is
				 * indeed a free line pointer
				 */
				for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
				{
					ItemId		lp = PageGetItemId(page, offnum);

					if (!ItemIdIsUsed(lp))
						break;
				}

				if (offnum > nline)
				{
					/*
					 * The hint is wrong, but we can't clear it here since we
					 * don't have the ability to mark the page dirty.
					 */
					space = 0;
				}
			}
			else
			{
				/*
				 * Although the hint might be wrong, PageAddItem will believe
				 * it anyway, so we must believe it too.
				 */
				space = 0;
			}
		}
	}
	return space;
}
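
/*
 * Illustrative note (not part of the original file): with the default 8 kB
 * BLCKSZ, MaxHeapTuplesPerPage works out to 291, since each heap tuple
 * costs at least MAXALIGN(SizeofHeapTupleHeader) + sizeof(ItemIdData) =
 * 28 bytes: (8192 - 24) / 28 = 291.  So a heap page that has accumulated
 * 291 line pointers reports zero free space here, however large
 * pd_upper - pd_lower is, unless one of those line pointers is reusable.
 */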


/*
 * PageIndexTupleDelete
 *
 * This routine does the work of removing a tuple from an index page.
 *
 * Unlike heap pages, we compact out the line pointer for the removed tuple.
 */
void
PageIndexTupleDelete(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nbytes;
	int			offidx;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	/* change offset number to offset index */
	offidx = offnum - 1;

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted line pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/* Amount of space to actually be deleted */
	size = MAXALIGN(size);

	/*
	 * First, we want to get rid of the pd_linp entry for the index tuple.  We
	 * copy all subsequent linp's back one slot in the array.  We don't use
	 * PageGetItemId, because we are manipulating the _array_, not individual
	 * linp's.
	 */
	nbytes = phdr->pd_lower -
		((char *) &phdr->pd_linp[offidx + 1] - (char *) phdr);

	if (nbytes > 0)
		memmove((char *) &(phdr->pd_linp[offidx]),
				(char *) &(phdr->pd_linp[offidx + 1]),
				nbytes);

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy.
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, offset - phdr->pd_upper);

	/* adjust free space boundary pointers */
	phdr->pd_upper += size;
	phdr->pd_lower -= sizeof(ItemIdData);

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		nline--;				/* there's one less than when we started */
		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			Assert(ItemIdHasStorage(ii));
			if (ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}
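
/*
 * Illustrative note (not part of the original file): because the line
 * pointer array is compacted, deleting offset N renumbers every following
 * item.  A hypothetical caller removing offsets 2 and 5 one at a time must
 * therefore work in descending order:
 *
 *		PageIndexTupleDelete(page, 5);
 *		PageIndexTupleDelete(page, 2);
 *
 * Deleting 2 first would turn the old offset 5 into offset 4.  This is
 * also why the small-nitems path in PageIndexMultiDelete, below, walks its
 * (ascending) input array backwards.
 */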


/*
 * PageIndexMultiDelete
 *
 * This routine handles the case of deleting multiple tuples from an
 * index page at once.  It is considerably faster than a loop around
 * PageIndexTupleDelete ... however, the caller *must* supply the array
 * of item numbers to be deleted in item number order!
 */
void
PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
{
	PageHeader	phdr = (PageHeader) page;
	Offset		pd_lower = phdr->pd_lower;
	Offset		pd_upper = phdr->pd_upper;
	Offset		pd_special = phdr->pd_special;
	itemIdSortData itemidbase[MaxIndexTuplesPerPage];
	ItemIdData	newitemids[MaxIndexTuplesPerPage];
	itemIdSort	itemidptr;
	ItemId		lp;
	int			nline,
				nused;
	Size		totallen;
	Size		size;
	unsigned	offset;
	int			nextitm;
	OffsetNumber offnum;

	Assert(nitems <= MaxIndexTuplesPerPage);

	/*
	 * If there aren't very many items to delete, then retail
	 * PageIndexTupleDelete is the best way.  Delete the items in reverse
	 * order so we don't have to think about adjusting item numbers for
	 * previous deletions.
	 *
	 * TODO: tune the magic number here
	 */
	if (nitems <= 2)
	{
		while (--nitems >= 0)
			PageIndexTupleDelete(page, itemnos[nitems]);
		return;
	}

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (pd_lower < SizeOfPageHeaderData ||
		pd_lower > pd_upper ||
		pd_upper > pd_special ||
		pd_special > BLCKSZ ||
		pd_special != MAXALIGN(pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						pd_lower, pd_upper, pd_special)));

	/*
	 * Scan the line pointer array and build a list of just the ones we are
	 * going to keep.  Notice we do not modify the page yet, since we are
	 * still validity-checking.
	 */
	nline = PageGetMaxOffsetNumber(page);
	itemidptr = itemidbase;
	totallen = 0;
	nused = 0;
	nextitm = 0;
	for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
	{
		lp = PageGetItemId(page, offnum);
		Assert(ItemIdHasStorage(lp));
		size = ItemIdGetLength(lp);
		offset = ItemIdGetOffset(lp);
		if (offset < pd_upper ||
			(offset + size) > pd_special ||
			offset != MAXALIGN(offset))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("corrupted line pointer: offset = %u, size = %u",
							offset, (unsigned int) size)));

		if (nextitm < nitems && offnum == itemnos[nextitm])
		{
			/* skip item to be deleted */
			nextitm++;
		}
		else
		{
			itemidptr->offsetindex = nused; /* where it will go */
			itemidptr->itemoff = offset;
			itemidptr->alignedlen = MAXALIGN(size);
			totallen += itemidptr->alignedlen;
			newitemids[nused] = *lp;
			itemidptr++;
			nused++;
		}
	}

	/* this will catch invalid or out-of-order itemnos[] */
	if (nextitm != nitems)
		elog(ERROR, "incorrect index offsets supplied");

	if (totallen > (Size) (pd_special - pd_lower))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted item lengths: total %u, available space %u",
						(unsigned int) totallen, pd_special - pd_lower)));

	/*
	 * Looks good.  Overwrite the line pointers with the copy, from which
	 * we've removed all the unused items.
	 */
	memcpy(phdr->pd_linp, newitemids, nused * sizeof(ItemIdData));
	phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);

	/* and compactify the tuple data */
	compactify_tuples(itemidbase, nused, page);
}
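
/*
 * Illustrative usage sketch (not part of the original file): a typical
 * index-vacuum pass collects deletable offsets in ascending order while
 * scanning the page, then deletes them in one call (hypothetical
 * fragment; the deadness test stands in for the AM's own logic):
 *
 *		OffsetNumber deletable[MaxIndexTuplesPerPage];
 *		int			ndeletable = 0;
 *
 *		for (offnum = FirstOffsetNumber; offnum <= maxoff;
 *			 offnum = OffsetNumberNext(offnum))
 *		{
 *			if (tuple_is_dead(page, offnum))
 *				deletable[ndeletable++] = offnum;
 *		}
 *		if (ndeletable > 0)
 *			PageIndexMultiDelete(page, deletable, ndeletable);
 *
 * The ascending scan order is what satisfies the "item number order"
 * requirement in the header comment above.
 */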


/*
 * PageIndexTupleDeleteNoCompact
 *
 * Remove the specified tuple from an index page, but set its line pointer
 * to "unused" instead of compacting it out, except that it can be removed
 * if it's the last line pointer on the page.
 *
 * This is used for index AMs that require that existing TIDs of live tuples
 * remain unchanged, and are willing to allow unused line pointers instead.
 */
void
PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offnum)
{
	PageHeader	phdr = (PageHeader) page;
	char	   *addr;
	ItemId		tup;
	Size		size;
	unsigned	offset;
	int			nline;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	nline = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > nline)
		elog(ERROR, "invalid index offnum: %u", offnum);

	tup = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tup));
	size = ItemIdGetLength(tup);
	offset = ItemIdGetOffset(tup);

	if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted line pointer: offset = %u, size = %u",
						offset, (unsigned int) size)));

	/* Amount of space to actually be deleted */
	size = MAXALIGN(size);

	/*
	 * Either set the line pointer to "unused", or zap it if it's the last
	 * one.  (Note: it's possible that the next-to-last one(s) are already
	 * unused, but we do not trouble to try to compact them out if so.)
	 */
	if ((int) offnum < nline)
		ItemIdSetUnused(tup);
	else
	{
		phdr->pd_lower -= sizeof(ItemIdData);
		nline--;				/* there's one less than when we started */
	}

	/*
	 * Now move everything between the old upper bound (beginning of tuple
	 * space) and the beginning of the deleted tuple forward, so that space in
	 * the middle of the page is left free.  If we've just deleted the tuple
	 * at the beginning of tuple space, then there's no need to do the copy.
	 */

	/* beginning of tuple space */
	addr = (char *) page + phdr->pd_upper;

	if (offset > phdr->pd_upper)
		memmove(addr + size, addr, offset - phdr->pd_upper);

	/* adjust free space boundary pointer */
	phdr->pd_upper += size;

	/*
	 * Finally, we need to adjust the linp entries that remain.
	 *
	 * Anything that used to be before the deleted tuple's data was moved
	 * forward by the size of the deleted tuple.
	 */
	if (!PageIsEmpty(page))
	{
		int			i;

		for (i = 1; i <= nline; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size;
		}
	}
}


/*
 * PageIndexTupleOverwrite
 *
 * Replace a specified tuple on an index page.
 *
 * The new tuple is placed exactly where the old one had been, shifting
 * other tuples' data up or down as needed to keep the page compacted.
 * This is better than deleting and reinserting the tuple, because it
 * avoids any data shifting when the tuple size doesn't change; and
 * even when it does, we avoid moving the line pointers around.
 * Conceivably this could also be of use to an index AM that cares about
 * the physical order of tuples as well as their ItemId order.
 *
 * If there's insufficient space for the new tuple, return false.  Other
 * errors represent data-corruption problems, so we just elog.
 */
bool
PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
						Item newtup, Size newsize)
{
	PageHeader	phdr = (PageHeader) page;
	ItemId		tupid;
	int			oldsize;
	unsigned	offset;
	Size		alignednewsize;
	int			size_diff;
	int			itemcount;

	/*
	 * As with PageRepairFragmentation, paranoia seems justified.
	 */
	if (phdr->pd_lower < SizeOfPageHeaderData ||
		phdr->pd_lower > phdr->pd_upper ||
		phdr->pd_upper > phdr->pd_special ||
		phdr->pd_special > BLCKSZ ||
		phdr->pd_special != MAXALIGN(phdr->pd_special))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
						phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));

	itemcount = PageGetMaxOffsetNumber(page);
	if ((int) offnum <= 0 || (int) offnum > itemcount)
		elog(ERROR, "invalid index offnum: %u", offnum);

	tupid = PageGetItemId(page, offnum);
	Assert(ItemIdHasStorage(tupid));
	oldsize = ItemIdGetLength(tupid);
	offset = ItemIdGetOffset(tupid);

	if (offset < phdr->pd_upper || (offset + oldsize) > phdr->pd_special ||
		offset != MAXALIGN(offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("corrupted line pointer: offset = %u, size = %u",
						offset, (unsigned int) oldsize)));

	/*
	 * Determine actual change in space requirement, check for page overflow.
	 */
	oldsize = MAXALIGN(oldsize);
	alignednewsize = MAXALIGN(newsize);
	if (alignednewsize > oldsize + (phdr->pd_upper - phdr->pd_lower))
		return false;

	/*
	 * Relocate existing data and update line pointers, unless the new tuple
	 * is the same size as the old (after alignment), in which case there's
	 * nothing to do.  Notice that what we have to relocate is data before the
	 * target tuple, not data after, so it's convenient to express size_diff
	 * as the amount by which the tuple's size is decreasing, making it the
	 * delta to add to pd_upper and affected line pointers.
	 */
	size_diff = oldsize - (int) alignednewsize;
	if (size_diff != 0)
	{
		char	   *addr = (char *) page + phdr->pd_upper;
		int			i;

		/* relocate all tuple data before the target tuple */
		memmove(addr + size_diff, addr, offset - phdr->pd_upper);

		/* adjust free space boundary pointer */
		phdr->pd_upper += size_diff;

		/* adjust affected line pointers too */
		for (i = FirstOffsetNumber; i <= itemcount; i++)
		{
			ItemId		ii = PageGetItemId(phdr, i);

			/* Allow items without storage; currently only BRIN needs that */
			if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
				ii->lp_off += size_diff;
		}
	}

	/* Update the item's tuple length (other fields shouldn't change) */
	ItemIdSetNormal(tupid, offset + size_diff, newsize);

	/* Copy new tuple data onto page */
	memcpy(PageGetItem(page, tupid), newtup, newsize);

	return true;
}
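
/*
 * Illustrative usage sketch (not part of the original file): BRIN, for
 * example, replaces a summary tuple in place during a same-page update.
 * A hypothetical caller:
 *
 *		if (!PageIndexTupleOverwrite(page, offnum, (Item) newtup, newsz))
 *			elog(ERROR, "failed to overwrite index tuple");
 *
 * The tuple keeps its offset number (and hence its TID), which is the
 * point: delete-then-reinsert would renumber later line pointers.
 */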


/*
 * Set checksum for a page in shared buffers.
 *
 * If checksums are disabled, or if the page is not initialized, just return
 * the input.  Otherwise, we must make a copy of the page before calculating
 * the checksum, to prevent concurrent modifications (e.g. setting hint bits)
 * from making the final checksum invalid.  It doesn't matter if we include or
 * exclude hints during the copy, as long as we write a valid page and
 * associated checksum.
 *
 * Returns a pointer to the block-sized data that needs to be written.  Uses
 * statically-allocated memory, so the caller must immediately write the
 * returned page and not refer to it again.
 */
char *
PageSetChecksumCopy(Page page, BlockNumber blkno)
{
	static char *pageCopy = NULL;

	/* If we don't need a checksum, just return the passed-in data */
	if (PageIsNew(page) || !DataChecksumsEnabled())
		return (char *) page;

	/*
	 * We allocate the copy space once and use it over on each subsequent
	 * call.  The point of palloc'ing here, rather than having a static char
	 * array, is first to ensure adequate alignment for the checksumming code
	 * and second to avoid wasting space in processes that never call this.
	 */
	if (pageCopy == NULL)
		pageCopy = MemoryContextAlloc(TopMemoryContext, BLCKSZ);

	memcpy(pageCopy, (char *) page, BLCKSZ);
	((PageHeader) pageCopy)->pd_checksum = pg_checksum_page(pageCopy, blkno);
	return pageCopy;
}
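
/*
 * Illustrative usage sketch (not part of the original file): the buffer
 * manager's write path uses this immediately before handing the block to
 * the storage manager, along these lines (hypothetical fragment):
 *
 *		char   *bufToWrite;
 *
 *		bufToWrite = PageSetChecksumCopy(page, blocknum);
 *		smgrwrite(smgr, forknum, blocknum, bufToWrite, false);
 *
 * The returned buffer is only good until the next call, so the write must
 * happen before anything else touches this module's static copy.
 */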

/*
 * Set checksum for a page in private memory.
 *
 * This must only be used when we know that no other process can be modifying
 * the page buffer.
 */
void
PageSetChecksumInplace(Page page, BlockNumber blkno)
{
	/* If we don't need a checksum, just return */
	if (PageIsNew(page) || !DataChecksumsEnabled())
		return;

	((PageHeader) page)->pd_checksum = pg_checksum_page((char *) page, blkno);
}