1/*-------------------------------------------------------------------------
2 *
3 * spgxlog.c
4 * WAL replay logic for SP-GiST
5 *
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/spgist/spgxlog.c
12 *
13 *-------------------------------------------------------------------------
14 */
15#include "postgres.h"
16
17#include "access/bufmask.h"
18#include "access/spgist_private.h"
19#include "access/spgxlog.h"
20#include "access/transam.h"
21#include "access/xlog.h"
22#include "access/xlogutils.h"
23#include "storage/standby.h"
24#include "utils/memutils.h"
25
26
/*
 * Scratch memory context for redo work: created in spg_xlog_startup(),
 * made current and then reset for every record in spg_redo(), and deleted
 * in spg_xlog_cleanup().
 */
static MemoryContext opCtx; /* working memory for operations */
28
29
30/*
31 * Prepare a dummy SpGistState, with just the minimum info needed for replay.
32 *
33 * At present, all we need is enough info to support spgFormDeadTuple(),
34 * plus the isBuild flag.
35 */
36static void
37fillFakeState(SpGistState *state, spgxlogState stateSrc)
38{
39 memset(state, 0, sizeof(*state));
40
41 state->myXid = stateSrc.myXid;
42 state->isBuild = stateSrc.isBuild;
43 state->deadTupleStorage = palloc0(SGDTSIZE);
44}
45
46/*
47 * Add a leaf tuple, or replace an existing placeholder tuple. This is used
48 * to replay SpGistPageAddNewItem() operations. If the offset points at an
49 * existing tuple, it had better be a placeholder tuple.
50 */
51static void
52addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
53{
54 if (offset <= PageGetMaxOffsetNumber(page))
55 {
56 SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
57 PageGetItemId(page, offset));
58
59 if (dt->tupstate != SPGIST_PLACEHOLDER)
60 elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
61
62 Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
63 SpGistPageGetOpaque(page)->nPlaceholder--;
64
65 PageIndexTupleDelete(page, offset);
66 }
67
68 Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
69
70 if (PageAddItem(page, tuple, size, offset, false, false) != offset)
71 elog(ERROR, "failed to add item of size %u to SPGiST index page",
72 size);
73}
74
75static void
76spgRedoAddLeaf(XLogReaderState *record)
77{
78 XLogRecPtr lsn = record->EndRecPtr;
79 char *ptr = XLogRecGetData(record);
80 spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
81 char *leafTuple;
82 SpGistLeafTupleData leafTupleHdr;
83 Buffer buffer;
84 Page page;
85 XLogRedoAction action;
86
87 ptr += sizeof(spgxlogAddLeaf);
88 leafTuple = ptr;
89 /* the leaf tuple is unaligned, so make a copy to access its header */
90 memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
91
92 /*
93 * In normal operation we would have both current and parent pages locked
94 * simultaneously; but in WAL replay it should be safe to update the leaf
95 * page before updating the parent.
96 */
97 if (xldata->newPage)
98 {
99 buffer = XLogInitBufferForRedo(record, 0);
100 SpGistInitBuffer(buffer,
101 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
102 action = BLK_NEEDS_REDO;
103 }
104 else
105 action = XLogReadBufferForRedo(record, 0, &buffer);
106
107 if (action == BLK_NEEDS_REDO)
108 {
109 page = BufferGetPage(buffer);
110
111 /* insert new tuple */
112 if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
113 {
114 /* normal cases, tuple was added by SpGistPageAddNewItem */
115 addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
116 xldata->offnumLeaf);
117
118 /* update head tuple's chain link if needed */
119 if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
120 {
121 SpGistLeafTuple head;
122
123 head = (SpGistLeafTuple) PageGetItem(page,
124 PageGetItemId(page, xldata->offnumHeadLeaf));
125 Assert(head->nextOffset == leafTupleHdr.nextOffset);
126 head->nextOffset = xldata->offnumLeaf;
127 }
128 }
129 else
130 {
131 /* replacing a DEAD tuple */
132 PageIndexTupleDelete(page, xldata->offnumLeaf);
133 if (PageAddItem(page,
134 (Item) leafTuple, leafTupleHdr.size,
135 xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
136 elog(ERROR, "failed to add item of size %u to SPGiST index page",
137 leafTupleHdr.size);
138 }
139
140 PageSetLSN(page, lsn);
141 MarkBufferDirty(buffer);
142 }
143 if (BufferIsValid(buffer))
144 UnlockReleaseBuffer(buffer);
145
146 /* update parent downlink if necessary */
147 if (xldata->offnumParent != InvalidOffsetNumber)
148 {
149 if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
150 {
151 SpGistInnerTuple tuple;
152 BlockNumber blknoLeaf;
153
154 XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
155
156 page = BufferGetPage(buffer);
157
158 tuple = (SpGistInnerTuple) PageGetItem(page,
159 PageGetItemId(page, xldata->offnumParent));
160
161 spgUpdateNodeLink(tuple, xldata->nodeI,
162 blknoLeaf, xldata->offnumLeaf);
163
164 PageSetLSN(page, lsn);
165 MarkBufferDirty(buffer);
166 }
167 if (BufferIsValid(buffer))
168 UnlockReleaseBuffer(buffer);
169 }
170}
171
172static void
173spgRedoMoveLeafs(XLogReaderState *record)
174{
175 XLogRecPtr lsn = record->EndRecPtr;
176 char *ptr = XLogRecGetData(record);
177 spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
178 SpGistState state;
179 OffsetNumber *toDelete;
180 OffsetNumber *toInsert;
181 int nInsert;
182 Buffer buffer;
183 Page page;
184 XLogRedoAction action;
185 BlockNumber blknoDst;
186
187 XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
188
189 fillFakeState(&state, xldata->stateSrc);
190
191 nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
192
193 ptr += SizeOfSpgxlogMoveLeafs;
194 toDelete = (OffsetNumber *) ptr;
195 ptr += sizeof(OffsetNumber) * xldata->nMoves;
196 toInsert = (OffsetNumber *) ptr;
197 ptr += sizeof(OffsetNumber) * nInsert;
198
199 /* now ptr points to the list of leaf tuples */
200
201 /*
202 * In normal operation we would have all three pages (source, dest, and
203 * parent) locked simultaneously; but in WAL replay it should be safe to
204 * update them one at a time, as long as we do it in the right order.
205 */
206
207 /* Insert tuples on the dest page (do first, so redirect is valid) */
208 if (xldata->newPage)
209 {
210 buffer = XLogInitBufferForRedo(record, 1);
211 SpGistInitBuffer(buffer,
212 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
213 action = BLK_NEEDS_REDO;
214 }
215 else
216 action = XLogReadBufferForRedo(record, 1, &buffer);
217
218 if (action == BLK_NEEDS_REDO)
219 {
220 int i;
221
222 page = BufferGetPage(buffer);
223
224 for (i = 0; i < nInsert; i++)
225 {
226 char *leafTuple;
227 SpGistLeafTupleData leafTupleHdr;
228
229 /*
230 * the tuples are not aligned, so must copy to access the size
231 * field.
232 */
233 leafTuple = ptr;
234 memcpy(&leafTupleHdr, leafTuple,
235 sizeof(SpGistLeafTupleData));
236
237 addOrReplaceTuple(page, (Item) leafTuple,
238 leafTupleHdr.size, toInsert[i]);
239 ptr += leafTupleHdr.size;
240 }
241
242 PageSetLSN(page, lsn);
243 MarkBufferDirty(buffer);
244 }
245 if (BufferIsValid(buffer))
246 UnlockReleaseBuffer(buffer);
247
248 /* Delete tuples from the source page, inserting a redirection pointer */
249 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
250 {
251 page = BufferGetPage(buffer);
252
253 spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
254 state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
255 SPGIST_PLACEHOLDER,
256 blknoDst,
257 toInsert[nInsert - 1]);
258
259 PageSetLSN(page, lsn);
260 MarkBufferDirty(buffer);
261 }
262 if (BufferIsValid(buffer))
263 UnlockReleaseBuffer(buffer);
264
265 /* And update the parent downlink */
266 if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
267 {
268 SpGistInnerTuple tuple;
269
270 page = BufferGetPage(buffer);
271
272 tuple = (SpGistInnerTuple) PageGetItem(page,
273 PageGetItemId(page, xldata->offnumParent));
274
275 spgUpdateNodeLink(tuple, xldata->nodeI,
276 blknoDst, toInsert[nInsert - 1]);
277
278 PageSetLSN(page, lsn);
279 MarkBufferDirty(buffer);
280 }
281 if (BufferIsValid(buffer))
282 UnlockReleaseBuffer(buffer);
283}
284
285static void
286spgRedoAddNode(XLogReaderState *record)
287{
288 XLogRecPtr lsn = record->EndRecPtr;
289 char *ptr = XLogRecGetData(record);
290 spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
291 char *innerTuple;
292 SpGistInnerTupleData innerTupleHdr;
293 SpGistState state;
294 Buffer buffer;
295 Page page;
296 XLogRedoAction action;
297
298 ptr += sizeof(spgxlogAddNode);
299 innerTuple = ptr;
300 /* the tuple is unaligned, so make a copy to access its header */
301 memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
302
303 fillFakeState(&state, xldata->stateSrc);
304
305 if (!XLogRecHasBlockRef(record, 1))
306 {
307 /* update in place */
308 Assert(xldata->parentBlk == -1);
309 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
310 {
311 page = BufferGetPage(buffer);
312
313 PageIndexTupleDelete(page, xldata->offnum);
314 if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
315 xldata->offnum,
316 false, false) != xldata->offnum)
317 elog(ERROR, "failed to add item of size %u to SPGiST index page",
318 innerTupleHdr.size);
319
320 PageSetLSN(page, lsn);
321 MarkBufferDirty(buffer);
322 }
323 if (BufferIsValid(buffer))
324 UnlockReleaseBuffer(buffer);
325 }
326 else
327 {
328 BlockNumber blkno;
329 BlockNumber blknoNew;
330
331 XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
332 XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
333
334 /*
335 * In normal operation we would have all three pages (source, dest,
336 * and parent) locked simultaneously; but in WAL replay it should be
337 * safe to update them one at a time, as long as we do it in the right
338 * order. We must insert the new tuple before replacing the old tuple
339 * with the redirect tuple.
340 */
341
342 /* Install new tuple first so redirect is valid */
343 if (xldata->newPage)
344 {
345 /* AddNode is not used for nulls pages */
346 buffer = XLogInitBufferForRedo(record, 1);
347 SpGistInitBuffer(buffer, 0);
348 action = BLK_NEEDS_REDO;
349 }
350 else
351 action = XLogReadBufferForRedo(record, 1, &buffer);
352 if (action == BLK_NEEDS_REDO)
353 {
354 page = BufferGetPage(buffer);
355
356 addOrReplaceTuple(page, (Item) innerTuple,
357 innerTupleHdr.size, xldata->offnumNew);
358
359 /*
360 * If parent is in this same page, update it now.
361 */
362 if (xldata->parentBlk == 1)
363 {
364 SpGistInnerTuple parentTuple;
365
366 parentTuple = (SpGistInnerTuple) PageGetItem(page,
367 PageGetItemId(page, xldata->offnumParent));
368
369 spgUpdateNodeLink(parentTuple, xldata->nodeI,
370 blknoNew, xldata->offnumNew);
371 }
372 PageSetLSN(page, lsn);
373 MarkBufferDirty(buffer);
374 }
375 if (BufferIsValid(buffer))
376 UnlockReleaseBuffer(buffer);
377
378 /* Delete old tuple, replacing it with redirect or placeholder tuple */
379 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
380 {
381 SpGistDeadTuple dt;
382
383 page = BufferGetPage(buffer);
384
385 if (state.isBuild)
386 dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
387 InvalidBlockNumber,
388 InvalidOffsetNumber);
389 else
390 dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
391 blknoNew,
392 xldata->offnumNew);
393
394 PageIndexTupleDelete(page, xldata->offnum);
395 if (PageAddItem(page, (Item) dt, dt->size,
396 xldata->offnum,
397 false, false) != xldata->offnum)
398 elog(ERROR, "failed to add item of size %u to SPGiST index page",
399 dt->size);
400
401 if (state.isBuild)
402 SpGistPageGetOpaque(page)->nPlaceholder++;
403 else
404 SpGistPageGetOpaque(page)->nRedirection++;
405
406 /*
407 * If parent is in this same page, update it now.
408 */
409 if (xldata->parentBlk == 0)
410 {
411 SpGistInnerTuple parentTuple;
412
413 parentTuple = (SpGistInnerTuple) PageGetItem(page,
414 PageGetItemId(page, xldata->offnumParent));
415
416 spgUpdateNodeLink(parentTuple, xldata->nodeI,
417 blknoNew, xldata->offnumNew);
418 }
419 PageSetLSN(page, lsn);
420 MarkBufferDirty(buffer);
421 }
422 if (BufferIsValid(buffer))
423 UnlockReleaseBuffer(buffer);
424
425 /*
426 * Update parent downlink (if we didn't do it as part of the source or
427 * destination page update already).
428 */
429 if (xldata->parentBlk == 2)
430 {
431 if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
432 {
433 SpGistInnerTuple parentTuple;
434
435 page = BufferGetPage(buffer);
436
437 parentTuple = (SpGistInnerTuple) PageGetItem(page,
438 PageGetItemId(page, xldata->offnumParent));
439
440 spgUpdateNodeLink(parentTuple, xldata->nodeI,
441 blknoNew, xldata->offnumNew);
442
443 PageSetLSN(page, lsn);
444 MarkBufferDirty(buffer);
445 }
446 if (BufferIsValid(buffer))
447 UnlockReleaseBuffer(buffer);
448 }
449 }
450}
451
452static void
453spgRedoSplitTuple(XLogReaderState *record)
454{
455 XLogRecPtr lsn = record->EndRecPtr;
456 char *ptr = XLogRecGetData(record);
457 spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
458 char *prefixTuple;
459 SpGistInnerTupleData prefixTupleHdr;
460 char *postfixTuple;
461 SpGistInnerTupleData postfixTupleHdr;
462 Buffer buffer;
463 Page page;
464 XLogRedoAction action;
465
466 ptr += sizeof(spgxlogSplitTuple);
467 prefixTuple = ptr;
468 /* the prefix tuple is unaligned, so make a copy to access its header */
469 memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
470 ptr += prefixTupleHdr.size;
471 postfixTuple = ptr;
472 /* postfix tuple is also unaligned */
473 memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
474
475 /*
476 * In normal operation we would have both pages locked simultaneously; but
477 * in WAL replay it should be safe to update them one at a time, as long
478 * as we do it in the right order.
479 */
480
481 /* insert postfix tuple first to avoid dangling link */
482 if (!xldata->postfixBlkSame)
483 {
484 if (xldata->newPage)
485 {
486 buffer = XLogInitBufferForRedo(record, 1);
487 /* SplitTuple is not used for nulls pages */
488 SpGistInitBuffer(buffer, 0);
489 action = BLK_NEEDS_REDO;
490 }
491 else
492 action = XLogReadBufferForRedo(record, 1, &buffer);
493 if (action == BLK_NEEDS_REDO)
494 {
495 page = BufferGetPage(buffer);
496
497 addOrReplaceTuple(page, (Item) postfixTuple,
498 postfixTupleHdr.size, xldata->offnumPostfix);
499
500 PageSetLSN(page, lsn);
501 MarkBufferDirty(buffer);
502 }
503 if (BufferIsValid(buffer))
504 UnlockReleaseBuffer(buffer);
505 }
506
507 /* now handle the original page */
508 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
509 {
510 page = BufferGetPage(buffer);
511
512 PageIndexTupleDelete(page, xldata->offnumPrefix);
513 if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
514 xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
515 elog(ERROR, "failed to add item of size %u to SPGiST index page",
516 prefixTupleHdr.size);
517
518 if (xldata->postfixBlkSame)
519 addOrReplaceTuple(page, (Item) postfixTuple,
520 postfixTupleHdr.size,
521 xldata->offnumPostfix);
522
523 PageSetLSN(page, lsn);
524 MarkBufferDirty(buffer);
525 }
526 if (BufferIsValid(buffer))
527 UnlockReleaseBuffer(buffer);
528}
529
530static void
531spgRedoPickSplit(XLogReaderState *record)
532{
533 XLogRecPtr lsn = record->EndRecPtr;
534 char *ptr = XLogRecGetData(record);
535 spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
536 char *innerTuple;
537 SpGistInnerTupleData innerTupleHdr;
538 SpGistState state;
539 OffsetNumber *toDelete;
540 OffsetNumber *toInsert;
541 uint8 *leafPageSelect;
542 Buffer srcBuffer;
543 Buffer destBuffer;
544 Buffer innerBuffer;
545 Page srcPage;
546 Page destPage;
547 Page page;
548 int i;
549 BlockNumber blknoInner;
550 XLogRedoAction action;
551
552 XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
553
554 fillFakeState(&state, xldata->stateSrc);
555
556 ptr += SizeOfSpgxlogPickSplit;
557 toDelete = (OffsetNumber *) ptr;
558 ptr += sizeof(OffsetNumber) * xldata->nDelete;
559 toInsert = (OffsetNumber *) ptr;
560 ptr += sizeof(OffsetNumber) * xldata->nInsert;
561 leafPageSelect = (uint8 *) ptr;
562 ptr += sizeof(uint8) * xldata->nInsert;
563
564 innerTuple = ptr;
565 /* the inner tuple is unaligned, so make a copy to access its header */
566 memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
567 ptr += innerTupleHdr.size;
568
569 /* now ptr points to the list of leaf tuples */
570
571 if (xldata->isRootSplit)
572 {
573 /* when splitting root, we touch it only in the guise of new inner */
574 srcBuffer = InvalidBuffer;
575 srcPage = NULL;
576 }
577 else if (xldata->initSrc)
578 {
579 /* just re-init the source page */
580 srcBuffer = XLogInitBufferForRedo(record, 0);
581 srcPage = (Page) BufferGetPage(srcBuffer);
582
583 SpGistInitBuffer(srcBuffer,
584 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
585 /* don't update LSN etc till we're done with it */
586 }
587 else
588 {
589 /*
590 * Delete the specified tuples from source page. (In case we're in
591 * Hot Standby, we need to hold lock on the page till we're done
592 * inserting leaf tuples and the new inner tuple, else the added
593 * redirect tuple will be a dangling link.)
594 */
595 srcPage = NULL;
596 if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
597 {
598 srcPage = BufferGetPage(srcBuffer);
599
600 /*
601 * We have it a bit easier here than in doPickSplit(), because we
602 * know the inner tuple's location already, so we can inject the
603 * correct redirection tuple now.
604 */
605 if (!state.isBuild)
606 spgPageIndexMultiDelete(&state, srcPage,
607 toDelete, xldata->nDelete,
608 SPGIST_REDIRECT,
609 SPGIST_PLACEHOLDER,
610 blknoInner,
611 xldata->offnumInner);
612 else
613 spgPageIndexMultiDelete(&state, srcPage,
614 toDelete, xldata->nDelete,
615 SPGIST_PLACEHOLDER,
616 SPGIST_PLACEHOLDER,
617 InvalidBlockNumber,
618 InvalidOffsetNumber);
619
620 /* don't update LSN etc till we're done with it */
621 }
622 }
623
624 /* try to access dest page if any */
625 if (!XLogRecHasBlockRef(record, 1))
626 {
627 destBuffer = InvalidBuffer;
628 destPage = NULL;
629 }
630 else if (xldata->initDest)
631 {
632 /* just re-init the dest page */
633 destBuffer = XLogInitBufferForRedo(record, 1);
634 destPage = (Page) BufferGetPage(destBuffer);
635
636 SpGistInitBuffer(destBuffer,
637 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
638 /* don't update LSN etc till we're done with it */
639 }
640 else
641 {
642 /*
643 * We could probably release the page lock immediately in the
644 * full-page-image case, but for safety let's hold it till later.
645 */
646 if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
647 destPage = (Page) BufferGetPage(destBuffer);
648 else
649 destPage = NULL; /* don't do any page updates */
650 }
651
652 /* restore leaf tuples to src and/or dest page */
653 for (i = 0; i < xldata->nInsert; i++)
654 {
655 char *leafTuple;
656 SpGistLeafTupleData leafTupleHdr;
657
658 /* the tuples are not aligned, so must copy to access the size field. */
659 leafTuple = ptr;
660 memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
661 ptr += leafTupleHdr.size;
662
663 page = leafPageSelect[i] ? destPage : srcPage;
664 if (page == NULL)
665 continue; /* no need to touch this page */
666
667 addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
668 toInsert[i]);
669 }
670
671 /* Now update src and dest page LSNs if needed */
672 if (srcPage != NULL)
673 {
674 PageSetLSN(srcPage, lsn);
675 MarkBufferDirty(srcBuffer);
676 }
677 if (destPage != NULL)
678 {
679 PageSetLSN(destPage, lsn);
680 MarkBufferDirty(destBuffer);
681 }
682
683 /* restore new inner tuple */
684 if (xldata->initInner)
685 {
686 innerBuffer = XLogInitBufferForRedo(record, 2);
687 SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
688 action = BLK_NEEDS_REDO;
689 }
690 else
691 action = XLogReadBufferForRedo(record, 2, &innerBuffer);
692
693 if (action == BLK_NEEDS_REDO)
694 {
695 page = BufferGetPage(innerBuffer);
696
697 addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
698 xldata->offnumInner);
699
700 /* if inner is also parent, update link while we're here */
701 if (xldata->innerIsParent)
702 {
703 SpGistInnerTuple parent;
704
705 parent = (SpGistInnerTuple) PageGetItem(page,
706 PageGetItemId(page, xldata->offnumParent));
707 spgUpdateNodeLink(parent, xldata->nodeI,
708 blknoInner, xldata->offnumInner);
709 }
710
711 PageSetLSN(page, lsn);
712 MarkBufferDirty(innerBuffer);
713 }
714 if (BufferIsValid(innerBuffer))
715 UnlockReleaseBuffer(innerBuffer);
716
717 /*
718 * Now we can release the leaf-page locks. It's okay to do this before
719 * updating the parent downlink.
720 */
721 if (BufferIsValid(srcBuffer))
722 UnlockReleaseBuffer(srcBuffer);
723 if (BufferIsValid(destBuffer))
724 UnlockReleaseBuffer(destBuffer);
725
726 /* update parent downlink, unless we did it above */
727 if (XLogRecHasBlockRef(record, 3))
728 {
729 Buffer parentBuffer;
730
731 if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
732 {
733 SpGistInnerTuple parent;
734
735 page = BufferGetPage(parentBuffer);
736
737 parent = (SpGistInnerTuple) PageGetItem(page,
738 PageGetItemId(page, xldata->offnumParent));
739 spgUpdateNodeLink(parent, xldata->nodeI,
740 blknoInner, xldata->offnumInner);
741
742 PageSetLSN(page, lsn);
743 MarkBufferDirty(parentBuffer);
744 }
745 if (BufferIsValid(parentBuffer))
746 UnlockReleaseBuffer(parentBuffer);
747 }
748 else
749 Assert(xldata->innerIsParent || xldata->isRootSplit);
750}
751
752static void
753spgRedoVacuumLeaf(XLogReaderState *record)
754{
755 XLogRecPtr lsn = record->EndRecPtr;
756 char *ptr = XLogRecGetData(record);
757 spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
758 OffsetNumber *toDead;
759 OffsetNumber *toPlaceholder;
760 OffsetNumber *moveSrc;
761 OffsetNumber *moveDest;
762 OffsetNumber *chainSrc;
763 OffsetNumber *chainDest;
764 SpGistState state;
765 Buffer buffer;
766 Page page;
767 int i;
768
769 fillFakeState(&state, xldata->stateSrc);
770
771 ptr += SizeOfSpgxlogVacuumLeaf;
772 toDead = (OffsetNumber *) ptr;
773 ptr += sizeof(OffsetNumber) * xldata->nDead;
774 toPlaceholder = (OffsetNumber *) ptr;
775 ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
776 moveSrc = (OffsetNumber *) ptr;
777 ptr += sizeof(OffsetNumber) * xldata->nMove;
778 moveDest = (OffsetNumber *) ptr;
779 ptr += sizeof(OffsetNumber) * xldata->nMove;
780 chainSrc = (OffsetNumber *) ptr;
781 ptr += sizeof(OffsetNumber) * xldata->nChain;
782 chainDest = (OffsetNumber *) ptr;
783
784 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
785 {
786 page = BufferGetPage(buffer);
787
788 spgPageIndexMultiDelete(&state, page,
789 toDead, xldata->nDead,
790 SPGIST_DEAD, SPGIST_DEAD,
791 InvalidBlockNumber,
792 InvalidOffsetNumber);
793
794 spgPageIndexMultiDelete(&state, page,
795 toPlaceholder, xldata->nPlaceholder,
796 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
797 InvalidBlockNumber,
798 InvalidOffsetNumber);
799
800 /* see comments in vacuumLeafPage() */
801 for (i = 0; i < xldata->nMove; i++)
802 {
803 ItemId idSrc = PageGetItemId(page, moveSrc[i]);
804 ItemId idDest = PageGetItemId(page, moveDest[i]);
805 ItemIdData tmp;
806
807 tmp = *idSrc;
808 *idSrc = *idDest;
809 *idDest = tmp;
810 }
811
812 spgPageIndexMultiDelete(&state, page,
813 moveSrc, xldata->nMove,
814 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
815 InvalidBlockNumber,
816 InvalidOffsetNumber);
817
818 for (i = 0; i < xldata->nChain; i++)
819 {
820 SpGistLeafTuple lt;
821
822 lt = (SpGistLeafTuple) PageGetItem(page,
823 PageGetItemId(page, chainSrc[i]));
824 Assert(lt->tupstate == SPGIST_LIVE);
825 lt->nextOffset = chainDest[i];
826 }
827
828 PageSetLSN(page, lsn);
829 MarkBufferDirty(buffer);
830 }
831 if (BufferIsValid(buffer))
832 UnlockReleaseBuffer(buffer);
833}
834
835static void
836spgRedoVacuumRoot(XLogReaderState *record)
837{
838 XLogRecPtr lsn = record->EndRecPtr;
839 char *ptr = XLogRecGetData(record);
840 spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
841 OffsetNumber *toDelete;
842 Buffer buffer;
843 Page page;
844
845 toDelete = xldata->offsets;
846
847 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
848 {
849 page = BufferGetPage(buffer);
850
851 /* The tuple numbers are in order */
852 PageIndexMultiDelete(page, toDelete, xldata->nDelete);
853
854 PageSetLSN(page, lsn);
855 MarkBufferDirty(buffer);
856 }
857 if (BufferIsValid(buffer))
858 UnlockReleaseBuffer(buffer);
859}
860
861static void
862spgRedoVacuumRedirect(XLogReaderState *record)
863{
864 XLogRecPtr lsn = record->EndRecPtr;
865 char *ptr = XLogRecGetData(record);
866 spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
867 OffsetNumber *itemToPlaceholder;
868 Buffer buffer;
869
870 itemToPlaceholder = xldata->offsets;
871
872 /*
873 * If any redirection tuples are being removed, make sure there are no
874 * live Hot Standby transactions that might need to see them.
875 */
876 if (InHotStandby)
877 {
878 if (TransactionIdIsValid(xldata->newestRedirectXid))
879 {
880 RelFileNode node;
881
882 XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
883 ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
884 node);
885 }
886 }
887
888 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
889 {
890 Page page = BufferGetPage(buffer);
891 SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
892 int i;
893
894 /* Convert redirect pointers to plain placeholders */
895 for (i = 0; i < xldata->nToPlaceholder; i++)
896 {
897 SpGistDeadTuple dt;
898
899 dt = (SpGistDeadTuple) PageGetItem(page,
900 PageGetItemId(page, itemToPlaceholder[i]));
901 Assert(dt->tupstate == SPGIST_REDIRECT);
902 dt->tupstate = SPGIST_PLACEHOLDER;
903 ItemPointerSetInvalid(&dt->pointer);
904 }
905
906 Assert(opaque->nRedirection >= xldata->nToPlaceholder);
907 opaque->nRedirection -= xldata->nToPlaceholder;
908 opaque->nPlaceholder += xldata->nToPlaceholder;
909
910 /* Remove placeholder tuples at end of page */
911 if (xldata->firstPlaceholder != InvalidOffsetNumber)
912 {
913 int max = PageGetMaxOffsetNumber(page);
914 OffsetNumber *toDelete;
915
916 toDelete = palloc(sizeof(OffsetNumber) * max);
917
918 for (i = xldata->firstPlaceholder; i <= max; i++)
919 toDelete[i - xldata->firstPlaceholder] = i;
920
921 i = max - xldata->firstPlaceholder + 1;
922 Assert(opaque->nPlaceholder >= i);
923 opaque->nPlaceholder -= i;
924
925 /* The array is sorted, so can use PageIndexMultiDelete */
926 PageIndexMultiDelete(page, toDelete, i);
927
928 pfree(toDelete);
929 }
930
931 PageSetLSN(page, lsn);
932 MarkBufferDirty(buffer);
933 }
934 if (BufferIsValid(buffer))
935 UnlockReleaseBuffer(buffer);
936}
937
938void
939spg_redo(XLogReaderState *record)
940{
941 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
942 MemoryContext oldCxt;
943
944 oldCxt = MemoryContextSwitchTo(opCtx);
945 switch (info)
946 {
947 case XLOG_SPGIST_ADD_LEAF:
948 spgRedoAddLeaf(record);
949 break;
950 case XLOG_SPGIST_MOVE_LEAFS:
951 spgRedoMoveLeafs(record);
952 break;
953 case XLOG_SPGIST_ADD_NODE:
954 spgRedoAddNode(record);
955 break;
956 case XLOG_SPGIST_SPLIT_TUPLE:
957 spgRedoSplitTuple(record);
958 break;
959 case XLOG_SPGIST_PICKSPLIT:
960 spgRedoPickSplit(record);
961 break;
962 case XLOG_SPGIST_VACUUM_LEAF:
963 spgRedoVacuumLeaf(record);
964 break;
965 case XLOG_SPGIST_VACUUM_ROOT:
966 spgRedoVacuumRoot(record);
967 break;
968 case XLOG_SPGIST_VACUUM_REDIRECT:
969 spgRedoVacuumRedirect(record);
970 break;
971 default:
972 elog(PANIC, "spg_redo: unknown op code %u", info);
973 }
974
975 MemoryContextSwitchTo(oldCxt);
976 MemoryContextReset(opCtx);
977}
978
979void
980spg_xlog_startup(void)
981{
982 opCtx = AllocSetContextCreate(CurrentMemoryContext,
983 "SP-GiST temporary context",
984 ALLOCSET_DEFAULT_SIZES);
985}
986
987void
988spg_xlog_cleanup(void)
989{
990 MemoryContextDelete(opCtx);
991 opCtx = NULL;
992}
993
994/*
995 * Mask a SpGist page before performing consistency checks on it.
996 */
997void
998spg_mask(char *pagedata, BlockNumber blkno)
999{
1000 Page page = (Page) pagedata;
1001 PageHeader pagehdr = (PageHeader) page;
1002
1003 mask_page_lsn_and_checksum(page);
1004
1005 mask_page_hint_bits(page);
1006
1007 /*
1008 * Mask the unused space, but only if the page's pd_lower appears to have
1009 * been set correctly.
1010 */
1011 if (pagehdr->pd_lower > SizeOfPageHeaderData)
1012 mask_unused_space(page);
1013}
1014