/*-------------------------------------------------------------------------
 *
 * gistxlog.c
 *	  WAL replay logic for GiST.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/gist/gistxlog.c
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/gist_private.h"
#include "access/gistxlog.h"
#include "access/heapam_xlog.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "storage/procarray.h"
#include "utils/memutils.h"
#include "utils/rel.h"

static MemoryContext opCtx;		/* working memory for replaying one WAL
								 * record; reset after each record */

/*
 * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
 *
 * Even if the WAL record includes a full-page image, we have to update the
 * follow-right flag, because that change is not included in the full-page
 * image.  To be sure that the intermediate state with the wrong flag value is
 * not visible to concurrent Hot Standby queries, this function handles
 * restoring the full-page image as well as updating the flag.  (Note that
 * we never need to do anything else to the child page in the current WAL
 * action.)
 */
static void
gistRedoClearFollowRight(XLogReaderState *record, uint8 block_id)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buffer;
	Page		page;
	XLogRedoAction action;

	/*
	 * Note that we still update the page even if it was restored from a full
	 * page image, because the updated NSN is not included in the image.
	 */
	action = XLogReadBufferForRedo(record, block_id, &buffer);
	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
	{
		page = BufferGetPage(buffer);

		GistPageSetNSN(page, lsn);
		GistClearFollowRight(page);

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}

/*
 * redo any page update (except page split)
 */
static void
gistRedoPageUpdateRecord(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;

	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		char	   *begin;
		char	   *data;
		Size		datalen;
		int			ninserted = 0;

		data = begin = XLogRecGetBlockData(record, 0, &datalen);

		page = (Page) BufferGetPage(buffer);

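		/*
		 * Block 0 data was laid down by gistXLogUpdate(): an array of
		 * xldata->ntodelete OffsetNumbers, immediately followed by the
		 * xldata->ntoinsert new index tuples, packed back to back.
		 */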
		if (xldata->ntodelete == 1 && xldata->ntoinsert == 1)
		{
			/*
			 * When replacing one tuple with one other tuple, we must use
			 * PageIndexTupleOverwrite for consistency with gistplacetopage.
			 */
			OffsetNumber offnum = *((OffsetNumber *) data);
			IndexTuple	itup;
			Size		itupsize;

			data += sizeof(OffsetNumber);
			itup = (IndexTuple) data;
			itupsize = IndexTupleSize(itup);
			if (!PageIndexTupleOverwrite(page, offnum, (Item) itup, itupsize))
				elog(ERROR, "failed to add item to GiST index page, size %d bytes",
					 (int) itupsize);
			data += itupsize;
			/* should be nothing left after consuming 1 tuple */
			Assert(data - begin == datalen);
			/* update insertion count for assert check below */
			ninserted++;
		}
		else if (xldata->ntodelete > 0)
		{
			/* Otherwise, delete old tuples if any */
			OffsetNumber *todelete = (OffsetNumber *) data;

			data += sizeof(OffsetNumber) * xldata->ntodelete;

			PageIndexMultiDelete(page, todelete, xldata->ntodelete);
			if (GistPageIsLeaf(page))
				GistMarkTuplesDeleted(page);
		}

		/* Add new tuples if any */
		if (data - begin < datalen)
		{
			OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

			while (data - begin < datalen)
			{
				IndexTuple	itup = (IndexTuple) data;
				Size		sz = IndexTupleSize(itup);
				OffsetNumber l;

				data += sz;

				l = PageAddItem(page, (Item) itup, sz, off, false, false);
				if (l == InvalidOffsetNumber)
					elog(ERROR, "failed to add item to GiST index page, size %d bytes",
						 (int) sz);
				off++;
				ninserted++;
			}
		}

		/* Check that XLOG record contained expected number of tuples */
		Assert(ninserted == xldata->ntoinsert);

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}

	/*
	 * Fix follow-right data on left child page
	 *
	 * This must be done while still holding the lock on the target page. Note
	 * that even if the target page no longer exists, we still attempt to
	 * replay the change on the child page.
	 */
	if (XLogRecHasBlockRef(record, 1))
		gistRedoClearFollowRight(record, 1);

	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}


/*
 * redo delete on gist index page to remove tuples marked as DEAD during index
 * tuple insertion
 */
static void
gistRedoDeleteRecord(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	gistxlogDelete *xldata = (gistxlogDelete *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;

	/*
	 * If we have any conflict processing to do, it must happen before we
	 * update the page.
	 *
	 * GiST delete records can conflict with standby queries.  You might think
	 * that vacuum records would conflict as well, but we've handled that
	 * already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
	 * cleaned by the vacuum of the heap and so we can resolve any conflicts
	 * just once when that arrives.  After that we know that no conflicts
	 * exist from individual gist vacuum records on that index.
	 */
	if (InHotStandby)
	{
		RelFileNode rnode;

		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);

		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
	}

	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		page = (Page) BufferGetPage(buffer);

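		/*
		 * The offsets to delete, if any, follow the fixed-size part of the
		 * record (see gistXLogDelete()).
		 */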
		if (XLogRecGetDataLen(record) > SizeOfGistxlogDelete)
		{
			OffsetNumber *todelete;

			todelete = (OffsetNumber *) ((char *) xldata + SizeOfGistxlogDelete);

			PageIndexMultiDelete(page, todelete, xldata->ntodelete);
		}

		GistClearPageHasGarbage(page);
		GistMarkTuplesDeleted(page);

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}

	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}

/*
 * Returns an array of pointers to the IndexTuples packed in a page-split
 * record's block data.  The tuples are not copied; each pointer points
 * directly into the given buffer.
 */
static IndexTuple *
decodePageSplitRecord(char *begin, int len, int *n)
{
	char	   *ptr;
	int			i = 0;
	IndexTuple *tuples;

	/* extract the number of tuples */
	memcpy(n, begin, sizeof(int));
	ptr = begin + sizeof(int);

	tuples = palloc(*n * sizeof(IndexTuple));

	for (i = 0; i < *n; i++)
	{
		Assert(ptr - begin < len);
		tuples[i] = (IndexTuple) ptr;
		ptr += IndexTupleSize((IndexTuple) ptr);
	}
	Assert(ptr - begin == len);

	return tuples;
}

static void
gistRedoPageSplitRecord(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
	Buffer		firstbuffer = InvalidBuffer;
	Buffer		buffer;
	Page		page;
	int			i;
	bool		isrootsplit = false;

	/*
	 * We must hold lock on the first-listed page throughout the action,
	 * including while updating the left child page (if any).  We can unlock
	 * remaining pages in the list as soon as they've been written, because
	 * there is no path for concurrent queries to reach those pages without
	 * first visiting the first-listed page.
	 */

	/*
	 * Loop over all pages of the split: block references 1..npage are the
	 * split pages in order, while block 0, if present, is the left child
	 * whose follow-right flag is cleared at the end.
	 */
	for (i = 0; i < xldata->npage; i++)
	{
		int			flags;
		char	   *data;
		Size		datalen;
		int			num;
		BlockNumber blkno;
		IndexTuple *tuples;

		XLogRecGetBlockTag(record, i + 1, NULL, NULL, &blkno);
		if (blkno == GIST_ROOT_BLKNO)
		{
			Assert(i == 0);
			isrootsplit = true;
		}

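		/* Each split page is initialized from scratch, not read from disk. */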
		buffer = XLogInitBufferForRedo(record, i + 1);
		page = (Page) BufferGetPage(buffer);
		data = XLogRecGetBlockData(record, i + 1, &datalen);

		tuples = decodePageSplitRecord(data, datalen, &num);

		/* ok, clear buffer */
		if (xldata->origleaf && blkno != GIST_ROOT_BLKNO)
			flags = F_LEAF;
		else
			flags = 0;
		GISTInitBuffer(buffer, flags);

		/* and fill it */
		gistfillbuffer(page, tuples, num, FirstOffsetNumber);

		if (blkno == GIST_ROOT_BLKNO)
		{
			GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
			GistPageSetNSN(page, xldata->orignsn);
			GistClearFollowRight(page);
		}
		else
		{
			if (i < xldata->npage - 1)
			{
				BlockNumber nextblkno;

				XLogRecGetBlockTag(record, i + 2, NULL, NULL, &nextblkno);
				GistPageGetOpaque(page)->rightlink = nextblkno;
			}
			else
				GistPageGetOpaque(page)->rightlink = xldata->origrlink;
			GistPageSetNSN(page, xldata->orignsn);
			if (i < xldata->npage - 1 && !isrootsplit &&
				xldata->markfollowright)
				GistMarkFollowRight(page);
			else
				GistClearFollowRight(page);
		}

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);

		if (i == 0)
			firstbuffer = buffer;
		else
			UnlockReleaseBuffer(buffer);
	}

	/* Fix follow-right data on left child page, if any */
	if (XLogRecHasBlockRef(record, 0))
		gistRedoClearFollowRight(record, 0);

	/* Finally, release lock on the first page */
	UnlockReleaseBuffer(firstbuffer);
}

/* redo page deletion */
static void
gistRedoPageDelete(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
	Buffer		parentBuffer;
	Buffer		leafBuffer;

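	/*
	 * First mark the leaf page deleted, then remove its downlink from the
	 * parent page.
	 */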
	if (XLogReadBufferForRedo(record, 0, &leafBuffer) == BLK_NEEDS_REDO)
	{
		Page		page = (Page) BufferGetPage(leafBuffer);

		GistPageSetDeleted(page, xldata->deleteXid);

		PageSetLSN(page, lsn);
		MarkBufferDirty(leafBuffer);
	}

	if (XLogReadBufferForRedo(record, 1, &parentBuffer) == BLK_NEEDS_REDO)
	{
		Page		page = (Page) BufferGetPage(parentBuffer);

		PageIndexTupleDelete(page, xldata->downlinkOffset);

		PageSetLSN(page, lsn);
		MarkBufferDirty(parentBuffer);
	}

	if (BufferIsValid(parentBuffer))
		UnlockReleaseBuffer(parentBuffer);
	if (BufferIsValid(leafBuffer))
		UnlockReleaseBuffer(leafBuffer);
}

static void
gistRedoPageReuse(XLogReaderState *record)
{
	gistxlogPageReuse *xlrec = (gistxlogPageReuse *) XLogRecGetData(record);

	/*
	 * PAGE_REUSE records exist to provide a conflict point when we reuse
	 * pages in the index via the FSM.  That's all they do though.
	 *
	 * latestRemovedXid was the page's deleteXid.  The deleteXid <
	 * RecentGlobalXmin test in gistPageRecyclable() conceptually mirrors the
	 * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
	 * Consequently, one XID value achieves the same exclusion effect on
	 * master and standby.
	 */
	if (InHotStandby)
	{
		FullTransactionId latestRemovedFullXid = xlrec->latestRemovedFullXid;
		FullTransactionId nextFullXid = ReadNextFullTransactionId();
		uint64		diff;

		/*
		 * ResolveRecoveryConflictWithSnapshot operates on 32-bit
		 * TransactionIds, so truncate the logged FullTransactionId.  If the
		 * logged value is very old, so that XID wrap-around already happened
		 * on it, there can't be any snapshots that still see it.
		 */
		diff = U64FromFullTransactionId(nextFullXid) -
			U64FromFullTransactionId(latestRemovedFullXid);
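		/*
		 * The cutoff is half the 32-bit XID space: if the removed XID is more
		 * recent than that, its truncated value is still unambiguous.
		 */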
		if (diff < MaxTransactionId / 2)
		{
			TransactionId latestRemovedXid;

			latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid);
			ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
												xlrec->node);
		}
	}
}

void
gist_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
	MemoryContext oldCxt;

	/*
	 * GiST indexes do not require any conflict processing at this level.
	 * NB: If we ever implement an optimization similar to the one we have in
	 * b-tree, and remove killed tuples outside VACUUM, we'll need to handle
	 * that here.
	 */

	oldCxt = MemoryContextSwitchTo(opCtx);
	switch (info)
	{
		case XLOG_GIST_PAGE_UPDATE:
			gistRedoPageUpdateRecord(record);
			break;
		case XLOG_GIST_DELETE:
			gistRedoDeleteRecord(record);
			break;
		case XLOG_GIST_PAGE_REUSE:
			gistRedoPageReuse(record);
			break;
		case XLOG_GIST_PAGE_SPLIT:
			gistRedoPageSplitRecord(record);
			break;
		case XLOG_GIST_PAGE_DELETE:
			gistRedoPageDelete(record);
			break;
		default:
			elog(PANIC, "gist_redo: unknown op code %u", info);
	}

	MemoryContextSwitchTo(oldCxt);
	MemoryContextReset(opCtx);
}

void
gist_xlog_startup(void)
{
	opCtx = createTempGistContext();
}

void
gist_xlog_cleanup(void)
{
	MemoryContextDelete(opCtx);
}

/*
 * Mask a GiST page before running consistency checks on it.
 */
void
gist_mask(char *pagedata, BlockNumber blkno)
{
	Page		page = (Page) pagedata;

	mask_page_lsn_and_checksum(page);

	mask_page_hint_bits(page);
	mask_unused_space(page);

	/*
	 * NSN is nothing but a special purpose LSN.  Hence, mask it for the same
	 * reason as mask_page_lsn_and_checksum.
	 */
	GistPageSetNSN(page, (uint64) MASK_MARKER);

	/*
	 * We update the F_FOLLOW_RIGHT flag on the left child after writing the
	 * WAL record.  Hence, mask this flag.  See gistplacetopage() for details.
	 */
	GistMarkFollowRight(page);

	if (GistPageIsLeaf(page))
	{
		/*
		 * In GiST leaf pages, it is possible to modify the LP_FLAGS without
		 * emitting any WAL record.  Hence, mask the line pointer flags.  See
		 * gistkillitems() for details.
		 */
		mask_lp_flags(page);
	}

	/*
	 * During GiST redo, we never mark a page as garbage.  Hence, mask it to
	 * ignore any differences.
	 */
	GistClearPageHasGarbage(page);
}

/*
 * Write WAL record of a page split.
 */
XLogRecPtr
gistXLogSplit(bool page_is_leaf,
			  SplitedPageLayout *dist,
			  BlockNumber origrlink, GistNSN orignsn,
			  Buffer leftchildbuf, bool markfollowright)
{
	gistxlogPageSplit xlrec;
	SplitedPageLayout *ptr;
	int			npage = 0;
	XLogRecPtr	recptr;
	int			i;

	for (ptr = dist; ptr; ptr = ptr->next)
		npage++;

	xlrec.origrlink = origrlink;
	xlrec.orignsn = orignsn;
	xlrec.origleaf = page_is_leaf;
	xlrec.npage = (uint16) npage;
	xlrec.markfollowright = markfollowright;

	XLogBeginInsert();

	/*
	 * Include a full-page image of the child buffer.  (This is only
	 * necessary if a checkpoint happened since the child page was split.)
	 */
	if (BufferIsValid(leftchildbuf))
		XLogRegisterBuffer(0, leftchildbuf, REGBUF_STANDARD);

	/*
	 * NOTE: We register a lot of data.  The caller must've called
	 * XLogEnsureRecordSpace() to prepare for that.  We cannot do it here,
	 * because we're already in a critical section.  If you change the number
	 * of buffer or data registrations here, make sure you modify the
	 * XLogEnsureRecordSpace() calls accordingly!
	 */
	XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageSplit));

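	/*
	 * For each page in the split, register the tuple count followed by the
	 * tuples themselves; decodePageSplitRecord() relies on this exact layout
	 * at replay.
	 */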
	i = 1;
	for (ptr = dist; ptr; ptr = ptr->next)
	{
		XLogRegisterBuffer(i, ptr->buffer, REGBUF_WILL_INIT);
		XLogRegisterBufData(i, (char *) &(ptr->block.num), sizeof(int));
		XLogRegisterBufData(i, (char *) ptr->list, ptr->lenlist);
		i++;
	}

	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT);

	return recptr;
}

/*
 * Write XLOG record describing a page deletion.  This also includes removal
 * of the downlink from the parent page.
 */
XLogRecPtr
gistXLogPageDelete(Buffer buffer, FullTransactionId xid,
				   Buffer parentBuffer, OffsetNumber downlinkOffset)
{
	gistxlogPageDelete xlrec;
	XLogRecPtr	recptr;

	xlrec.deleteXid = xid;
	xlrec.downlinkOffset = downlinkOffset;

	XLogBeginInsert();
	XLogRegisterData((char *) &xlrec, SizeOfGistxlogPageDelete);

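	/* Block 0 is the leaf page being deleted, block 1 is its parent. */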
	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
	XLogRegisterBuffer(1, parentBuffer, REGBUF_STANDARD);

	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE);

	return recptr;
}

/*
 * Write XLOG record about reuse of a deleted page.
 */
void
gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemovedXid)
{
	gistxlogPageReuse xlrec_reuse;

	/*
	 * Note that we don't register the buffer with the record, because this
	 * operation doesn't modify the page.  This record only exists to provide
	 * a conflict point for Hot Standby.
	 */

	/* XLOG stuff */
	xlrec_reuse.node = rel->rd_node;
	xlrec_reuse.block = blkno;
	xlrec_reuse.latestRemovedFullXid = latestRemovedXid;

	XLogBeginInsert();
	XLogRegisterData((char *) &xlrec_reuse, SizeOfGistxlogPageReuse);

	XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_REUSE);
}

/*
 * Write XLOG record describing a page update.  The update can include any
 * number of deletions and/or insertions of tuples on a single index page.
 *
 * If this update inserts a downlink for a split page, also record that
 * the F_FOLLOW_RIGHT flag on the child page is cleared and the NSN set.
 *
 * Note that both the todelete array and the tuples are marked as belonging
 * to the target buffer; they need not be stored in XLOG if XLogInsert decides
 * to log the whole buffer contents instead.
 */
XLogRecPtr
gistXLogUpdate(Buffer buffer,
			   OffsetNumber *todelete, int ntodelete,
			   IndexTuple *itup, int ituplen,
			   Buffer leftchildbuf)
{
	gistxlogPageUpdate xlrec;
	int			i;
	XLogRecPtr	recptr;

	xlrec.ntodelete = ntodelete;
	xlrec.ntoinsert = ituplen;

	XLogBeginInsert();
	XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageUpdate));

	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
	XLogRegisterBufData(0, (char *) todelete, sizeof(OffsetNumber) * ntodelete);

	/* new tuples */
	for (i = 0; i < ituplen; i++)
		XLogRegisterBufData(0, (char *) (itup[i]), IndexTupleSize(itup[i]));

	/*
	 * Include a full-page image of the child buffer.  (This is only
	 * necessary if a checkpoint happened since the child page was split.)
	 */
	if (BufferIsValid(leftchildbuf))
		XLogRegisterBuffer(1, leftchildbuf, REGBUF_STANDARD);

	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE);

	return recptr;
}

/*
 * Write XLOG record describing a delete of leaf index tuples marked as DEAD
 * during new tuple insertion.  One might think that this case is already
 * covered by gistXLogUpdate(), but deletion of index tuples can conflict
 * with standby queries and so needs special handling.
 */
XLogRecPtr
gistXLogDelete(Buffer buffer, OffsetNumber *todelete, int ntodelete,
			   TransactionId latestRemovedXid)
{
	gistxlogDelete xlrec;
	XLogRecPtr	recptr;

	xlrec.latestRemovedXid = latestRemovedXid;
	xlrec.ntodelete = ntodelete;

	XLogBeginInsert();
	XLogRegisterData((char *) &xlrec, SizeOfGistxlogDelete);

	/*
	 * We need the target-offsets array whether or not we store the whole
	 * buffer, to allow us to find the latestRemovedXid on a standby server.
	 */
	XLogRegisterData((char *) todelete, ntodelete * sizeof(OffsetNumber));

	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_DELETE);

	return recptr;
}