/*-------------------------------------------------------------------------
 *
 * ginxlog.c
 *    WAL replay logic for inverted index.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/gin/ginxlog.c
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/gin_private.h"
#include "access/ginxlog.h"
#include "access/xlogutils.h"
#include "utils/memutils.h"

static MemoryContext opCtx;     /* working memory for operations */

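/*
 * Clear the incomplete-split flag on the page registered as the given
 * block of the record, if that block needs redo.  Used when replaying a
 * record that completes a previously interrupted page split.
 */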
static void
ginRedoClearIncompleteSplit(XLogReaderState *record, uint8 block_id)
{
    XLogRecPtr lsn = record->EndRecPtr;
    Buffer buffer;
    Page page;

    if (XLogReadBufferForRedo(record, block_id, &buffer) == BLK_NEEDS_REDO)
    {
        page = (Page) BufferGetPage(buffer);
        GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

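/*
 * Redo creation of a posting-tree root page.  The page's compressed
 * posting list is carried in the WAL record, immediately after the
 * ginxlogCreatePostingTree header.
 */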
static void
ginRedoCreatePTree(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
    char *ptr;
    Buffer buffer;
    Page page;

    buffer = XLogInitBufferForRedo(record, 0);
    page = (Page) BufferGetPage(buffer);

    GinInitBuffer(buffer, GIN_DATA | GIN_LEAF | GIN_COMPRESSED);

    ptr = XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree);

    /* Place page data */
    memcpy(GinDataLeafPageGetPostingList(page), ptr, data->size);

    GinDataPageSetDataSize(page, data->size);

    PageSetLSN(page, lsn);

    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
}

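/*
 * Redo insertion of one tuple into an entry-tree page.  'rdata' points to
 * the ginxlogInsertEntry payload of the WAL record.  On an internal page,
 * 'rightblkno' is the new right sibling that the existing downlink at the
 * insertion offset must be redirected to.
 */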
static void
ginRedoInsertEntry(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
{
    Page page = BufferGetPage(buffer);
    ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
    OffsetNumber offset = data->offset;
    IndexTuple itup;

    if (rightblkno != InvalidBlockNumber)
    {
        /* update link to right page after split */
        Assert(!GinPageIsLeaf(page));
        Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
        itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
        GinSetDownlink(itup, rightblkno);
    }

    if (data->isDelete)
    {
        Assert(GinPageIsLeaf(page));
        Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
        PageIndexTupleDelete(page, offset);
    }

    itup = &data->tuple;

    if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), offset, false, false) == InvalidOffsetNumber)
    {
        RelFileNode node;
        ForkNumber forknum;
        BlockNumber blknum;

        BufferGetTag(buffer, &node, &forknum, &blknum);
        elog(ERROR, "failed to add item to index page in %u/%u/%u",
             node.spcNode, node.dbNode, node.relNode);
    }
}

/*
 * Redo recompression of a posting list.  Doing all the changes in-place is
 * not always possible, because it might require more space than we have on
 * the page.  Instead, once a modification is required, we copy the
 * unprocessed tail of the page into a separately allocated chunk of memory,
 * so that the original versions of the segments can still be read from
 * there.  Thanks to that, we never need to move page data around in place.
 */
static void
ginRedoRecompress(Page page, ginxlogRecompressDataLeaf *data)
{
    int actionno;
    int segno;
    GinPostingList *oldseg;
    Pointer segmentend;
    char *walbuf;
    int totalsize;
    Pointer tailCopy = NULL;
    Pointer writePtr;
    Pointer segptr;

    /*
     * If the page is in pre-9.4 format, convert to new format first.
     */
    if (!GinPageIsCompressed(page))
    {
        ItemPointer uncompressed = (ItemPointer) GinDataPageGetData(page);
        int nuncompressed = GinPageGetOpaque(page)->maxoff;
        int npacked;

        /*
         * Empty leaf pages are deleted as part of vacuum, but leftmost and
         * rightmost pages are never deleted.  So, pg_upgrade'd from pre-9.4
         * instances might contain empty leaf pages, and we need to handle
         * them correctly.
         */
        if (nuncompressed > 0)
        {
            GinPostingList *plist;

            plist = ginCompressPostingList(uncompressed, nuncompressed,
                                           BLCKSZ, &npacked);
            totalsize = SizeOfGinPostingList(plist);

            Assert(npacked == nuncompressed);

            memcpy(GinDataLeafPageGetPostingList(page), plist, totalsize);
        }
        else
        {
            totalsize = 0;
        }

        GinDataPageSetDataSize(page, totalsize);
        GinPageSetCompressed(page);
        GinPageGetOpaque(page)->maxoff = InvalidOffsetNumber;
    }

    oldseg = GinDataLeafPageGetPostingList(page);
    writePtr = (Pointer) oldseg;
    segmentend = (Pointer) oldseg + GinDataLeafPageGetPostingListSize(page);
    segno = 0;

    walbuf = ((char *) data) + sizeof(ginxlogRecompressDataLeaf);
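    /*
     * Loop through the actions recorded in the WAL record.  Each action
     * begins with a segment-number byte and an action-type byte, followed
     * by the action's payload: a whole new segment for GIN_SEGMENT_INSERT
     * and GIN_SEGMENT_REPLACE, or a uint16 item count plus that many item
     * pointers for GIN_SEGMENT_ADDITEMS.  GIN_SEGMENT_DELETE carries no
     * payload.
     */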
    for (actionno = 0; actionno < data->nactions; actionno++)
    {
        uint8 a_segno = *((uint8 *) (walbuf++));
        uint8 a_action = *((uint8 *) (walbuf++));
        GinPostingList *newseg = NULL;
        int newsegsize = 0;
        ItemPointerData *items = NULL;
        uint16 nitems = 0;
        ItemPointerData *olditems;
        int nolditems;
        ItemPointerData *newitems;
        int nnewitems;
        int segsize;

        /* Extract all the information we need from the WAL record */
        if (a_action == GIN_SEGMENT_INSERT ||
            a_action == GIN_SEGMENT_REPLACE)
        {
            newseg = (GinPostingList *) walbuf;
            newsegsize = SizeOfGinPostingList(newseg);
            walbuf += SHORTALIGN(newsegsize);
        }

        if (a_action == GIN_SEGMENT_ADDITEMS)
        {
            memcpy(&nitems, walbuf, sizeof(uint16));
            walbuf += sizeof(uint16);
            items = (ItemPointerData *) walbuf;
            walbuf += nitems * sizeof(ItemPointerData);
        }

        /* Skip to the segment that this action concerns */
        Assert(segno <= a_segno);
        while (segno < a_segno)
        {
            /*
             * Once a modification has started and the page tail has been
             * copied, we also have to copy the unmodified segments back
             * into place.
             */
            segsize = SizeOfGinPostingList(oldseg);
            if (tailCopy)
            {
                Assert(writePtr + segsize < PageGetSpecialPointer(page));
                memcpy(writePtr, (Pointer) oldseg, segsize);
            }
            writePtr += segsize;
            oldseg = GinNextPostingListSegment(oldseg);
            segno++;
        }

        /*
         * ADDITEMS action is handled like REPLACE, but the new segment to
         * replace the old one is reconstructed using the old segment from
         * disk and the new items from the WAL record.
         */
        if (a_action == GIN_SEGMENT_ADDITEMS)
        {
            int npacked;

            olditems = ginPostingListDecode(oldseg, &nolditems);

            newitems = ginMergeItemPointers(items, nitems,
                                            olditems, nolditems,
                                            &nnewitems);
            Assert(nnewitems == nolditems + nitems);

            newseg = ginCompressPostingList(newitems, nnewitems,
                                            BLCKSZ, &npacked);
            Assert(npacked == nnewitems);

            newsegsize = SizeOfGinPostingList(newseg);
            a_action = GIN_SEGMENT_REPLACE;
        }

        segptr = (Pointer) oldseg;
        if (segptr != segmentend)
            segsize = SizeOfGinPostingList(oldseg);
        else
        {
            /*
             * Positioned after the last existing segment. Only INSERTs
             * expected here.
             */
            Assert(a_action == GIN_SEGMENT_INSERT);
            segsize = 0;
        }

        /*
         * We're about to start modifying the page.  Copy the tail of the
         * page away first, if that hasn't been done already.
         */
        if (!tailCopy && segptr != segmentend)
        {
            int tailSize = segmentend - segptr;

            tailCopy = (Pointer) palloc(tailSize);
            memcpy(tailCopy, segptr, tailSize);
            segptr = tailCopy;
            oldseg = (GinPostingList *) segptr;
            segmentend = segptr + tailSize;
        }

        switch (a_action)
        {
            case GIN_SEGMENT_DELETE:
                segptr += segsize;
                segno++;
                break;

            case GIN_SEGMENT_INSERT:
                /* copy the new segment in place */
                Assert(writePtr + newsegsize <= PageGetSpecialPointer(page));
                memcpy(writePtr, newseg, newsegsize);
                writePtr += newsegsize;
                break;

            case GIN_SEGMENT_REPLACE:
                /* copy the new version of segment in place */
                Assert(writePtr + newsegsize <= PageGetSpecialPointer(page));
                memcpy(writePtr, newseg, newsegsize);
                writePtr += newsegsize;
                segptr += segsize;
                segno++;
                break;

            default:
                elog(ERROR, "unexpected GIN leaf action: %u", a_action);
        }
        oldseg = (GinPostingList *) segptr;
    }

    /* Copy the rest of unmodified segments if any. */
    segptr = (Pointer) oldseg;
    if (segptr != segmentend && tailCopy)
    {
        int restSize = segmentend - segptr;

        Assert(writePtr + restSize <= PageGetSpecialPointer(page));
        memcpy(writePtr, segptr, restSize);
        writePtr += restSize;
    }

    totalsize = writePtr - (Pointer) GinDataLeafPageGetPostingList(page);
    GinDataPageSetDataSize(page, totalsize);
}

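/*
 * Redo insertion into a data (posting tree) page.  On a leaf page, the
 * recorded segment recompression is replayed; on an internal page, the
 * existing downlink at the given offset is pointed at the new right page
 * and the new posting item is added.
 */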
static void
ginRedoInsertData(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
{
    Page page = BufferGetPage(buffer);

    if (isLeaf)
    {
        ginxlogRecompressDataLeaf *data = (ginxlogRecompressDataLeaf *) rdata;

        Assert(GinPageIsLeaf(page));

        ginRedoRecompress(page, data);
    }
    else
    {
        ginxlogInsertDataInternal *data = (ginxlogInsertDataInternal *) rdata;
        PostingItem *oldpitem;

        Assert(!GinPageIsLeaf(page));

        /* update link to right page after split */
        oldpitem = GinDataPageGetPostingItem(page, data->offset);
        PostingItemSetBlockNumber(oldpitem, rightblkno);

        GinDataPageAddPostingItem(page, &data->newitem, data->offset);
    }
}

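/*
 * Redo an insertion into the entry tree or a posting tree.  Block 0 of the
 * record is the page being inserted into; for an internal-page insertion
 * that completes a split, block 1 is the child page whose incomplete-split
 * flag has to be cleared.
 */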
static void
ginRedoInsert(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
    Buffer buffer;
#ifdef NOT_USED
    BlockNumber leftChildBlkno = InvalidBlockNumber;
#endif
    BlockNumber rightChildBlkno = InvalidBlockNumber;
    bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;

    /*
     * First clear incomplete-split flag on child page if this finishes a
     * split.
     */
    if (!isLeaf)
    {
        char *payload = XLogRecGetData(record) + sizeof(ginxlogInsert);

#ifdef NOT_USED
        leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
#endif
        payload += sizeof(BlockIdData);
        rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
        payload += sizeof(BlockIdData);

        ginRedoClearIncompleteSplit(record, 1);
    }

    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        Page page = BufferGetPage(buffer);
        Size len;
        char *payload = XLogRecGetBlockData(record, 0, &len);

        /* How to insert the payload is tree-type specific */
        if (data->flags & GIN_INSERT_ISDATA)
        {
            Assert(GinPageIsData(page));
            ginRedoInsertData(buffer, isLeaf, rightChildBlkno, payload);
        }
        else
        {
            Assert(!GinPageIsData(page));
            ginRedoInsertEntry(buffer, isLeaf, rightChildBlkno, payload);
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

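/*
 * Redo a page split.  Both halves of the split (and the new root, for a
 * root split) are logged as full-page images, so all we need to do here is
 * restore them and, for an internal-page split, clear the incomplete-split
 * flag on the child.
 */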
static void
ginRedoSplit(XLogReaderState *record)
{
    ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
    Buffer lbuffer,
           rbuffer,
           rootbuf;
    bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
    bool isRoot = (data->flags & GIN_SPLIT_ROOT) != 0;

    /*
     * First clear incomplete-split flag on child page if this finishes a
     * split
     */
    if (!isLeaf)
        ginRedoClearIncompleteSplit(record, 3);

    if (XLogReadBufferForRedo(record, 0, &lbuffer) != BLK_RESTORED)
        elog(ERROR, "GIN split record did not contain a full-page image of left page");

    if (XLogReadBufferForRedo(record, 1, &rbuffer) != BLK_RESTORED)
        elog(ERROR, "GIN split record did not contain a full-page image of right page");

    if (isRoot)
    {
        if (XLogReadBufferForRedo(record, 2, &rootbuf) != BLK_RESTORED)
            elog(ERROR, "GIN split record did not contain a full-page image of root page");
        UnlockReleaseBuffer(rootbuf);
    }

    UnlockReleaseBuffer(rbuffer);
    UnlockReleaseBuffer(lbuffer);
}

/*
 * VACUUM_PAGE record contains simply a full image of the page, similar to
 * an XLOG_FPI record.
 */
static void
ginRedoVacuumPage(XLogReaderState *record)
{
    Buffer buffer;

    if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
    {
        elog(ERROR, "replay of gin entry tree page vacuum did not restore the page");
    }
    UnlockReleaseBuffer(buffer);
}

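/*
 * Redo vacuuming of a posting-tree leaf page.  The record describes the
 * surviving contents as a set of recompression actions, which are applied
 * by ginRedoRecompress().
 */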
static void
ginRedoVacuumDataLeafPage(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    Buffer buffer;

    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        Page page = BufferGetPage(buffer);
        Size len;
        ginxlogVacuumDataLeafPage *xlrec;

        xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetBlockData(record, 0, &len);

        Assert(GinPageIsLeaf(page));
        Assert(GinPageIsData(page));

        ginRedoRecompress(page, &xlrec->data);
        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

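/*
 * Redo deletion of a posting-tree page.  Block 0 is the page being deleted,
 * block 1 is its parent, and block 2 is its left sibling.
 */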
static void
ginRedoDeletePage(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
    Buffer dbuffer;
    Buffer pbuffer;
    Buffer lbuffer;
    Page page;

    /*
     * Lock left page first in order to prevent possible deadlock with
     * ginStepRight().
     */
    if (XLogReadBufferForRedo(record, 2, &lbuffer) == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(lbuffer);
        Assert(GinPageIsData(page));
        GinPageGetOpaque(page)->rightlink = data->rightLink;
        PageSetLSN(page, lsn);
        MarkBufferDirty(lbuffer);
    }

    if (XLogReadBufferForRedo(record, 0, &dbuffer) == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(dbuffer);
        Assert(GinPageIsData(page));
        GinPageGetOpaque(page)->flags = GIN_DELETED;
        GinPageSetDeleteXid(page, data->deleteXid);
        PageSetLSN(page, lsn);
        MarkBufferDirty(dbuffer);
    }

    if (XLogReadBufferForRedo(record, 1, &pbuffer) == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(pbuffer);
        Assert(GinPageIsData(page));
        Assert(!GinPageIsLeaf(page));
        GinPageDeletePostingItem(page, data->parentOffset);
        PageSetLSN(page, lsn);
        MarkBufferDirty(pbuffer);
    }

    if (BufferIsValid(lbuffer))
        UnlockReleaseBuffer(lbuffer);
    if (BufferIsValid(pbuffer))
        UnlockReleaseBuffer(pbuffer);
    if (BufferIsValid(dbuffer))
        UnlockReleaseBuffer(dbuffer);
}

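/*
 * Redo an update of the metapage.  The record always carries a new copy of
 * the metadata; depending on the operation it may also append tuples to the
 * current tail page of the pending list, or link in a new tail page.
 */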
static void
ginRedoUpdateMetapage(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
    Buffer metabuffer;
    Page metapage;
    Buffer buffer;

    /*
     * Restore the metapage. This is essentially the same as a full-page
     * image, so restore the metapage unconditionally without looking at the
     * LSN, to avoid torn page hazards.
     */
    metabuffer = XLogInitBufferForRedo(record, 0);
    Assert(BufferGetBlockNumber(metabuffer) == GIN_METAPAGE_BLKNO);
    metapage = BufferGetPage(metabuffer);

    GinInitMetabuffer(metabuffer);
    memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
    PageSetLSN(metapage, lsn);
    MarkBufferDirty(metabuffer);

    if (data->ntuples > 0)
    {
        /*
         * insert into tail page
         */
        if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
        {
            Page page = BufferGetPage(buffer);
            OffsetNumber off;
            int i;
            Size tupsize;
            char *payload;
            IndexTuple tuples;
            Size totaltupsize;

            payload = XLogRecGetBlockData(record, 1, &totaltupsize);
            tuples = (IndexTuple) payload;

            if (PageIsEmpty(page))
                off = FirstOffsetNumber;
            else
                off = OffsetNumberNext(PageGetMaxOffsetNumber(page));

            for (i = 0; i < data->ntuples; i++)
            {
                tupsize = IndexTupleSize(tuples);

                if (PageAddItem(page, (Item) tuples, tupsize, off,
                                false, false) == InvalidOffsetNumber)
                    elog(ERROR, "failed to add item to index page");

                tuples = (IndexTuple) (((char *) tuples) + tupsize);

                off++;
            }
            Assert(payload + totaltupsize == (char *) tuples);

            /*
             * Increase counter of heap tuples
             */
            GinPageGetOpaque(page)->maxoff++;

            PageSetLSN(page, lsn);
            MarkBufferDirty(buffer);
        }
        if (BufferIsValid(buffer))
            UnlockReleaseBuffer(buffer);
    }
    else if (data->prevTail != InvalidBlockNumber)
    {
        /*
         * New tail
         */
        if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
        {
            Page page = BufferGetPage(buffer);

            GinPageGetOpaque(page)->rightlink = data->newRightlink;

            PageSetLSN(page, lsn);
            MarkBufferDirty(buffer);
        }
        if (BufferIsValid(buffer))
            UnlockReleaseBuffer(buffer);
    }

    UnlockReleaseBuffer(metabuffer);
}

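/*
 * Redo creation of a pending-list page.  The page is always reinitialized
 * from scratch and filled with the tuples carried in the WAL record.
 */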
static void
ginRedoInsertListPage(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record);
    Buffer buffer;
    Page page;
    OffsetNumber l,
                 off = FirstOffsetNumber;
    int i,
        tupsize;
    char *payload;
    IndexTuple tuples;
    Size totaltupsize;

    /* We always re-initialize the page. */
    buffer = XLogInitBufferForRedo(record, 0);
    page = BufferGetPage(buffer);

    GinInitBuffer(buffer, GIN_LIST);
    GinPageGetOpaque(page)->rightlink = data->rightlink;
    if (data->rightlink == InvalidBlockNumber)
    {
        /* tail of sublist */
        GinPageSetFullRow(page);
        GinPageGetOpaque(page)->maxoff = 1;
    }
    else
    {
        GinPageGetOpaque(page)->maxoff = 0;
    }

    payload = XLogRecGetBlockData(record, 0, &totaltupsize);

    tuples = (IndexTuple) payload;
    for (i = 0; i < data->ntuples; i++)
    {
        tupsize = IndexTupleSize(tuples);

        l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);

        if (l == InvalidOffsetNumber)
            elog(ERROR, "failed to add item to index page");

        tuples = (IndexTuple) (((char *) tuples) + tupsize);
        off++;
    }
    Assert((char *) tuples == payload + totaltupsize);

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);

    UnlockReleaseBuffer(buffer);
}

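/*
 * Redo removal of pages from the head of the pending list.  The metapage is
 * restored from the record, and each deleted page is reinitialized as an
 * empty GIN_DELETED page.
 */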
static void
ginRedoDeleteListPages(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record);
    Buffer metabuffer;
    Page metapage;
    int i;

    metabuffer = XLogInitBufferForRedo(record, 0);
    Assert(BufferGetBlockNumber(metabuffer) == GIN_METAPAGE_BLKNO);
    metapage = BufferGetPage(metabuffer);

    GinInitMetabuffer(metabuffer);

    memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
    PageSetLSN(metapage, lsn);
    MarkBufferDirty(metabuffer);

    /*
     * In normal operation, shiftList() takes exclusive lock on all the
     * pages-to-be-deleted simultaneously. During replay, however, it should
     * be all right to lock them one at a time. This is dependent on the fact
     * that we are deleting pages from the head of the list, and that readers
     * share-lock the next page before releasing the one they are on. So we
     * cannot get past a reader that is on, or due to visit, any page we are
     * going to delete. New incoming readers will block behind our metapage
     * lock and then see a fully updated page list.
     *
     * No full-page images are taken of the deleted pages. Instead, they are
     * re-initialized as empty, deleted pages. Their right-links don't need to
     * be preserved, because no new readers can see the pages, as explained
     * above.
     */
    for (i = 0; i < data->ndeleted; i++)
    {
        Buffer buffer;
        Page page;

        buffer = XLogInitBufferForRedo(record, i + 1);
        page = BufferGetPage(buffer);
        GinInitBuffer(buffer, GIN_DELETED);

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);

        UnlockReleaseBuffer(buffer);
    }
    UnlockReleaseBuffer(metabuffer);
}

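/*
 * Main entry point for GIN WAL replay: dispatch on the record type.  All
 * per-record work is done in opCtx, which is reset after each record.
 */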
void
gin_redo(XLogReaderState *record)
{
    uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    MemoryContext oldCtx;

    /*
     * GIN indexes do not require any conflict processing.  NB: if we ever
     * implement an optimization similar to the one b-tree has and start
     * removing killed tuples outside of VACUUM, we'll need to handle that
     * here.
     */

    oldCtx = MemoryContextSwitchTo(opCtx);
    switch (info)
    {
        case XLOG_GIN_CREATE_PTREE:
            ginRedoCreatePTree(record);
            break;
        case XLOG_GIN_INSERT:
            ginRedoInsert(record);
            break;
        case XLOG_GIN_SPLIT:
            ginRedoSplit(record);
            break;
        case XLOG_GIN_VACUUM_PAGE:
            ginRedoVacuumPage(record);
            break;
        case XLOG_GIN_VACUUM_DATA_LEAF_PAGE:
            ginRedoVacuumDataLeafPage(record);
            break;
        case XLOG_GIN_DELETE_PAGE:
            ginRedoDeletePage(record);
            break;
        case XLOG_GIN_UPDATE_META_PAGE:
            ginRedoUpdateMetapage(record);
            break;
        case XLOG_GIN_INSERT_LISTPAGE:
            ginRedoInsertListPage(record);
            break;
        case XLOG_GIN_DELETE_LISTPAGE:
            ginRedoDeleteListPages(record);
            break;
        default:
            elog(PANIC, "gin_redo: unknown op code %u", info);
    }
    MemoryContextSwitchTo(oldCtx);
    MemoryContextReset(opCtx);
}

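/*
 * Startup/cleanup hooks for GIN WAL replay: create and destroy the
 * temporary working memory context used by gin_redo().
 */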
void
gin_xlog_startup(void)
{
    opCtx = AllocSetContextCreate(CurrentMemoryContext,
                                  "GIN recovery temporary context",
                                  ALLOCSET_DEFAULT_SIZES);
}

void
gin_xlog_cleanup(void)
{
    MemoryContextDelete(opCtx);
    opCtx = NULL;
}

/*
 * Mask a GIN page before running consistency checks on it.
 */
void
gin_mask(char *pagedata, BlockNumber blkno)
{
    Page page = (Page) pagedata;
    PageHeader pagehdr = (PageHeader) page;
    GinPageOpaque opaque;

    mask_page_lsn_and_checksum(page);
    opaque = GinPageGetOpaque(page);

    mask_page_hint_bits(page);

    /*
     * For a GIN_DELETED page, the page is initialized to empty. Hence, mask
     * the whole page content. For other pages, mask the hole if pd_lower
     * appears to have been set correctly.
     */
    if (opaque->flags & GIN_DELETED)
        mask_page_content(page);
    else if (pagehdr->pd_lower > SizeOfPageHeaderData)
        mask_unused_space(page);
}