/*-------------------------------------------------------------------------
 *
 * nbtxlog.c
 *	  WAL replay logic for btrees.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/nbtree/nbtxlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/nbtree.h"
#include "access/nbtxlog.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/procarray.h"
#include "miscadmin.h"

/*
 * _bt_restore_page -- re-enter all the index tuples on a page
 *
 * The page is freshly init'd, and *from (length len) is a copy of what
 * had been its upper part (pd_upper to pd_special).  We assume that the
 * tuples had been added to the page in item-number order, and therefore
 * the one with highest item number appears first (lowest on the page).
 */
static void
_bt_restore_page(Page page, char *from, int len)
{
	IndexTupleData itupdata;
	Size		itemsz;
	char	   *end = from + len;
	Item		items[MaxIndexTuplesPerPage];
	uint16		itemsizes[MaxIndexTuplesPerPage];
	int			i;
	int			nitems;

	/*
	 * To get the items back in the original order, we add them to the page
	 * in reverse.  To figure out where one tuple ends and another begins, we
	 * have to scan them in forward order first.
	 */
	i = 0;
	while (from < end)
	{
		/*
		 * As we step through the items, 'from' won't always be properly
		 * aligned, so we need to use memcpy().  Further, we use Item (which
		 * is just a char*) here for our items array for the same reason;
		 * wouldn't want the compiler or anyone thinking that an item is
		 * aligned when it isn't.
		 */
		memcpy(&itupdata, from, sizeof(IndexTupleData));
		itemsz = IndexTupleSize(&itupdata);
		itemsz = MAXALIGN(itemsz);

		items[i] = (Item) from;
		itemsizes[i] = itemsz;
		i++;

		from += itemsz;
	}
	nitems = i;

	for (i = nitems - 1; i >= 0; i--)
	{
		if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "_bt_restore_page: cannot add item to page");
	}
}
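
#ifdef NOT_USED
/*
 * Illustrative sketch only, never compiled or called: this is roughly how
 * the WAL-logging side produces the image that _bt_restore_page() consumes,
 * namely the span from pd_upper to pd_special of the source page.  The
 * function name, its block_id parameter, and the assumption that the caller
 * has already registered the buffer are inventions of this example; see the
 * nbtree insert/split code for the real registration.
 */
static void
_bt_example_register_page_upper(Page srcpage, uint8 block_id)
{
	PageHeader	phdr = (PageHeader) srcpage;
	char	   *upperstart = (char *) srcpage + phdr->pd_upper;
	int			upperlen = phdr->pd_special - phdr->pd_upper;

	/* Attach the packed tuples to the registered buffer as block data */
	XLogRegisterBufData(block_id, upperstart, upperlen);
}
#endif							/* NOT_USED */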

static void
_bt_restore_meta(XLogReaderState *record, uint8 block_id)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *md;
	BTPageOpaque pageop;
	xl_btree_metadata *xlrec;
	char	   *ptr;
	Size		len;

	metabuf = XLogInitBufferForRedo(record, block_id);
	ptr = XLogRecGetBlockData(record, block_id, &len);

	Assert(len == sizeof(xl_btree_metadata));
	Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
	xlrec = (xl_btree_metadata *) ptr;
	metapg = BufferGetPage(metabuf);

	_bt_pageinit(metapg, BufferGetPageSize(metabuf));

	md = BTPageGetMeta(metapg);
	md->btm_magic = BTREE_MAGIC;
	md->btm_version = xlrec->version;
	md->btm_root = xlrec->root;
	md->btm_level = xlrec->level;
	md->btm_fastroot = xlrec->fastroot;
	md->btm_fastlevel = xlrec->fastlevel;
	/* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
	Assert(md->btm_version >= BTREE_NOVAC_VERSION);
	md->btm_oldest_btpo_xact = xlrec->oldest_btpo_xact;
	md->btm_last_cleanup_num_heap_tuples = xlrec->last_cleanup_num_heap_tuples;

	pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
	pageop->btpo_flags = BTP_META;

	/*
	 * Set pd_lower just past the end of the metadata.  This is essential,
	 * because without doing so, metadata will be lost if xlog.c compresses
	 * the page.
	 */
	((PageHeader) metapg)->pd_lower =
		((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;

	PageSetLSN(metapg, lsn);
	MarkBufferDirty(metabuf);
	UnlockReleaseBuffer(metabuf);
}
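
#ifdef NOT_USED
/*
 * Illustrative sketch only, never compiled or called: the "hole" that xlog.c
 * may omit when it compresses a standard-layout full-page image is the free
 * space between pd_lower and pd_upper.  Setting pd_lower just past
 * BTMetaPageData, as _bt_restore_meta() does above, keeps the metadata out
 * of that hole.  The helper name is made up for this example.
 */
static Size
_bt_example_metapage_hole_size(Page metapg)
{
	PageHeader	phdr = (PageHeader) metapg;

	return (Size) (phdr->pd_upper - phdr->pd_lower);
}
#endif							/* NOT_USED */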

/*
 * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
 *
 * This is a common subroutine of the redo functions of all the WAL record
 * types that can insert a downlink: insert, split, and newroot.
 */
static void
_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buf;

	if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
	{
		Page		page = (Page) BufferGetPage(buf);
		BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		Assert(P_INCOMPLETE_SPLIT(pageop));
		pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buf);
	}
	if (BufferIsValid(buf))
		UnlockReleaseBuffer(buf);
}

static void
btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;

	/*
	 * Insertion to an internal page finishes an incomplete split at the
	 * child level.  Clear the incomplete-split flag in the child.  Note:
	 * during normal operation, the child and parent pages are locked at the
	 * same time, so that clearing the flag and inserting the downlink appear
	 * atomic to other backends.  We don't bother with that during replay,
	 * because readers don't care about the incomplete-split flag and there
	 * cannot be updates happening.
	 */
	if (!isleaf)
		_bt_clear_incomplete_split(record, 1);
	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		Size		datalen;
		char	   *datapos = XLogRecGetBlockData(record, 0, &datalen);

		page = BufferGetPage(buffer);

		if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "btree_xlog_insert: failed to add item");

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/*
	 * Note: in normal operation, we'd update the metapage while still
	 * holding lock on the page we inserted into.  But during replay it's not
	 * necessary to hold that lock, since no other index updates can be
	 * happening concurrently, and readers will cope fine with following an
	 * obsolete link from the metapage.
	 */
	if (ismeta)
		_bt_restore_meta(record, 2);
}

static void
btree_xlog_split(bool onleft, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
	bool		isleaf = (xlrec->level == 0);
	Buffer		lbuf;
	Buffer		rbuf;
	Page		rpage;
	BTPageOpaque ropaque;
	char	   *datapos;
	Size		datalen;
	BlockNumber leftsib;
	BlockNumber rightsib;
	BlockNumber rnext;

	XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib);
	XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib);
	if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext))
		rnext = P_NONE;

	/*
	 * Clear the incomplete split flag on the left sibling of the child page
	 * this is a downlink for.  (Like in btree_xlog_insert, this can be done
	 * before locking the other pages.)
	 */
	if (!isleaf)
		_bt_clear_incomplete_split(record, 3);

	/* Reconstruct right (new) sibling page from scratch */
	rbuf = XLogInitBufferForRedo(record, 1);
	datapos = XLogRecGetBlockData(record, 1, &datalen);
	rpage = (Page) BufferGetPage(rbuf);

	_bt_pageinit(rpage, BufferGetPageSize(rbuf));
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);

	ropaque->btpo_prev = leftsib;
	ropaque->btpo_next = rnext;
	ropaque->btpo.level = xlrec->level;
	ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
	ropaque->btpo_cycleid = 0;

	_bt_restore_page(rpage, datapos, datalen);

	PageSetLSN(rpage, lsn);
	MarkBufferDirty(rbuf);

	/* Now reconstruct left (original) sibling page */
	if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO)
	{
		/*
		 * To retain the same physical order of the tuples that they had, we
		 * initialize a temporary empty page for the left page and add all
		 * the items to that in item number order.  This mirrors how
		 * _bt_split() works.  Retaining the same physical order makes WAL
		 * consistency checking possible.  See also _bt_restore_page(), which
		 * does the same for the right page.
		 */
		Page		lpage = (Page) BufferGetPage(lbuf);
		BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
		OffsetNumber off;
		IndexTuple	newitem = NULL,
					left_hikey = NULL;
		Size		newitemsz = 0,
					left_hikeysz = 0;
		Page		newlpage;
		OffsetNumber leftoff;

		datapos = XLogRecGetBlockData(record, 0, &datalen);

		if (onleft)
		{
			newitem = (IndexTuple) datapos;
			newitemsz = MAXALIGN(IndexTupleSize(newitem));
			datapos += newitemsz;
			datalen -= newitemsz;
		}

		/* Extract left hikey and its size (assuming 16-bit alignment) */
		left_hikey = (IndexTuple) datapos;
		left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
		datapos += left_hikeysz;
		datalen -= left_hikeysz;

		Assert(datalen == 0);

		newlpage = PageGetTempPageCopySpecial(lpage);

		/* Set high key */
		leftoff = P_HIKEY;
		if (PageAddItem(newlpage, (Item) left_hikey, left_hikeysz,
						P_HIKEY, false, false) == InvalidOffsetNumber)
			elog(PANIC, "failed to add high key to left page after split");
		leftoff = OffsetNumberNext(leftoff);

		for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
		{
			ItemId		itemid;
			Size		itemsz;
			IndexTuple	item;

			/* add the new item if it was inserted on left page */
			if (onleft && off == xlrec->newitemoff)
			{
				if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
								false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add new item to left page after split");
				leftoff = OffsetNumberNext(leftoff);
			}

			itemid = PageGetItemId(lpage, off);
			itemsz = ItemIdGetLength(itemid);
			item = (IndexTuple) PageGetItem(lpage, itemid);
			if (PageAddItem(newlpage, (Item) item, itemsz, leftoff,
							false, false) == InvalidOffsetNumber)
				elog(ERROR, "failed to add old item to left page after split");
			leftoff = OffsetNumberNext(leftoff);
		}

		/* cope with possibility that newitem goes at the end */
		if (onleft && off == xlrec->newitemoff)
		{
			if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
							false, false) == InvalidOffsetNumber)
				elog(ERROR, "failed to add new item to left page after split");
			leftoff = OffsetNumberNext(leftoff);
		}

		PageRestoreTempPage(newlpage, lpage);

		/* Fix opaque fields */
		lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
		if (isleaf)
			lopaque->btpo_flags |= BTP_LEAF;
		lopaque->btpo_next = rightsib;
		lopaque->btpo_cycleid = 0;

		PageSetLSN(lpage, lsn);
		MarkBufferDirty(lbuf);
	}

	/*
	 * We no longer need the buffers.  They must be released together, so
	 * that readers cannot observe two inconsistent halves.
	 */
	if (BufferIsValid(lbuf))
		UnlockReleaseBuffer(lbuf);
	UnlockReleaseBuffer(rbuf);

	/*
	 * Fix left-link of the page to the right of the new right sibling.
	 *
	 * Note: in normal operation, we do this while still holding lock on the
	 * two split pages.  However, that's not necessary for correctness in WAL
	 * replay, because no other index update can be in progress, and readers
	 * will cope properly when following an obsolete left-link.
	 */
	if (rnext != P_NONE)
	{
		Buffer		buffer;

		if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
		{
			Page		page = (Page) BufferGetPage(buffer);
			BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

			pageop->btpo_prev = rightsib;

			PageSetLSN(page, lsn);
			MarkBufferDirty(buffer);
		}
		if (BufferIsValid(buffer))
			UnlockReleaseBuffer(buffer);
	}
}

static void
btree_xlog_vacuum(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buffer;
	Page		page;
	BTPageOpaque opaque;
#ifdef UNUSED
	xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);

	/*
	 * This section of code is thought to be no longer needed, after analysis
	 * of the calling paths.  It is retained to allow the code to be
	 * reinstated if a flaw is revealed in that thinking.
	 *
	 * If we are running non-MVCC scans using this index we need to do some
	 * additional work to ensure correctness, which is known as a "pin scan"
	 * described in more detail in next paragraphs.  We used to do the extra
	 * work in all cases, whereas we now avoid that work in most cases.  If
	 * lastBlockVacuumed is set to InvalidBlockNumber then we skip the
	 * additional work required for the pin scan.
	 *
	 * Avoiding this extra work is important since it requires us to touch
	 * every page in the index, so is an O(N) operation.  Worse, it is an
	 * operation performed in the foreground during redo, so it delays
	 * replication directly.
	 *
	 * If queries might be active then we need to ensure every leaf page is
	 * unpinned between the lastBlockVacuumed and the current block, if there
	 * are any.  This prevents replay of the VACUUM from reaching the stage
	 * of removing heap tuples while there could still be indexscans "in
	 * flight" to those particular tuples for those scans which could be
	 * confused by finding new tuples at the old TID locations (see
	 * nbtree/README).
	 *
	 * It might be worth checking if there are actually any backends running;
	 * if not, we could just skip this.
	 *
	 * Since VACUUM can visit leaf pages out-of-order, it might issue records
	 * with lastBlockVacuumed >= block; that's not an error, it just means
	 * nothing to do now.
	 *
	 * Note: since we touch all pages in the range, we will lock non-leaf
	 * pages, and also any empty (all-zero) pages that may be in the index.
	 * It doesn't seem worth the complexity to avoid that.  But it's
	 * important that HotStandbyActiveInReplay() will not return true if the
	 * database isn't yet consistent; so we need not fear reading
	 * still-corrupt blocks here during crash recovery.
	 */
	if (HotStandbyActiveInReplay() && BlockNumberIsValid(xlrec->lastBlockVacuumed))
	{
		RelFileNode thisrnode;
		BlockNumber thisblkno;
		BlockNumber blkno;

		XLogRecGetBlockTag(record, 0, &thisrnode, NULL, &thisblkno);

		for (blkno = xlrec->lastBlockVacuumed + 1; blkno < thisblkno; blkno++)
		{
			/*
			 * We use RBM_NORMAL_NO_LOG mode because it's not an error
			 * condition to see all-zero pages.  The original btvacuumpage
			 * scan would have skipped over all-zero pages, noting them in
			 * FSM but not bothering to initialize them just yet; so we
			 * mustn't throw an error here.  (We could skip acquiring the
			 * cleanup lock if PageIsNew, but it's probably not worth the
			 * cycles to test.)
			 *
			 * XXX we don't actually need to read the block, we just need to
			 * confirm it is unpinned.  If we had a special call into the
			 * buffer manager we could optimise this so that if the block is
			 * not in shared_buffers we confirm it as unpinned.  Optimizing
			 * this is now moot, since in most cases we avoid the scan.
			 */
			buffer = XLogReadBufferExtended(thisrnode, MAIN_FORKNUM, blkno,
											RBM_NORMAL_NO_LOG);
			if (BufferIsValid(buffer))
			{
				LockBufferForCleanup(buffer);
				UnlockReleaseBuffer(buffer);
			}
		}
	}
#endif

	/*
	 * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
	 * page.  See nbtree/README for details.
	 */
	if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
		== BLK_NEEDS_REDO)
	{
		char	   *ptr;
		Size		len;

		ptr = XLogRecGetBlockData(record, 0, &len);

		page = (Page) BufferGetPage(buffer);

		if (len > 0)
		{
			OffsetNumber *unused;
			OffsetNumber *unend;

			unused = (OffsetNumber *) ptr;
			unend = (OffsetNumber *) ((char *) ptr + len);

			if ((unend - unused) > 0)
				PageIndexMultiDelete(page, unused, unend - unused);
		}

		/*
		 * Mark the page as not containing any LP_DEAD items --- see comments
		 * in _bt_delitems_vacuum().
		 */
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
		opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}
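
#ifdef NOT_USED
/*
 * Illustrative sketch only, never compiled or called: on the primary, the
 * VACUUM record's deletable item offsets are registered as per-buffer block
 * data, roughly as below, which is why btree_xlog_vacuum() above finds a
 * bare array of OffsetNumber in the block data.  The helper name and its
 * arguments are made up for this example; see _bt_delitems_vacuum() for the
 * real WAL-logging code.
 */
static void
_bt_example_register_deletable(OffsetNumber *deletable, int ndeletable)
{
	if (ndeletable > 0)
		XLogRegisterBufData(0, (char *) deletable,
							ndeletable * sizeof(OffsetNumber));
}
#endif							/* NOT_USED */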

static void
btree_xlog_delete(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;
	BTPageOpaque opaque;

	/*
	 * If we have any conflict processing to do, it must happen before we
	 * update the page.
	 *
	 * Btree delete records can conflict with standby queries.  You might
	 * think that vacuum records would conflict as well, but we've handled
	 * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
	 * cleaned by the vacuum of the heap and so we can resolve any conflicts
	 * just once when that arrives.  After that we know that no conflicts
	 * exist from individual btree vacuum records on that index.
	 */
	if (InHotStandby)
	{
		RelFileNode rnode;

		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);

		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
	}

	/*
	 * We don't need to take a cleanup lock to apply these changes.  See
	 * nbtree/README for details.
	 */
	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		page = (Page) BufferGetPage(buffer);

		if (XLogRecGetDataLen(record) > SizeOfBtreeDelete)
		{
			OffsetNumber *unused;

			unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);

			PageIndexMultiDelete(page, unused, xlrec->nitems);
		}

		/*
		 * Mark the page as not containing any LP_DEAD items --- see comments
		 * in _bt_delitems_delete().
		 */
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
		opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}
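
#ifdef NOT_USED
/*
 * Illustrative sketch only, never compiled or used: the main data of a
 * btree DELETE record, as consumed by btree_xlog_delete() above, is a fixed
 * xl_btree_delete header followed immediately (at SizeOfBtreeDelete bytes)
 * by an array of target offsets.  The struct below is invented purely to
 * picture that layout; the real record is assembled field by field on the
 * primary.
 */
typedef struct ExampleBtreeDeleteLayout
{
	xl_btree_delete header;		/* only SizeOfBtreeDelete bytes are used */
	OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];	/* header.nitems entries */
} ExampleBtreeDeleteLayout;
#endif							/* NOT_USED */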

static void
btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;
	IndexTupleData trunctuple;

	/*
	 * In normal operation, we would lock all the pages this WAL record
	 * touches before changing any of them.  In WAL replay, it should be okay
	 * to lock just one page at a time, since no concurrent index updates can
	 * be happening, and readers should not care whether they arrive at the
	 * target page or not (since it's surely empty).
	 */

	/* parent page */
	if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
	{
		OffsetNumber poffset;
		ItemId		itemid;
		IndexTuple	itup;
		OffsetNumber nextoffset;
		BlockNumber rightsib;

		page = (Page) BufferGetPage(buffer);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		poffset = xlrec->poffset;

		nextoffset = OffsetNumberNext(poffset);
		itemid = PageGetItemId(page, nextoffset);
		itup = (IndexTuple) PageGetItem(page, itemid);
		rightsib = BTreeInnerTupleGetDownLink(itup);

		itemid = PageGetItemId(page, poffset);
		itup = (IndexTuple) PageGetItem(page, itemid);
		BTreeInnerTupleSetDownLink(itup, rightsib);
		nextoffset = OffsetNumberNext(poffset);
		PageIndexTupleDelete(page, nextoffset);

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/* Rewrite the leaf page as a halfdead page */
	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);

	_bt_pageinit(page, BufferGetPageSize(buffer));
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	pageop->btpo_prev = xlrec->leftblk;
	pageop->btpo_next = xlrec->rightblk;
	pageop->btpo.level = 0;
	pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
	pageop->btpo_cycleid = 0;

	/*
	 * Construct a dummy hikey item that points to the next parent to be
	 * deleted (if any).
	 */
	MemSet(&trunctuple, 0, sizeof(IndexTupleData));
	trunctuple.t_info = sizeof(IndexTupleData);
	BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

	if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
					false, false) == InvalidOffsetNumber)
		elog(ERROR, "could not add dummy high key to half-dead page");

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}

static void
btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
	BlockNumber leftsib;
	BlockNumber rightsib;
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;

	leftsib = xlrec->leftsib;
	rightsib = xlrec->rightsib;

	/*
	 * In normal operation, we would lock all the pages this WAL record
	 * touches before changing any of them.  In WAL replay, it should be okay
	 * to lock just one page at a time, since no concurrent index updates can
	 * be happening, and readers should not care whether they arrive at the
	 * target page or not (since it's surely empty).
	 */

	/* Fix left-link of right sibling */
	if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
	{
		page = (Page) BufferGetPage(buffer);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
		pageop->btpo_prev = leftsib;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/* Fix right-link of left sibling, if any */
	if (leftsib != P_NONE)
	{
		if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
		{
			page = (Page) BufferGetPage(buffer);
			pageop = (BTPageOpaque) PageGetSpecialPointer(page);
			pageop->btpo_next = rightsib;

			PageSetLSN(page, lsn);
			MarkBufferDirty(buffer);
		}
		if (BufferIsValid(buffer))
			UnlockReleaseBuffer(buffer);
	}

	/* Rewrite target page as empty deleted page */
	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);

	_bt_pageinit(page, BufferGetPageSize(buffer));
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	pageop->btpo_prev = leftsib;
	pageop->btpo_next = rightsib;
	pageop->btpo.xact = xlrec->btpo_xact;
	pageop->btpo_flags = BTP_DELETED;
	pageop->btpo_cycleid = 0;

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);

	/*
	 * If we deleted a parent of the targeted leaf page, instead of the leaf
	 * itself, update the leaf to point to the next remaining child in the
	 * branch.
	 */
	if (XLogRecHasBlockRef(record, 3))
	{
		/*
		 * There is no real data on the page, so we just re-create it from
		 * scratch using the information from the WAL record.
		 */
		IndexTupleData trunctuple;

		buffer = XLogInitBufferForRedo(record, 3);
		page = (Page) BufferGetPage(buffer);

		_bt_pageinit(page, BufferGetPageSize(buffer));
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
		pageop->btpo_prev = xlrec->leafleftsib;
		pageop->btpo_next = xlrec->leafrightsib;
		pageop->btpo.level = 0;
		pageop->btpo_cycleid = 0;

		/* Add a dummy hikey item */
		MemSet(&trunctuple, 0, sizeof(IndexTupleData));
		trunctuple.t_info = sizeof(IndexTupleData);
		BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

		if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
						false, false) == InvalidOffsetNumber)
			elog(ERROR, "could not add dummy high key to half-dead page");

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
		UnlockReleaseBuffer(buffer);
	}

	/* Update metapage if needed */
	if (info == XLOG_BTREE_UNLINK_PAGE_META)
		_bt_restore_meta(record, 4);
}

static void
btree_xlog_newroot(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;
	char	   *ptr;
	Size		len;

	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);

	_bt_pageinit(page, BufferGetPageSize(buffer));
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	pageop->btpo_flags = BTP_ROOT;
	pageop->btpo_prev = pageop->btpo_next = P_NONE;
	pageop->btpo.level = xlrec->level;
	if (xlrec->level == 0)
		pageop->btpo_flags |= BTP_LEAF;
	pageop->btpo_cycleid = 0;

	if (xlrec->level > 0)
	{
		ptr = XLogRecGetBlockData(record, 0, &len);
		_bt_restore_page(page, ptr, len);

		/* Clear the incomplete-split flag in left child */
		_bt_clear_incomplete_split(record, 1);
	}

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);

	_bt_restore_meta(record, 2);
}

static void
btree_xlog_reuse_page(XLogReaderState *record)
{
	xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);

	/*
	 * Btree reuse_page records exist to provide a conflict point when we
	 * reuse pages in the index via the FSM.  That's all they do though.
	 *
	 * latestRemovedXid was the page's btpo.xact.  The btpo.xact <
	 * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors
	 * the pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
	 * Consequently, one XID value achieves the same exclusion effect on
	 * master and standby.
	 */
	if (InHotStandby)
	{
		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
											xlrec->node);
	}
}
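
#ifdef NOT_USED
/*
 * Illustrative sketch only, never compiled or called: the recyclability
 * test that the comment above refers to looks roughly like this on the
 * primary (see _bt_page_recyclable() for the real code); the page's
 * btpo.xact mentioned above is the XID that this test compares against
 * RecentGlobalXmin.  The helper name is made up for this example.
 */
static bool
_bt_example_page_recyclable(BTPageOpaque opaque)
{
	return P_ISDELETED(opaque) &&
		TransactionIdPrecedes(opaque->btpo.xact, RecentGlobalXmin);
}
#endif							/* NOT_USED */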

void
btree_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_BTREE_INSERT_LEAF:
			btree_xlog_insert(true, false, record);
			break;
		case XLOG_BTREE_INSERT_UPPER:
			btree_xlog_insert(false, false, record);
			break;
		case XLOG_BTREE_INSERT_META:
			btree_xlog_insert(false, true, record);
			break;
		case XLOG_BTREE_SPLIT_L:
			btree_xlog_split(true, record);
			break;
		case XLOG_BTREE_SPLIT_R:
			btree_xlog_split(false, record);
			break;
		case XLOG_BTREE_VACUUM:
			btree_xlog_vacuum(record);
			break;
		case XLOG_BTREE_DELETE:
			btree_xlog_delete(record);
			break;
		case XLOG_BTREE_MARK_PAGE_HALFDEAD:
			btree_xlog_mark_page_halfdead(info, record);
			break;
		case XLOG_BTREE_UNLINK_PAGE:
		case XLOG_BTREE_UNLINK_PAGE_META:
			btree_xlog_unlink_page(info, record);
			break;
		case XLOG_BTREE_NEWROOT:
			btree_xlog_newroot(record);
			break;
		case XLOG_BTREE_REUSE_PAGE:
			btree_xlog_reuse_page(record);
			break;
		case XLOG_BTREE_META_CLEANUP:
			_bt_restore_meta(record, 0);
			break;
		default:
			elog(PANIC, "btree_redo: unknown op code %u", info);
	}
}

/*
 * Mask a btree page before performing consistency checks on it.
 */
void
btree_mask(char *pagedata, BlockNumber blkno)
{
	Page		page = (Page) pagedata;
	BTPageOpaque maskopaq;

	mask_page_lsn_and_checksum(page);

	mask_page_hint_bits(page);
	mask_unused_space(page);

	maskopaq = (BTPageOpaque) PageGetSpecialPointer(page);

	if (P_ISDELETED(maskopaq))
	{
		/*
		 * Mask page content on a DELETED page since it will be
		 * re-initialized during replay.  See btree_xlog_unlink_page() for
		 * details.
		 */
		mask_page_content(page);
	}
	else if (P_ISLEAF(maskopaq))
	{
		/*
		 * In btree leaf pages, it is possible to modify the LP_FLAGS without
		 * emitting any WAL record.  Hence, mask the line pointer flags.  See
		 * _bt_killitems(), _bt_check_unique() for details.
		 */
		mask_lp_flags(page);
	}

	/*
	 * BTP_HAS_GARBAGE is just an un-logged hint bit.  So, mask it.  See
	 * _bt_killitems(), _bt_check_unique() for details.
	 */
	maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;

	/*
	 * During replay of a btree page split, we don't set the BTP_SPLIT_END
	 * flag of the right sibling and initialize the cycle_id to 0 for the
	 * same page.  See btree_xlog_split() for details.
	 */
	maskopaq->btpo_flags &= ~BTP_SPLIT_END;
	maskopaq->btpo_cycleid = 0;
}