1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * spgxlog.c |
4 | * WAL replay logic for SP-GiST |
5 | * |
6 | * |
7 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
8 | * Portions Copyright (c) 1994, Regents of the University of California |
9 | * |
10 | * IDENTIFICATION |
11 | * src/backend/access/spgist/spgxlog.c |
12 | * |
13 | *------------------------------------------------------------------------- |
14 | */ |
15 | #include "postgres.h" |
16 | |
17 | #include "access/bufmask.h" |
18 | #include "access/spgist_private.h" |
19 | #include "access/spgxlog.h" |
20 | #include "access/transam.h" |
21 | #include "access/xlog.h" |
22 | #include "access/xlogutils.h" |
23 | #include "storage/standby.h" |
24 | #include "utils/memutils.h" |
25 | |
26 | |
/*
 * Scratch memory context used while replaying SP-GiST WAL records.  It is
 * created by spg_xlog_startup(), made current for the duration of each
 * spg_redo() call and reset at the end of that call, and finally deleted by
 * spg_xlog_cleanup().
 */
static MemoryContext opCtx;		/* working memory for operations */
28 | |
29 | |
30 | /* |
31 | * Prepare a dummy SpGistState, with just the minimum info needed for replay. |
32 | * |
33 | * At present, all we need is enough info to support spgFormDeadTuple(), |
34 | * plus the isBuild flag. |
35 | */ |
36 | static void |
37 | fillFakeState(SpGistState *state, spgxlogState stateSrc) |
38 | { |
39 | memset(state, 0, sizeof(*state)); |
40 | |
41 | state->myXid = stateSrc.myXid; |
42 | state->isBuild = stateSrc.isBuild; |
43 | state->deadTupleStorage = palloc0(SGDTSIZE); |
44 | } |
45 | |
46 | /* |
47 | * Add a leaf tuple, or replace an existing placeholder tuple. This is used |
48 | * to replay SpGistPageAddNewItem() operations. If the offset points at an |
49 | * existing tuple, it had better be a placeholder tuple. |
50 | */ |
51 | static void |
52 | addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset) |
53 | { |
54 | if (offset <= PageGetMaxOffsetNumber(page)) |
55 | { |
56 | SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page, |
57 | PageGetItemId(page, offset)); |
58 | |
59 | if (dt->tupstate != SPGIST_PLACEHOLDER) |
60 | elog(ERROR, "SPGiST tuple to be replaced is not a placeholder" ); |
61 | |
62 | Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0); |
63 | SpGistPageGetOpaque(page)->nPlaceholder--; |
64 | |
65 | PageIndexTupleDelete(page, offset); |
66 | } |
67 | |
68 | Assert(offset <= PageGetMaxOffsetNumber(page) + 1); |
69 | |
70 | if (PageAddItem(page, tuple, size, offset, false, false) != offset) |
71 | elog(ERROR, "failed to add item of size %u to SPGiST index page" , |
72 | size); |
73 | } |
74 | |
75 | static void |
76 | spgRedoAddLeaf(XLogReaderState *record) |
77 | { |
78 | XLogRecPtr lsn = record->EndRecPtr; |
79 | char *ptr = XLogRecGetData(record); |
80 | spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr; |
81 | char *leafTuple; |
82 | SpGistLeafTupleData leafTupleHdr; |
83 | Buffer buffer; |
84 | Page page; |
85 | XLogRedoAction action; |
86 | |
87 | ptr += sizeof(spgxlogAddLeaf); |
88 | leafTuple = ptr; |
89 | /* the leaf tuple is unaligned, so make a copy to access its header */ |
90 | memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData)); |
91 | |
92 | /* |
93 | * In normal operation we would have both current and parent pages locked |
94 | * simultaneously; but in WAL replay it should be safe to update the leaf |
95 | * page before updating the parent. |
96 | */ |
97 | if (xldata->newPage) |
98 | { |
99 | buffer = XLogInitBufferForRedo(record, 0); |
100 | SpGistInitBuffer(buffer, |
101 | SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); |
102 | action = BLK_NEEDS_REDO; |
103 | } |
104 | else |
105 | action = XLogReadBufferForRedo(record, 0, &buffer); |
106 | |
107 | if (action == BLK_NEEDS_REDO) |
108 | { |
109 | page = BufferGetPage(buffer); |
110 | |
111 | /* insert new tuple */ |
112 | if (xldata->offnumLeaf != xldata->offnumHeadLeaf) |
113 | { |
114 | /* normal cases, tuple was added by SpGistPageAddNewItem */ |
115 | addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size, |
116 | xldata->offnumLeaf); |
117 | |
118 | /* update head tuple's chain link if needed */ |
119 | if (xldata->offnumHeadLeaf != InvalidOffsetNumber) |
120 | { |
121 | SpGistLeafTuple head; |
122 | |
123 | head = (SpGistLeafTuple) PageGetItem(page, |
124 | PageGetItemId(page, xldata->offnumHeadLeaf)); |
125 | Assert(head->nextOffset == leafTupleHdr.nextOffset); |
126 | head->nextOffset = xldata->offnumLeaf; |
127 | } |
128 | } |
129 | else |
130 | { |
131 | /* replacing a DEAD tuple */ |
132 | PageIndexTupleDelete(page, xldata->offnumLeaf); |
133 | if (PageAddItem(page, |
134 | (Item) leafTuple, leafTupleHdr.size, |
135 | xldata->offnumLeaf, false, false) != xldata->offnumLeaf) |
136 | elog(ERROR, "failed to add item of size %u to SPGiST index page" , |
137 | leafTupleHdr.size); |
138 | } |
139 | |
140 | PageSetLSN(page, lsn); |
141 | MarkBufferDirty(buffer); |
142 | } |
143 | if (BufferIsValid(buffer)) |
144 | UnlockReleaseBuffer(buffer); |
145 | |
146 | /* update parent downlink if necessary */ |
147 | if (xldata->offnumParent != InvalidOffsetNumber) |
148 | { |
149 | if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) |
150 | { |
151 | SpGistInnerTuple tuple; |
152 | BlockNumber blknoLeaf; |
153 | |
154 | XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf); |
155 | |
156 | page = BufferGetPage(buffer); |
157 | |
158 | tuple = (SpGistInnerTuple) PageGetItem(page, |
159 | PageGetItemId(page, xldata->offnumParent)); |
160 | |
161 | spgUpdateNodeLink(tuple, xldata->nodeI, |
162 | blknoLeaf, xldata->offnumLeaf); |
163 | |
164 | PageSetLSN(page, lsn); |
165 | MarkBufferDirty(buffer); |
166 | } |
167 | if (BufferIsValid(buffer)) |
168 | UnlockReleaseBuffer(buffer); |
169 | } |
170 | } |
171 | |
172 | static void |
173 | spgRedoMoveLeafs(XLogReaderState *record) |
174 | { |
175 | XLogRecPtr lsn = record->EndRecPtr; |
176 | char *ptr = XLogRecGetData(record); |
177 | spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr; |
178 | SpGistState state; |
179 | OffsetNumber *toDelete; |
180 | OffsetNumber *toInsert; |
181 | int nInsert; |
182 | Buffer buffer; |
183 | Page page; |
184 | XLogRedoAction action; |
185 | BlockNumber blknoDst; |
186 | |
187 | XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst); |
188 | |
189 | fillFakeState(&state, xldata->stateSrc); |
190 | |
191 | nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1; |
192 | |
193 | ptr += SizeOfSpgxlogMoveLeafs; |
194 | toDelete = (OffsetNumber *) ptr; |
195 | ptr += sizeof(OffsetNumber) * xldata->nMoves; |
196 | toInsert = (OffsetNumber *) ptr; |
197 | ptr += sizeof(OffsetNumber) * nInsert; |
198 | |
199 | /* now ptr points to the list of leaf tuples */ |
200 | |
201 | /* |
202 | * In normal operation we would have all three pages (source, dest, and |
203 | * parent) locked simultaneously; but in WAL replay it should be safe to |
204 | * update them one at a time, as long as we do it in the right order. |
205 | */ |
206 | |
207 | /* Insert tuples on the dest page (do first, so redirect is valid) */ |
208 | if (xldata->newPage) |
209 | { |
210 | buffer = XLogInitBufferForRedo(record, 1); |
211 | SpGistInitBuffer(buffer, |
212 | SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); |
213 | action = BLK_NEEDS_REDO; |
214 | } |
215 | else |
216 | action = XLogReadBufferForRedo(record, 1, &buffer); |
217 | |
218 | if (action == BLK_NEEDS_REDO) |
219 | { |
220 | int i; |
221 | |
222 | page = BufferGetPage(buffer); |
223 | |
224 | for (i = 0; i < nInsert; i++) |
225 | { |
226 | char *leafTuple; |
227 | SpGistLeafTupleData leafTupleHdr; |
228 | |
229 | /* |
230 | * the tuples are not aligned, so must copy to access the size |
231 | * field. |
232 | */ |
233 | leafTuple = ptr; |
234 | memcpy(&leafTupleHdr, leafTuple, |
235 | sizeof(SpGistLeafTupleData)); |
236 | |
237 | addOrReplaceTuple(page, (Item) leafTuple, |
238 | leafTupleHdr.size, toInsert[i]); |
239 | ptr += leafTupleHdr.size; |
240 | } |
241 | |
242 | PageSetLSN(page, lsn); |
243 | MarkBufferDirty(buffer); |
244 | } |
245 | if (BufferIsValid(buffer)) |
246 | UnlockReleaseBuffer(buffer); |
247 | |
248 | /* Delete tuples from the source page, inserting a redirection pointer */ |
249 | if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) |
250 | { |
251 | page = BufferGetPage(buffer); |
252 | |
253 | spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves, |
254 | state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT, |
255 | SPGIST_PLACEHOLDER, |
256 | blknoDst, |
257 | toInsert[nInsert - 1]); |
258 | |
259 | PageSetLSN(page, lsn); |
260 | MarkBufferDirty(buffer); |
261 | } |
262 | if (BufferIsValid(buffer)) |
263 | UnlockReleaseBuffer(buffer); |
264 | |
265 | /* And update the parent downlink */ |
266 | if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) |
267 | { |
268 | SpGistInnerTuple tuple; |
269 | |
270 | page = BufferGetPage(buffer); |
271 | |
272 | tuple = (SpGistInnerTuple) PageGetItem(page, |
273 | PageGetItemId(page, xldata->offnumParent)); |
274 | |
275 | spgUpdateNodeLink(tuple, xldata->nodeI, |
276 | blknoDst, toInsert[nInsert - 1]); |
277 | |
278 | PageSetLSN(page, lsn); |
279 | MarkBufferDirty(buffer); |
280 | } |
281 | if (BufferIsValid(buffer)) |
282 | UnlockReleaseBuffer(buffer); |
283 | } |
284 | |
285 | static void |
286 | spgRedoAddNode(XLogReaderState *record) |
287 | { |
288 | XLogRecPtr lsn = record->EndRecPtr; |
289 | char *ptr = XLogRecGetData(record); |
290 | spgxlogAddNode *xldata = (spgxlogAddNode *) ptr; |
291 | char *innerTuple; |
292 | SpGistInnerTupleData innerTupleHdr; |
293 | SpGistState state; |
294 | Buffer buffer; |
295 | Page page; |
296 | XLogRedoAction action; |
297 | |
298 | ptr += sizeof(spgxlogAddNode); |
299 | innerTuple = ptr; |
300 | /* the tuple is unaligned, so make a copy to access its header */ |
301 | memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData)); |
302 | |
303 | fillFakeState(&state, xldata->stateSrc); |
304 | |
305 | if (!XLogRecHasBlockRef(record, 1)) |
306 | { |
307 | /* update in place */ |
308 | Assert(xldata->parentBlk == -1); |
309 | if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) |
310 | { |
311 | page = BufferGetPage(buffer); |
312 | |
313 | PageIndexTupleDelete(page, xldata->offnum); |
314 | if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size, |
315 | xldata->offnum, |
316 | false, false) != xldata->offnum) |
317 | elog(ERROR, "failed to add item of size %u to SPGiST index page" , |
318 | innerTupleHdr.size); |
319 | |
320 | PageSetLSN(page, lsn); |
321 | MarkBufferDirty(buffer); |
322 | } |
323 | if (BufferIsValid(buffer)) |
324 | UnlockReleaseBuffer(buffer); |
325 | } |
326 | else |
327 | { |
328 | BlockNumber blkno; |
329 | BlockNumber blknoNew; |
330 | |
331 | XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno); |
332 | XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew); |
333 | |
334 | /* |
335 | * In normal operation we would have all three pages (source, dest, |
336 | * and parent) locked simultaneously; but in WAL replay it should be |
337 | * safe to update them one at a time, as long as we do it in the right |
338 | * order. We must insert the new tuple before replacing the old tuple |
339 | * with the redirect tuple. |
340 | */ |
341 | |
342 | /* Install new tuple first so redirect is valid */ |
343 | if (xldata->newPage) |
344 | { |
345 | /* AddNode is not used for nulls pages */ |
346 | buffer = XLogInitBufferForRedo(record, 1); |
347 | SpGistInitBuffer(buffer, 0); |
348 | action = BLK_NEEDS_REDO; |
349 | } |
350 | else |
351 | action = XLogReadBufferForRedo(record, 1, &buffer); |
352 | if (action == BLK_NEEDS_REDO) |
353 | { |
354 | page = BufferGetPage(buffer); |
355 | |
356 | addOrReplaceTuple(page, (Item) innerTuple, |
357 | innerTupleHdr.size, xldata->offnumNew); |
358 | |
359 | /* |
360 | * If parent is in this same page, update it now. |
361 | */ |
362 | if (xldata->parentBlk == 1) |
363 | { |
364 | SpGistInnerTuple parentTuple; |
365 | |
366 | parentTuple = (SpGistInnerTuple) PageGetItem(page, |
367 | PageGetItemId(page, xldata->offnumParent)); |
368 | |
369 | spgUpdateNodeLink(parentTuple, xldata->nodeI, |
370 | blknoNew, xldata->offnumNew); |
371 | } |
372 | PageSetLSN(page, lsn); |
373 | MarkBufferDirty(buffer); |
374 | } |
375 | if (BufferIsValid(buffer)) |
376 | UnlockReleaseBuffer(buffer); |
377 | |
378 | /* Delete old tuple, replacing it with redirect or placeholder tuple */ |
379 | if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) |
380 | { |
381 | SpGistDeadTuple dt; |
382 | |
383 | page = BufferGetPage(buffer); |
384 | |
385 | if (state.isBuild) |
386 | dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER, |
387 | InvalidBlockNumber, |
388 | InvalidOffsetNumber); |
389 | else |
390 | dt = spgFormDeadTuple(&state, SPGIST_REDIRECT, |
391 | blknoNew, |
392 | xldata->offnumNew); |
393 | |
394 | PageIndexTupleDelete(page, xldata->offnum); |
395 | if (PageAddItem(page, (Item) dt, dt->size, |
396 | xldata->offnum, |
397 | false, false) != xldata->offnum) |
398 | elog(ERROR, "failed to add item of size %u to SPGiST index page" , |
399 | dt->size); |
400 | |
401 | if (state.isBuild) |
402 | SpGistPageGetOpaque(page)->nPlaceholder++; |
403 | else |
404 | SpGistPageGetOpaque(page)->nRedirection++; |
405 | |
406 | /* |
407 | * If parent is in this same page, update it now. |
408 | */ |
409 | if (xldata->parentBlk == 0) |
410 | { |
411 | SpGistInnerTuple parentTuple; |
412 | |
413 | parentTuple = (SpGistInnerTuple) PageGetItem(page, |
414 | PageGetItemId(page, xldata->offnumParent)); |
415 | |
416 | spgUpdateNodeLink(parentTuple, xldata->nodeI, |
417 | blknoNew, xldata->offnumNew); |
418 | } |
419 | PageSetLSN(page, lsn); |
420 | MarkBufferDirty(buffer); |
421 | } |
422 | if (BufferIsValid(buffer)) |
423 | UnlockReleaseBuffer(buffer); |
424 | |
425 | /* |
426 | * Update parent downlink (if we didn't do it as part of the source or |
427 | * destination page update already). |
428 | */ |
429 | if (xldata->parentBlk == 2) |
430 | { |
431 | if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) |
432 | { |
433 | SpGistInnerTuple parentTuple; |
434 | |
435 | page = BufferGetPage(buffer); |
436 | |
437 | parentTuple = (SpGistInnerTuple) PageGetItem(page, |
438 | PageGetItemId(page, xldata->offnumParent)); |
439 | |
440 | spgUpdateNodeLink(parentTuple, xldata->nodeI, |
441 | blknoNew, xldata->offnumNew); |
442 | |
443 | PageSetLSN(page, lsn); |
444 | MarkBufferDirty(buffer); |
445 | } |
446 | if (BufferIsValid(buffer)) |
447 | UnlockReleaseBuffer(buffer); |
448 | } |
449 | } |
450 | } |
451 | |
452 | static void |
453 | spgRedoSplitTuple(XLogReaderState *record) |
454 | { |
455 | XLogRecPtr lsn = record->EndRecPtr; |
456 | char *ptr = XLogRecGetData(record); |
457 | spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr; |
458 | char *prefixTuple; |
459 | SpGistInnerTupleData prefixTupleHdr; |
460 | char *postfixTuple; |
461 | SpGistInnerTupleData postfixTupleHdr; |
462 | Buffer buffer; |
463 | Page page; |
464 | XLogRedoAction action; |
465 | |
466 | ptr += sizeof(spgxlogSplitTuple); |
467 | prefixTuple = ptr; |
468 | /* the prefix tuple is unaligned, so make a copy to access its header */ |
469 | memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData)); |
470 | ptr += prefixTupleHdr.size; |
471 | postfixTuple = ptr; |
472 | /* postfix tuple is also unaligned */ |
473 | memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData)); |
474 | |
475 | /* |
476 | * In normal operation we would have both pages locked simultaneously; but |
477 | * in WAL replay it should be safe to update them one at a time, as long |
478 | * as we do it in the right order. |
479 | */ |
480 | |
481 | /* insert postfix tuple first to avoid dangling link */ |
482 | if (!xldata->postfixBlkSame) |
483 | { |
484 | if (xldata->newPage) |
485 | { |
486 | buffer = XLogInitBufferForRedo(record, 1); |
487 | /* SplitTuple is not used for nulls pages */ |
488 | SpGistInitBuffer(buffer, 0); |
489 | action = BLK_NEEDS_REDO; |
490 | } |
491 | else |
492 | action = XLogReadBufferForRedo(record, 1, &buffer); |
493 | if (action == BLK_NEEDS_REDO) |
494 | { |
495 | page = BufferGetPage(buffer); |
496 | |
497 | addOrReplaceTuple(page, (Item) postfixTuple, |
498 | postfixTupleHdr.size, xldata->offnumPostfix); |
499 | |
500 | PageSetLSN(page, lsn); |
501 | MarkBufferDirty(buffer); |
502 | } |
503 | if (BufferIsValid(buffer)) |
504 | UnlockReleaseBuffer(buffer); |
505 | } |
506 | |
507 | /* now handle the original page */ |
508 | if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) |
509 | { |
510 | page = BufferGetPage(buffer); |
511 | |
512 | PageIndexTupleDelete(page, xldata->offnumPrefix); |
513 | if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size, |
514 | xldata->offnumPrefix, false, false) != xldata->offnumPrefix) |
515 | elog(ERROR, "failed to add item of size %u to SPGiST index page" , |
516 | prefixTupleHdr.size); |
517 | |
518 | if (xldata->postfixBlkSame) |
519 | addOrReplaceTuple(page, (Item) postfixTuple, |
520 | postfixTupleHdr.size, |
521 | xldata->offnumPostfix); |
522 | |
523 | PageSetLSN(page, lsn); |
524 | MarkBufferDirty(buffer); |
525 | } |
526 | if (BufferIsValid(buffer)) |
527 | UnlockReleaseBuffer(buffer); |
528 | } |
529 | |
530 | static void |
531 | spgRedoPickSplit(XLogReaderState *record) |
532 | { |
533 | XLogRecPtr lsn = record->EndRecPtr; |
534 | char *ptr = XLogRecGetData(record); |
535 | spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr; |
536 | char *innerTuple; |
537 | SpGistInnerTupleData innerTupleHdr; |
538 | SpGistState state; |
539 | OffsetNumber *toDelete; |
540 | OffsetNumber *toInsert; |
541 | uint8 *leafPageSelect; |
542 | Buffer srcBuffer; |
543 | Buffer destBuffer; |
544 | Buffer innerBuffer; |
545 | Page srcPage; |
546 | Page destPage; |
547 | Page page; |
548 | int i; |
549 | BlockNumber blknoInner; |
550 | XLogRedoAction action; |
551 | |
552 | XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner); |
553 | |
554 | fillFakeState(&state, xldata->stateSrc); |
555 | |
556 | ptr += SizeOfSpgxlogPickSplit; |
557 | toDelete = (OffsetNumber *) ptr; |
558 | ptr += sizeof(OffsetNumber) * xldata->nDelete; |
559 | toInsert = (OffsetNumber *) ptr; |
560 | ptr += sizeof(OffsetNumber) * xldata->nInsert; |
561 | leafPageSelect = (uint8 *) ptr; |
562 | ptr += sizeof(uint8) * xldata->nInsert; |
563 | |
564 | innerTuple = ptr; |
565 | /* the inner tuple is unaligned, so make a copy to access its header */ |
566 | memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData)); |
567 | ptr += innerTupleHdr.size; |
568 | |
569 | /* now ptr points to the list of leaf tuples */ |
570 | |
571 | if (xldata->isRootSplit) |
572 | { |
573 | /* when splitting root, we touch it only in the guise of new inner */ |
574 | srcBuffer = InvalidBuffer; |
575 | srcPage = NULL; |
576 | } |
577 | else if (xldata->initSrc) |
578 | { |
579 | /* just re-init the source page */ |
580 | srcBuffer = XLogInitBufferForRedo(record, 0); |
581 | srcPage = (Page) BufferGetPage(srcBuffer); |
582 | |
583 | SpGistInitBuffer(srcBuffer, |
584 | SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); |
585 | /* don't update LSN etc till we're done with it */ |
586 | } |
587 | else |
588 | { |
589 | /* |
590 | * Delete the specified tuples from source page. (In case we're in |
591 | * Hot Standby, we need to hold lock on the page till we're done |
592 | * inserting leaf tuples and the new inner tuple, else the added |
593 | * redirect tuple will be a dangling link.) |
594 | */ |
595 | srcPage = NULL; |
596 | if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO) |
597 | { |
598 | srcPage = BufferGetPage(srcBuffer); |
599 | |
600 | /* |
601 | * We have it a bit easier here than in doPickSplit(), because we |
602 | * know the inner tuple's location already, so we can inject the |
603 | * correct redirection tuple now. |
604 | */ |
605 | if (!state.isBuild) |
606 | spgPageIndexMultiDelete(&state, srcPage, |
607 | toDelete, xldata->nDelete, |
608 | SPGIST_REDIRECT, |
609 | SPGIST_PLACEHOLDER, |
610 | blknoInner, |
611 | xldata->offnumInner); |
612 | else |
613 | spgPageIndexMultiDelete(&state, srcPage, |
614 | toDelete, xldata->nDelete, |
615 | SPGIST_PLACEHOLDER, |
616 | SPGIST_PLACEHOLDER, |
617 | InvalidBlockNumber, |
618 | InvalidOffsetNumber); |
619 | |
620 | /* don't update LSN etc till we're done with it */ |
621 | } |
622 | } |
623 | |
624 | /* try to access dest page if any */ |
625 | if (!XLogRecHasBlockRef(record, 1)) |
626 | { |
627 | destBuffer = InvalidBuffer; |
628 | destPage = NULL; |
629 | } |
630 | else if (xldata->initDest) |
631 | { |
632 | /* just re-init the dest page */ |
633 | destBuffer = XLogInitBufferForRedo(record, 1); |
634 | destPage = (Page) BufferGetPage(destBuffer); |
635 | |
636 | SpGistInitBuffer(destBuffer, |
637 | SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); |
638 | /* don't update LSN etc till we're done with it */ |
639 | } |
640 | else |
641 | { |
642 | /* |
643 | * We could probably release the page lock immediately in the |
644 | * full-page-image case, but for safety let's hold it till later. |
645 | */ |
646 | if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO) |
647 | destPage = (Page) BufferGetPage(destBuffer); |
648 | else |
649 | destPage = NULL; /* don't do any page updates */ |
650 | } |
651 | |
652 | /* restore leaf tuples to src and/or dest page */ |
653 | for (i = 0; i < xldata->nInsert; i++) |
654 | { |
655 | char *leafTuple; |
656 | SpGistLeafTupleData leafTupleHdr; |
657 | |
658 | /* the tuples are not aligned, so must copy to access the size field. */ |
659 | leafTuple = ptr; |
660 | memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData)); |
661 | ptr += leafTupleHdr.size; |
662 | |
663 | page = leafPageSelect[i] ? destPage : srcPage; |
664 | if (page == NULL) |
665 | continue; /* no need to touch this page */ |
666 | |
667 | addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size, |
668 | toInsert[i]); |
669 | } |
670 | |
671 | /* Now update src and dest page LSNs if needed */ |
672 | if (srcPage != NULL) |
673 | { |
674 | PageSetLSN(srcPage, lsn); |
675 | MarkBufferDirty(srcBuffer); |
676 | } |
677 | if (destPage != NULL) |
678 | { |
679 | PageSetLSN(destPage, lsn); |
680 | MarkBufferDirty(destBuffer); |
681 | } |
682 | |
683 | /* restore new inner tuple */ |
684 | if (xldata->initInner) |
685 | { |
686 | innerBuffer = XLogInitBufferForRedo(record, 2); |
687 | SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0)); |
688 | action = BLK_NEEDS_REDO; |
689 | } |
690 | else |
691 | action = XLogReadBufferForRedo(record, 2, &innerBuffer); |
692 | |
693 | if (action == BLK_NEEDS_REDO) |
694 | { |
695 | page = BufferGetPage(innerBuffer); |
696 | |
697 | addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size, |
698 | xldata->offnumInner); |
699 | |
700 | /* if inner is also parent, update link while we're here */ |
701 | if (xldata->innerIsParent) |
702 | { |
703 | SpGistInnerTuple parent; |
704 | |
705 | parent = (SpGistInnerTuple) PageGetItem(page, |
706 | PageGetItemId(page, xldata->offnumParent)); |
707 | spgUpdateNodeLink(parent, xldata->nodeI, |
708 | blknoInner, xldata->offnumInner); |
709 | } |
710 | |
711 | PageSetLSN(page, lsn); |
712 | MarkBufferDirty(innerBuffer); |
713 | } |
714 | if (BufferIsValid(innerBuffer)) |
715 | UnlockReleaseBuffer(innerBuffer); |
716 | |
717 | /* |
718 | * Now we can release the leaf-page locks. It's okay to do this before |
719 | * updating the parent downlink. |
720 | */ |
721 | if (BufferIsValid(srcBuffer)) |
722 | UnlockReleaseBuffer(srcBuffer); |
723 | if (BufferIsValid(destBuffer)) |
724 | UnlockReleaseBuffer(destBuffer); |
725 | |
726 | /* update parent downlink, unless we did it above */ |
727 | if (XLogRecHasBlockRef(record, 3)) |
728 | { |
729 | Buffer parentBuffer; |
730 | |
731 | if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO) |
732 | { |
733 | SpGistInnerTuple parent; |
734 | |
735 | page = BufferGetPage(parentBuffer); |
736 | |
737 | parent = (SpGistInnerTuple) PageGetItem(page, |
738 | PageGetItemId(page, xldata->offnumParent)); |
739 | spgUpdateNodeLink(parent, xldata->nodeI, |
740 | blknoInner, xldata->offnumInner); |
741 | |
742 | PageSetLSN(page, lsn); |
743 | MarkBufferDirty(parentBuffer); |
744 | } |
745 | if (BufferIsValid(parentBuffer)) |
746 | UnlockReleaseBuffer(parentBuffer); |
747 | } |
748 | else |
749 | Assert(xldata->innerIsParent || xldata->isRootSplit); |
750 | } |
751 | |
752 | static void |
753 | spgRedoVacuumLeaf(XLogReaderState *record) |
754 | { |
755 | XLogRecPtr lsn = record->EndRecPtr; |
756 | char *ptr = XLogRecGetData(record); |
757 | spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr; |
758 | OffsetNumber *toDead; |
759 | OffsetNumber *toPlaceholder; |
760 | OffsetNumber *moveSrc; |
761 | OffsetNumber *moveDest; |
762 | OffsetNumber *chainSrc; |
763 | OffsetNumber *chainDest; |
764 | SpGistState state; |
765 | Buffer buffer; |
766 | Page page; |
767 | int i; |
768 | |
769 | fillFakeState(&state, xldata->stateSrc); |
770 | |
771 | ptr += SizeOfSpgxlogVacuumLeaf; |
772 | toDead = (OffsetNumber *) ptr; |
773 | ptr += sizeof(OffsetNumber) * xldata->nDead; |
774 | toPlaceholder = (OffsetNumber *) ptr; |
775 | ptr += sizeof(OffsetNumber) * xldata->nPlaceholder; |
776 | moveSrc = (OffsetNumber *) ptr; |
777 | ptr += sizeof(OffsetNumber) * xldata->nMove; |
778 | moveDest = (OffsetNumber *) ptr; |
779 | ptr += sizeof(OffsetNumber) * xldata->nMove; |
780 | chainSrc = (OffsetNumber *) ptr; |
781 | ptr += sizeof(OffsetNumber) * xldata->nChain; |
782 | chainDest = (OffsetNumber *) ptr; |
783 | |
784 | if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) |
785 | { |
786 | page = BufferGetPage(buffer); |
787 | |
788 | spgPageIndexMultiDelete(&state, page, |
789 | toDead, xldata->nDead, |
790 | SPGIST_DEAD, SPGIST_DEAD, |
791 | InvalidBlockNumber, |
792 | InvalidOffsetNumber); |
793 | |
794 | spgPageIndexMultiDelete(&state, page, |
795 | toPlaceholder, xldata->nPlaceholder, |
796 | SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, |
797 | InvalidBlockNumber, |
798 | InvalidOffsetNumber); |
799 | |
800 | /* see comments in vacuumLeafPage() */ |
801 | for (i = 0; i < xldata->nMove; i++) |
802 | { |
803 | ItemId idSrc = PageGetItemId(page, moveSrc[i]); |
804 | ItemId idDest = PageGetItemId(page, moveDest[i]); |
805 | ItemIdData tmp; |
806 | |
807 | tmp = *idSrc; |
808 | *idSrc = *idDest; |
809 | *idDest = tmp; |
810 | } |
811 | |
812 | spgPageIndexMultiDelete(&state, page, |
813 | moveSrc, xldata->nMove, |
814 | SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, |
815 | InvalidBlockNumber, |
816 | InvalidOffsetNumber); |
817 | |
818 | for (i = 0; i < xldata->nChain; i++) |
819 | { |
820 | SpGistLeafTuple lt; |
821 | |
822 | lt = (SpGistLeafTuple) PageGetItem(page, |
823 | PageGetItemId(page, chainSrc[i])); |
824 | Assert(lt->tupstate == SPGIST_LIVE); |
825 | lt->nextOffset = chainDest[i]; |
826 | } |
827 | |
828 | PageSetLSN(page, lsn); |
829 | MarkBufferDirty(buffer); |
830 | } |
831 | if (BufferIsValid(buffer)) |
832 | UnlockReleaseBuffer(buffer); |
833 | } |
834 | |
835 | static void |
836 | spgRedoVacuumRoot(XLogReaderState *record) |
837 | { |
838 | XLogRecPtr lsn = record->EndRecPtr; |
839 | char *ptr = XLogRecGetData(record); |
840 | spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr; |
841 | OffsetNumber *toDelete; |
842 | Buffer buffer; |
843 | Page page; |
844 | |
845 | toDelete = xldata->offsets; |
846 | |
847 | if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) |
848 | { |
849 | page = BufferGetPage(buffer); |
850 | |
851 | /* The tuple numbers are in order */ |
852 | PageIndexMultiDelete(page, toDelete, xldata->nDelete); |
853 | |
854 | PageSetLSN(page, lsn); |
855 | MarkBufferDirty(buffer); |
856 | } |
857 | if (BufferIsValid(buffer)) |
858 | UnlockReleaseBuffer(buffer); |
859 | } |
860 | |
861 | static void |
862 | spgRedoVacuumRedirect(XLogReaderState *record) |
863 | { |
864 | XLogRecPtr lsn = record->EndRecPtr; |
865 | char *ptr = XLogRecGetData(record); |
866 | spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr; |
867 | OffsetNumber *itemToPlaceholder; |
868 | Buffer buffer; |
869 | |
870 | itemToPlaceholder = xldata->offsets; |
871 | |
872 | /* |
873 | * If any redirection tuples are being removed, make sure there are no |
874 | * live Hot Standby transactions that might need to see them. |
875 | */ |
876 | if (InHotStandby) |
877 | { |
878 | if (TransactionIdIsValid(xldata->newestRedirectXid)) |
879 | { |
880 | RelFileNode node; |
881 | |
882 | XLogRecGetBlockTag(record, 0, &node, NULL, NULL); |
883 | ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, |
884 | node); |
885 | } |
886 | } |
887 | |
888 | if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) |
889 | { |
890 | Page page = BufferGetPage(buffer); |
891 | SpGistPageOpaque opaque = SpGistPageGetOpaque(page); |
892 | int i; |
893 | |
894 | /* Convert redirect pointers to plain placeholders */ |
895 | for (i = 0; i < xldata->nToPlaceholder; i++) |
896 | { |
897 | SpGistDeadTuple dt; |
898 | |
899 | dt = (SpGistDeadTuple) PageGetItem(page, |
900 | PageGetItemId(page, itemToPlaceholder[i])); |
901 | Assert(dt->tupstate == SPGIST_REDIRECT); |
902 | dt->tupstate = SPGIST_PLACEHOLDER; |
903 | ItemPointerSetInvalid(&dt->pointer); |
904 | } |
905 | |
906 | Assert(opaque->nRedirection >= xldata->nToPlaceholder); |
907 | opaque->nRedirection -= xldata->nToPlaceholder; |
908 | opaque->nPlaceholder += xldata->nToPlaceholder; |
909 | |
910 | /* Remove placeholder tuples at end of page */ |
911 | if (xldata->firstPlaceholder != InvalidOffsetNumber) |
912 | { |
913 | int max = PageGetMaxOffsetNumber(page); |
914 | OffsetNumber *toDelete; |
915 | |
916 | toDelete = palloc(sizeof(OffsetNumber) * max); |
917 | |
918 | for (i = xldata->firstPlaceholder; i <= max; i++) |
919 | toDelete[i - xldata->firstPlaceholder] = i; |
920 | |
921 | i = max - xldata->firstPlaceholder + 1; |
922 | Assert(opaque->nPlaceholder >= i); |
923 | opaque->nPlaceholder -= i; |
924 | |
925 | /* The array is sorted, so can use PageIndexMultiDelete */ |
926 | PageIndexMultiDelete(page, toDelete, i); |
927 | |
928 | pfree(toDelete); |
929 | } |
930 | |
931 | PageSetLSN(page, lsn); |
932 | MarkBufferDirty(buffer); |
933 | } |
934 | if (BufferIsValid(buffer)) |
935 | UnlockReleaseBuffer(buffer); |
936 | } |
937 | |
938 | void |
939 | spg_redo(XLogReaderState *record) |
940 | { |
941 | uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; |
942 | MemoryContext oldCxt; |
943 | |
944 | oldCxt = MemoryContextSwitchTo(opCtx); |
945 | switch (info) |
946 | { |
947 | case XLOG_SPGIST_ADD_LEAF: |
948 | spgRedoAddLeaf(record); |
949 | break; |
950 | case XLOG_SPGIST_MOVE_LEAFS: |
951 | spgRedoMoveLeafs(record); |
952 | break; |
953 | case XLOG_SPGIST_ADD_NODE: |
954 | spgRedoAddNode(record); |
955 | break; |
956 | case XLOG_SPGIST_SPLIT_TUPLE: |
957 | spgRedoSplitTuple(record); |
958 | break; |
959 | case XLOG_SPGIST_PICKSPLIT: |
960 | spgRedoPickSplit(record); |
961 | break; |
962 | case XLOG_SPGIST_VACUUM_LEAF: |
963 | spgRedoVacuumLeaf(record); |
964 | break; |
965 | case XLOG_SPGIST_VACUUM_ROOT: |
966 | spgRedoVacuumRoot(record); |
967 | break; |
968 | case XLOG_SPGIST_VACUUM_REDIRECT: |
969 | spgRedoVacuumRedirect(record); |
970 | break; |
971 | default: |
972 | elog(PANIC, "spg_redo: unknown op code %u" , info); |
973 | } |
974 | |
975 | MemoryContextSwitchTo(oldCxt); |
976 | MemoryContextReset(opCtx); |
977 | } |
978 | |
979 | void |
980 | spg_xlog_startup(void) |
981 | { |
982 | opCtx = AllocSetContextCreate(CurrentMemoryContext, |
983 | "SP-GiST temporary context" , |
984 | ALLOCSET_DEFAULT_SIZES); |
985 | } |
986 | |
987 | void |
988 | spg_xlog_cleanup(void) |
989 | { |
990 | MemoryContextDelete(opCtx); |
991 | opCtx = NULL; |
992 | } |
993 | |
994 | /* |
995 | * Mask a SpGist page before performing consistency checks on it. |
996 | */ |
997 | void |
998 | spg_mask(char *pagedata, BlockNumber blkno) |
999 | { |
1000 | Page page = (Page) pagedata; |
1001 | PageHeader pagehdr = (PageHeader) page; |
1002 | |
1003 | mask_page_lsn_and_checksum(page); |
1004 | |
1005 | mask_page_hint_bits(page); |
1006 | |
1007 | /* |
1008 | * Mask the unused space, but only if the page's pd_lower appears to have |
1009 | * been set correctly. |
1010 | */ |
1011 | if (pagehdr->pd_lower > SizeOfPageHeaderData) |
1012 | mask_unused_space(page); |
1013 | } |
1014 | |