1/*-------------------------------------------------------------------------
2 *
3 * hash_xlog.c
4 * WAL replay logic for hash index.
5 *
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/hash/hash_xlog.c
12 *
13 *-------------------------------------------------------------------------
14 */
15#include "postgres.h"
16
17#include "access/bufmask.h"
18#include "access/hash.h"
19#include "access/hash_xlog.h"
20#include "access/xlogutils.h"
21#include "access/xlog.h"
22#include "access/transam.h"
23#include "storage/procarray.h"
24#include "miscadmin.h"
25
26/*
27 * replay a hash index meta page
28 */
29static void
30hash_xlog_init_meta_page(XLogReaderState *record)
31{
32 XLogRecPtr lsn = record->EndRecPtr;
33 Page page;
34 Buffer metabuf;
35 ForkNumber forknum;
36
37 xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) XLogRecGetData(record);
38
39 /* create the index' metapage */
40 metabuf = XLogInitBufferForRedo(record, 0);
41 Assert(BufferIsValid(metabuf));
42 _hash_init_metabuffer(metabuf, xlrec->num_tuples, xlrec->procid,
43 xlrec->ffactor, true);
44 page = (Page) BufferGetPage(metabuf);
45 PageSetLSN(page, lsn);
46 MarkBufferDirty(metabuf);
47
48 /*
49 * Force the on-disk state of init forks to always be in sync with the
50 * state in shared buffers. See XLogReadBufferForRedoExtended. We need
51 * special handling for init forks as create index operations don't log a
52 * full page image of the metapage.
53 */
54 XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
55 if (forknum == INIT_FORKNUM)
56 FlushOneBuffer(metabuf);
57
58 /* all done */
59 UnlockReleaseBuffer(metabuf);
60}
61
62/*
63 * replay a hash index bitmap page
64 */
65static void
66hash_xlog_init_bitmap_page(XLogReaderState *record)
67{
68 XLogRecPtr lsn = record->EndRecPtr;
69 Buffer bitmapbuf;
70 Buffer metabuf;
71 Page page;
72 HashMetaPage metap;
73 uint32 num_buckets;
74 ForkNumber forknum;
75
76 xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) XLogRecGetData(record);
77
78 /*
79 * Initialize bitmap page
80 */
81 bitmapbuf = XLogInitBufferForRedo(record, 0);
82 _hash_initbitmapbuffer(bitmapbuf, xlrec->bmsize, true);
83 PageSetLSN(BufferGetPage(bitmapbuf), lsn);
84 MarkBufferDirty(bitmapbuf);
85
86 /*
87 * Force the on-disk state of init forks to always be in sync with the
88 * state in shared buffers. See XLogReadBufferForRedoExtended. We need
89 * special handling for init forks as create index operations don't log a
90 * full page image of the metapage.
91 */
92 XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
93 if (forknum == INIT_FORKNUM)
94 FlushOneBuffer(bitmapbuf);
95 UnlockReleaseBuffer(bitmapbuf);
96
97 /* add the new bitmap page to the metapage's list of bitmaps */
98 if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
99 {
100 /*
101 * Note: in normal operation, we'd update the metapage while still
102 * holding lock on the bitmap page. But during replay it's not
103 * necessary to hold that lock, since nobody can see it yet; the
104 * creating transaction hasn't yet committed.
105 */
106 page = BufferGetPage(metabuf);
107 metap = HashPageGetMeta(page);
108
109 num_buckets = metap->hashm_maxbucket + 1;
110 metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
111 metap->hashm_nmaps++;
112
113 PageSetLSN(page, lsn);
114 MarkBufferDirty(metabuf);
115
116 XLogRecGetBlockTag(record, 1, NULL, &forknum, NULL);
117 if (forknum == INIT_FORKNUM)
118 FlushOneBuffer(metabuf);
119 }
120 if (BufferIsValid(metabuf))
121 UnlockReleaseBuffer(metabuf);
122}
123
124/*
125 * replay a hash index insert without split
126 */
127static void
128hash_xlog_insert(XLogReaderState *record)
129{
130 HashMetaPage metap;
131 XLogRecPtr lsn = record->EndRecPtr;
132 xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record);
133 Buffer buffer;
134 Page page;
135
136 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
137 {
138 Size datalen;
139 char *datapos = XLogRecGetBlockData(record, 0, &datalen);
140
141 page = BufferGetPage(buffer);
142
143 if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
144 false, false) == InvalidOffsetNumber)
145 elog(PANIC, "hash_xlog_insert: failed to add item");
146
147 PageSetLSN(page, lsn);
148 MarkBufferDirty(buffer);
149 }
150 if (BufferIsValid(buffer))
151 UnlockReleaseBuffer(buffer);
152
153 if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
154 {
155 /*
156 * Note: in normal operation, we'd update the metapage while still
157 * holding lock on the page we inserted into. But during replay it's
158 * not necessary to hold that lock, since no other index updates can
159 * be happening concurrently.
160 */
161 page = BufferGetPage(buffer);
162 metap = HashPageGetMeta(page);
163 metap->hashm_ntuples += 1;
164
165 PageSetLSN(page, lsn);
166 MarkBufferDirty(buffer);
167 }
168 if (BufferIsValid(buffer))
169 UnlockReleaseBuffer(buffer);
170}
171
172/*
173 * replay addition of overflow page for hash index
174 */
175static void
176hash_xlog_add_ovfl_page(XLogReaderState *record)
177{
178 XLogRecPtr lsn = record->EndRecPtr;
179 xl_hash_add_ovfl_page *xlrec = (xl_hash_add_ovfl_page *) XLogRecGetData(record);
180 Buffer leftbuf;
181 Buffer ovflbuf;
182 Buffer metabuf;
183 BlockNumber leftblk;
184 BlockNumber rightblk;
185 BlockNumber newmapblk = InvalidBlockNumber;
186 Page ovflpage;
187 HashPageOpaque ovflopaque;
188 uint32 *num_bucket;
189 char *data;
190 Size datalen PG_USED_FOR_ASSERTS_ONLY;
191 bool new_bmpage = false;
192
193 XLogRecGetBlockTag(record, 0, NULL, NULL, &rightblk);
194 XLogRecGetBlockTag(record, 1, NULL, NULL, &leftblk);
195
196 ovflbuf = XLogInitBufferForRedo(record, 0);
197 Assert(BufferIsValid(ovflbuf));
198
199 data = XLogRecGetBlockData(record, 0, &datalen);
200 num_bucket = (uint32 *) data;
201 Assert(datalen == sizeof(uint32));
202 _hash_initbuf(ovflbuf, InvalidBlockNumber, *num_bucket, LH_OVERFLOW_PAGE,
203 true);
204 /* update backlink */
205 ovflpage = BufferGetPage(ovflbuf);
206 ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
207 ovflopaque->hasho_prevblkno = leftblk;
208
209 PageSetLSN(ovflpage, lsn);
210 MarkBufferDirty(ovflbuf);
211
212 if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
213 {
214 Page leftpage;
215 HashPageOpaque leftopaque;
216
217 leftpage = BufferGetPage(leftbuf);
218 leftopaque = (HashPageOpaque) PageGetSpecialPointer(leftpage);
219 leftopaque->hasho_nextblkno = rightblk;
220
221 PageSetLSN(leftpage, lsn);
222 MarkBufferDirty(leftbuf);
223 }
224
225 if (BufferIsValid(leftbuf))
226 UnlockReleaseBuffer(leftbuf);
227 UnlockReleaseBuffer(ovflbuf);
228
229 /*
230 * Note: in normal operation, we'd update the bitmap and meta page while
231 * still holding lock on the overflow pages. But during replay it's not
232 * necessary to hold those locks, since no other index updates can be
233 * happening concurrently.
234 */
235 if (XLogRecHasBlockRef(record, 2))
236 {
237 Buffer mapbuffer;
238
239 if (XLogReadBufferForRedo(record, 2, &mapbuffer) == BLK_NEEDS_REDO)
240 {
241 Page mappage = (Page) BufferGetPage(mapbuffer);
242 uint32 *freep = NULL;
243 char *data;
244 uint32 *bitmap_page_bit;
245
246 freep = HashPageGetBitmap(mappage);
247
248 data = XLogRecGetBlockData(record, 2, &datalen);
249 bitmap_page_bit = (uint32 *) data;
250
251 SETBIT(freep, *bitmap_page_bit);
252
253 PageSetLSN(mappage, lsn);
254 MarkBufferDirty(mapbuffer);
255 }
256 if (BufferIsValid(mapbuffer))
257 UnlockReleaseBuffer(mapbuffer);
258 }
259
260 if (XLogRecHasBlockRef(record, 3))
261 {
262 Buffer newmapbuf;
263
264 newmapbuf = XLogInitBufferForRedo(record, 3);
265
266 _hash_initbitmapbuffer(newmapbuf, xlrec->bmsize, true);
267
268 new_bmpage = true;
269 newmapblk = BufferGetBlockNumber(newmapbuf);
270
271 MarkBufferDirty(newmapbuf);
272 PageSetLSN(BufferGetPage(newmapbuf), lsn);
273
274 UnlockReleaseBuffer(newmapbuf);
275 }
276
277 if (XLogReadBufferForRedo(record, 4, &metabuf) == BLK_NEEDS_REDO)
278 {
279 HashMetaPage metap;
280 Page page;
281 uint32 *firstfree_ovflpage;
282
283 data = XLogRecGetBlockData(record, 4, &datalen);
284 firstfree_ovflpage = (uint32 *) data;
285
286 page = BufferGetPage(metabuf);
287 metap = HashPageGetMeta(page);
288 metap->hashm_firstfree = *firstfree_ovflpage;
289
290 if (!xlrec->bmpage_found)
291 {
292 metap->hashm_spares[metap->hashm_ovflpoint]++;
293
294 if (new_bmpage)
295 {
296 Assert(BlockNumberIsValid(newmapblk));
297
298 metap->hashm_mapp[metap->hashm_nmaps] = newmapblk;
299 metap->hashm_nmaps++;
300 metap->hashm_spares[metap->hashm_ovflpoint]++;
301 }
302 }
303
304 PageSetLSN(page, lsn);
305 MarkBufferDirty(metabuf);
306 }
307 if (BufferIsValid(metabuf))
308 UnlockReleaseBuffer(metabuf);
309}
310
311/*
312 * replay allocation of page for split operation
313 */
314static void
315hash_xlog_split_allocate_page(XLogReaderState *record)
316{
317 XLogRecPtr lsn = record->EndRecPtr;
318 xl_hash_split_allocate_page *xlrec = (xl_hash_split_allocate_page *) XLogRecGetData(record);
319 Buffer oldbuf;
320 Buffer newbuf;
321 Buffer metabuf;
322 Size datalen PG_USED_FOR_ASSERTS_ONLY;
323 char *data;
324 XLogRedoAction action;
325
326 /*
327 * To be consistent with normal operation, here we take cleanup locks on
328 * both the old and new buckets even though there can't be any concurrent
329 * inserts.
330 */
331
332 /* replay the record for old bucket */
333 action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &oldbuf);
334
335 /*
336 * Note that we still update the page even if it was restored from a full
337 * page image, because the special space is not included in the image.
338 */
339 if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
340 {
341 Page oldpage;
342 HashPageOpaque oldopaque;
343
344 oldpage = BufferGetPage(oldbuf);
345 oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
346
347 oldopaque->hasho_flag = xlrec->old_bucket_flag;
348 oldopaque->hasho_prevblkno = xlrec->new_bucket;
349
350 PageSetLSN(oldpage, lsn);
351 MarkBufferDirty(oldbuf);
352 }
353
354 /* replay the record for new bucket */
355 newbuf = XLogInitBufferForRedo(record, 1);
356 _hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket,
357 xlrec->new_bucket_flag, true);
358 if (!IsBufferCleanupOK(newbuf))
359 elog(PANIC, "hash_xlog_split_allocate_page: failed to acquire cleanup lock");
360 MarkBufferDirty(newbuf);
361 PageSetLSN(BufferGetPage(newbuf), lsn);
362
363 /*
364 * We can release the lock on old bucket early as well but doing here to
365 * consistent with normal operation.
366 */
367 if (BufferIsValid(oldbuf))
368 UnlockReleaseBuffer(oldbuf);
369 if (BufferIsValid(newbuf))
370 UnlockReleaseBuffer(newbuf);
371
372 /*
373 * Note: in normal operation, we'd update the meta page while still
374 * holding lock on the old and new bucket pages. But during replay it's
375 * not necessary to hold those locks, since no other bucket splits can be
376 * happening concurrently.
377 */
378
379 /* replay the record for metapage changes */
380 if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
381 {
382 Page page;
383 HashMetaPage metap;
384
385 page = BufferGetPage(metabuf);
386 metap = HashPageGetMeta(page);
387 metap->hashm_maxbucket = xlrec->new_bucket;
388
389 data = XLogRecGetBlockData(record, 2, &datalen);
390
391 if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS)
392 {
393 uint32 lowmask;
394 uint32 *highmask;
395
396 /* extract low and high masks. */
397 memcpy(&lowmask, data, sizeof(uint32));
398 highmask = (uint32 *) ((char *) data + sizeof(uint32));
399
400 /* update metapage */
401 metap->hashm_lowmask = lowmask;
402 metap->hashm_highmask = *highmask;
403
404 data += sizeof(uint32) * 2;
405 }
406
407 if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT)
408 {
409 uint32 ovflpoint;
410 uint32 *ovflpages;
411
412 /* extract information of overflow pages. */
413 memcpy(&ovflpoint, data, sizeof(uint32));
414 ovflpages = (uint32 *) ((char *) data + sizeof(uint32));
415
416 /* update metapage */
417 metap->hashm_spares[ovflpoint] = *ovflpages;
418 metap->hashm_ovflpoint = ovflpoint;
419 }
420
421 MarkBufferDirty(metabuf);
422 PageSetLSN(BufferGetPage(metabuf), lsn);
423 }
424
425 if (BufferIsValid(metabuf))
426 UnlockReleaseBuffer(metabuf);
427}
428
429/*
430 * replay of split operation
431 */
432static void
433hash_xlog_split_page(XLogReaderState *record)
434{
435 Buffer buf;
436
437 if (XLogReadBufferForRedo(record, 0, &buf) != BLK_RESTORED)
438 elog(ERROR, "Hash split record did not contain a full-page image");
439
440 UnlockReleaseBuffer(buf);
441}
442
443/*
444 * replay completion of split operation
445 */
446static void
447hash_xlog_split_complete(XLogReaderState *record)
448{
449 XLogRecPtr lsn = record->EndRecPtr;
450 xl_hash_split_complete *xlrec = (xl_hash_split_complete *) XLogRecGetData(record);
451 Buffer oldbuf;
452 Buffer newbuf;
453 XLogRedoAction action;
454
455 /* replay the record for old bucket */
456 action = XLogReadBufferForRedo(record, 0, &oldbuf);
457
458 /*
459 * Note that we still update the page even if it was restored from a full
460 * page image, because the bucket flag is not included in the image.
461 */
462 if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
463 {
464 Page oldpage;
465 HashPageOpaque oldopaque;
466
467 oldpage = BufferGetPage(oldbuf);
468 oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
469
470 oldopaque->hasho_flag = xlrec->old_bucket_flag;
471
472 PageSetLSN(oldpage, lsn);
473 MarkBufferDirty(oldbuf);
474 }
475 if (BufferIsValid(oldbuf))
476 UnlockReleaseBuffer(oldbuf);
477
478 /* replay the record for new bucket */
479 action = XLogReadBufferForRedo(record, 1, &newbuf);
480
481 /*
482 * Note that we still update the page even if it was restored from a full
483 * page image, because the bucket flag is not included in the image.
484 */
485 if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
486 {
487 Page newpage;
488 HashPageOpaque nopaque;
489
490 newpage = BufferGetPage(newbuf);
491 nopaque = (HashPageOpaque) PageGetSpecialPointer(newpage);
492
493 nopaque->hasho_flag = xlrec->new_bucket_flag;
494
495 PageSetLSN(newpage, lsn);
496 MarkBufferDirty(newbuf);
497 }
498 if (BufferIsValid(newbuf))
499 UnlockReleaseBuffer(newbuf);
500}
501
502/*
503 * replay move of page contents for squeeze operation of hash index
504 */
505static void
506hash_xlog_move_page_contents(XLogReaderState *record)
507{
508 XLogRecPtr lsn = record->EndRecPtr;
509 xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record);
510 Buffer bucketbuf = InvalidBuffer;
511 Buffer writebuf = InvalidBuffer;
512 Buffer deletebuf = InvalidBuffer;
513 XLogRedoAction action;
514
515 /*
516 * Ensure we have a cleanup lock on primary bucket page before we start
517 * with the actual replay operation. This is to ensure that neither a
518 * scan can start nor a scan can be already-in-progress during the replay
519 * of this operation. If we allow scans during this operation, then they
520 * can miss some records or show the same record multiple times.
521 */
522 if (xldata->is_prim_bucket_same_wrt)
523 action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
524 else
525 {
526 /*
527 * we don't care for return value as the purpose of reading bucketbuf
528 * is to ensure a cleanup lock on primary bucket page.
529 */
530 (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
531
532 action = XLogReadBufferForRedo(record, 1, &writebuf);
533 }
534
535 /* replay the record for adding entries in overflow buffer */
536 if (action == BLK_NEEDS_REDO)
537 {
538 Page writepage;
539 char *begin;
540 char *data;
541 Size datalen;
542 uint16 ninserted = 0;
543
544 data = begin = XLogRecGetBlockData(record, 1, &datalen);
545
546 writepage = (Page) BufferGetPage(writebuf);
547
548 if (xldata->ntups > 0)
549 {
550 OffsetNumber *towrite = (OffsetNumber *) data;
551
552 data += sizeof(OffsetNumber) * xldata->ntups;
553
554 while (data - begin < datalen)
555 {
556 IndexTuple itup = (IndexTuple) data;
557 Size itemsz;
558 OffsetNumber l;
559
560 itemsz = IndexTupleSize(itup);
561 itemsz = MAXALIGN(itemsz);
562
563 data += itemsz;
564
565 l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
566 if (l == InvalidOffsetNumber)
567 elog(ERROR, "hash_xlog_move_page_contents: failed to add item to hash index page, size %d bytes",
568 (int) itemsz);
569
570 ninserted++;
571 }
572 }
573
574 /*
575 * number of tuples inserted must be same as requested in REDO record.
576 */
577 Assert(ninserted == xldata->ntups);
578
579 PageSetLSN(writepage, lsn);
580 MarkBufferDirty(writebuf);
581 }
582
583 /* replay the record for deleting entries from overflow buffer */
584 if (XLogReadBufferForRedo(record, 2, &deletebuf) == BLK_NEEDS_REDO)
585 {
586 Page page;
587 char *ptr;
588 Size len;
589
590 ptr = XLogRecGetBlockData(record, 2, &len);
591
592 page = (Page) BufferGetPage(deletebuf);
593
594 if (len > 0)
595 {
596 OffsetNumber *unused;
597 OffsetNumber *unend;
598
599 unused = (OffsetNumber *) ptr;
600 unend = (OffsetNumber *) ((char *) ptr + len);
601
602 if ((unend - unused) > 0)
603 PageIndexMultiDelete(page, unused, unend - unused);
604 }
605
606 PageSetLSN(page, lsn);
607 MarkBufferDirty(deletebuf);
608 }
609
610 /*
611 * Replay is complete, now we can release the buffers. We release locks at
612 * end of replay operation to ensure that we hold lock on primary bucket
613 * page till end of operation. We can optimize by releasing the lock on
614 * write buffer as soon as the operation for same is complete, if it is
615 * not same as primary bucket page, but that doesn't seem to be worth
616 * complicating the code.
617 */
618 if (BufferIsValid(deletebuf))
619 UnlockReleaseBuffer(deletebuf);
620
621 if (BufferIsValid(writebuf))
622 UnlockReleaseBuffer(writebuf);
623
624 if (BufferIsValid(bucketbuf))
625 UnlockReleaseBuffer(bucketbuf);
626}
627
628/*
629 * replay squeeze page operation of hash index
630 */
631static void
632hash_xlog_squeeze_page(XLogReaderState *record)
633{
634 XLogRecPtr lsn = record->EndRecPtr;
635 xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record);
636 Buffer bucketbuf = InvalidBuffer;
637 Buffer writebuf;
638 Buffer ovflbuf;
639 Buffer prevbuf = InvalidBuffer;
640 Buffer mapbuf;
641 XLogRedoAction action;
642
643 /*
644 * Ensure we have a cleanup lock on primary bucket page before we start
645 * with the actual replay operation. This is to ensure that neither a
646 * scan can start nor a scan can be already-in-progress during the replay
647 * of this operation. If we allow scans during this operation, then they
648 * can miss some records or show the same record multiple times.
649 */
650 if (xldata->is_prim_bucket_same_wrt)
651 action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
652 else
653 {
654 /*
655 * we don't care for return value as the purpose of reading bucketbuf
656 * is to ensure a cleanup lock on primary bucket page.
657 */
658 (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
659
660 action = XLogReadBufferForRedo(record, 1, &writebuf);
661 }
662
663 /* replay the record for adding entries in overflow buffer */
664 if (action == BLK_NEEDS_REDO)
665 {
666 Page writepage;
667 char *begin;
668 char *data;
669 Size datalen;
670 uint16 ninserted = 0;
671
672 data = begin = XLogRecGetBlockData(record, 1, &datalen);
673
674 writepage = (Page) BufferGetPage(writebuf);
675
676 if (xldata->ntups > 0)
677 {
678 OffsetNumber *towrite = (OffsetNumber *) data;
679
680 data += sizeof(OffsetNumber) * xldata->ntups;
681
682 while (data - begin < datalen)
683 {
684 IndexTuple itup = (IndexTuple) data;
685 Size itemsz;
686 OffsetNumber l;
687
688 itemsz = IndexTupleSize(itup);
689 itemsz = MAXALIGN(itemsz);
690
691 data += itemsz;
692
693 l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
694 if (l == InvalidOffsetNumber)
695 elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes",
696 (int) itemsz);
697
698 ninserted++;
699 }
700 }
701
702 /*
703 * number of tuples inserted must be same as requested in REDO record.
704 */
705 Assert(ninserted == xldata->ntups);
706
707 /*
708 * if the page on which are adding tuples is a page previous to freed
709 * overflow page, then update its nextblno.
710 */
711 if (xldata->is_prev_bucket_same_wrt)
712 {
713 HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage);
714
715 writeopaque->hasho_nextblkno = xldata->nextblkno;
716 }
717
718 PageSetLSN(writepage, lsn);
719 MarkBufferDirty(writebuf);
720 }
721
722 /* replay the record for initializing overflow buffer */
723 if (XLogReadBufferForRedo(record, 2, &ovflbuf) == BLK_NEEDS_REDO)
724 {
725 Page ovflpage;
726 HashPageOpaque ovflopaque;
727
728 ovflpage = BufferGetPage(ovflbuf);
729
730 _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
731
732 ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
733
734 ovflopaque->hasho_prevblkno = InvalidBlockNumber;
735 ovflopaque->hasho_nextblkno = InvalidBlockNumber;
736 ovflopaque->hasho_bucket = -1;
737 ovflopaque->hasho_flag = LH_UNUSED_PAGE;
738 ovflopaque->hasho_page_id = HASHO_PAGE_ID;
739
740 PageSetLSN(ovflpage, lsn);
741 MarkBufferDirty(ovflbuf);
742 }
743 if (BufferIsValid(ovflbuf))
744 UnlockReleaseBuffer(ovflbuf);
745
746 /* replay the record for page previous to the freed overflow page */
747 if (!xldata->is_prev_bucket_same_wrt &&
748 XLogReadBufferForRedo(record, 3, &prevbuf) == BLK_NEEDS_REDO)
749 {
750 Page prevpage = BufferGetPage(prevbuf);
751 HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
752
753 prevopaque->hasho_nextblkno = xldata->nextblkno;
754
755 PageSetLSN(prevpage, lsn);
756 MarkBufferDirty(prevbuf);
757 }
758 if (BufferIsValid(prevbuf))
759 UnlockReleaseBuffer(prevbuf);
760
761 /* replay the record for page next to the freed overflow page */
762 if (XLogRecHasBlockRef(record, 4))
763 {
764 Buffer nextbuf;
765
766 if (XLogReadBufferForRedo(record, 4, &nextbuf) == BLK_NEEDS_REDO)
767 {
768 Page nextpage = BufferGetPage(nextbuf);
769 HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
770
771 nextopaque->hasho_prevblkno = xldata->prevblkno;
772
773 PageSetLSN(nextpage, lsn);
774 MarkBufferDirty(nextbuf);
775 }
776 if (BufferIsValid(nextbuf))
777 UnlockReleaseBuffer(nextbuf);
778 }
779
780 if (BufferIsValid(writebuf))
781 UnlockReleaseBuffer(writebuf);
782
783 if (BufferIsValid(bucketbuf))
784 UnlockReleaseBuffer(bucketbuf);
785
786 /*
787 * Note: in normal operation, we'd update the bitmap and meta page while
788 * still holding lock on the primary bucket page and overflow pages. But
789 * during replay it's not necessary to hold those locks, since no other
790 * index updates can be happening concurrently.
791 */
792 /* replay the record for bitmap page */
793 if (XLogReadBufferForRedo(record, 5, &mapbuf) == BLK_NEEDS_REDO)
794 {
795 Page mappage = (Page) BufferGetPage(mapbuf);
796 uint32 *freep = NULL;
797 char *data;
798 uint32 *bitmap_page_bit;
799 Size datalen;
800
801 freep = HashPageGetBitmap(mappage);
802
803 data = XLogRecGetBlockData(record, 5, &datalen);
804 bitmap_page_bit = (uint32 *) data;
805
806 CLRBIT(freep, *bitmap_page_bit);
807
808 PageSetLSN(mappage, lsn);
809 MarkBufferDirty(mapbuf);
810 }
811 if (BufferIsValid(mapbuf))
812 UnlockReleaseBuffer(mapbuf);
813
814 /* replay the record for meta page */
815 if (XLogRecHasBlockRef(record, 6))
816 {
817 Buffer metabuf;
818
819 if (XLogReadBufferForRedo(record, 6, &metabuf) == BLK_NEEDS_REDO)
820 {
821 HashMetaPage metap;
822 Page page;
823 char *data;
824 uint32 *firstfree_ovflpage;
825 Size datalen;
826
827 data = XLogRecGetBlockData(record, 6, &datalen);
828 firstfree_ovflpage = (uint32 *) data;
829
830 page = BufferGetPage(metabuf);
831 metap = HashPageGetMeta(page);
832 metap->hashm_firstfree = *firstfree_ovflpage;
833
834 PageSetLSN(page, lsn);
835 MarkBufferDirty(metabuf);
836 }
837 if (BufferIsValid(metabuf))
838 UnlockReleaseBuffer(metabuf);
839 }
840}
841
842/*
843 * replay delete operation of hash index
844 */
845static void
846hash_xlog_delete(XLogReaderState *record)
847{
848 XLogRecPtr lsn = record->EndRecPtr;
849 xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record);
850 Buffer bucketbuf = InvalidBuffer;
851 Buffer deletebuf;
852 Page page;
853 XLogRedoAction action;
854
855 /*
856 * Ensure we have a cleanup lock on primary bucket page before we start
857 * with the actual replay operation. This is to ensure that neither a
858 * scan can start nor a scan can be already-in-progress during the replay
859 * of this operation. If we allow scans during this operation, then they
860 * can miss some records or show the same record multiple times.
861 */
862 if (xldata->is_primary_bucket_page)
863 action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &deletebuf);
864 else
865 {
866 /*
867 * we don't care for return value as the purpose of reading bucketbuf
868 * is to ensure a cleanup lock on primary bucket page.
869 */
870 (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
871
872 action = XLogReadBufferForRedo(record, 1, &deletebuf);
873 }
874
875 /* replay the record for deleting entries in bucket page */
876 if (action == BLK_NEEDS_REDO)
877 {
878 char *ptr;
879 Size len;
880
881 ptr = XLogRecGetBlockData(record, 1, &len);
882
883 page = (Page) BufferGetPage(deletebuf);
884
885 if (len > 0)
886 {
887 OffsetNumber *unused;
888 OffsetNumber *unend;
889
890 unused = (OffsetNumber *) ptr;
891 unend = (OffsetNumber *) ((char *) ptr + len);
892
893 if ((unend - unused) > 0)
894 PageIndexMultiDelete(page, unused, unend - unused);
895 }
896
897 /*
898 * Mark the page as not containing any LP_DEAD items only if
899 * clear_dead_marking flag is set to true. See comments in
900 * hashbucketcleanup() for details.
901 */
902 if (xldata->clear_dead_marking)
903 {
904 HashPageOpaque pageopaque;
905
906 pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
907 pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
908 }
909
910 PageSetLSN(page, lsn);
911 MarkBufferDirty(deletebuf);
912 }
913 if (BufferIsValid(deletebuf))
914 UnlockReleaseBuffer(deletebuf);
915
916 if (BufferIsValid(bucketbuf))
917 UnlockReleaseBuffer(bucketbuf);
918}
919
920/*
921 * replay split cleanup flag operation for primary bucket page.
922 */
923static void
924hash_xlog_split_cleanup(XLogReaderState *record)
925{
926 XLogRecPtr lsn = record->EndRecPtr;
927 Buffer buffer;
928 Page page;
929
930 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
931 {
932 HashPageOpaque bucket_opaque;
933
934 page = (Page) BufferGetPage(buffer);
935
936 bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
937 bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
938 PageSetLSN(page, lsn);
939 MarkBufferDirty(buffer);
940 }
941 if (BufferIsValid(buffer))
942 UnlockReleaseBuffer(buffer);
943}
944
945/*
946 * replay for update meta page
947 */
948static void
949hash_xlog_update_meta_page(XLogReaderState *record)
950{
951 HashMetaPage metap;
952 XLogRecPtr lsn = record->EndRecPtr;
953 xl_hash_update_meta_page *xldata = (xl_hash_update_meta_page *) XLogRecGetData(record);
954 Buffer metabuf;
955 Page page;
956
957 if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
958 {
959 page = BufferGetPage(metabuf);
960 metap = HashPageGetMeta(page);
961
962 metap->hashm_ntuples = xldata->ntuples;
963
964 PageSetLSN(page, lsn);
965 MarkBufferDirty(metabuf);
966 }
967 if (BufferIsValid(metabuf))
968 UnlockReleaseBuffer(metabuf);
969}
970
971/*
972 * replay delete operation in hash index to remove
973 * tuples marked as DEAD during index tuple insertion.
974 */
975static void
976hash_xlog_vacuum_one_page(XLogReaderState *record)
977{
978 XLogRecPtr lsn = record->EndRecPtr;
979 xl_hash_vacuum_one_page *xldata;
980 Buffer buffer;
981 Buffer metabuf;
982 Page page;
983 XLogRedoAction action;
984 HashPageOpaque pageopaque;
985
986 xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
987
988 /*
989 * If we have any conflict processing to do, it must happen before we
990 * update the page.
991 *
992 * Hash index records that are marked as LP_DEAD and being removed during
993 * hash index tuple insertion can conflict with standby queries. You might
994 * think that vacuum records would conflict as well, but we've handled
995 * that already. XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
996 * cleaned by the vacuum of the heap and so we can resolve any conflicts
997 * just once when that arrives. After that we know that no conflicts
998 * exist from individual hash index vacuum records on that index.
999 */
1000 if (InHotStandby)
1001 {
1002 RelFileNode rnode;
1003
1004 XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
1005 ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
1006 }
1007
1008 action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
1009
1010 if (action == BLK_NEEDS_REDO)
1011 {
1012 page = (Page) BufferGetPage(buffer);
1013
1014 if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage)
1015 {
1016 OffsetNumber *unused;
1017
1018 unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage);
1019
1020 PageIndexMultiDelete(page, unused, xldata->ntuples);
1021 }
1022
1023 /*
1024 * Mark the page as not containing any LP_DEAD items. See comments in
1025 * _hash_vacuum_one_page() for details.
1026 */
1027 pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
1028 pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
1029
1030 PageSetLSN(page, lsn);
1031 MarkBufferDirty(buffer);
1032 }
1033 if (BufferIsValid(buffer))
1034 UnlockReleaseBuffer(buffer);
1035
1036 if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
1037 {
1038 Page metapage;
1039 HashMetaPage metap;
1040
1041 metapage = BufferGetPage(metabuf);
1042 metap = HashPageGetMeta(metapage);
1043
1044 metap->hashm_ntuples -= xldata->ntuples;
1045
1046 PageSetLSN(metapage, lsn);
1047 MarkBufferDirty(metabuf);
1048 }
1049 if (BufferIsValid(metabuf))
1050 UnlockReleaseBuffer(metabuf);
1051}
1052
1053void
1054hash_redo(XLogReaderState *record)
1055{
1056 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1057
1058 switch (info)
1059 {
1060 case XLOG_HASH_INIT_META_PAGE:
1061 hash_xlog_init_meta_page(record);
1062 break;
1063 case XLOG_HASH_INIT_BITMAP_PAGE:
1064 hash_xlog_init_bitmap_page(record);
1065 break;
1066 case XLOG_HASH_INSERT:
1067 hash_xlog_insert(record);
1068 break;
1069 case XLOG_HASH_ADD_OVFL_PAGE:
1070 hash_xlog_add_ovfl_page(record);
1071 break;
1072 case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
1073 hash_xlog_split_allocate_page(record);
1074 break;
1075 case XLOG_HASH_SPLIT_PAGE:
1076 hash_xlog_split_page(record);
1077 break;
1078 case XLOG_HASH_SPLIT_COMPLETE:
1079 hash_xlog_split_complete(record);
1080 break;
1081 case XLOG_HASH_MOVE_PAGE_CONTENTS:
1082 hash_xlog_move_page_contents(record);
1083 break;
1084 case XLOG_HASH_SQUEEZE_PAGE:
1085 hash_xlog_squeeze_page(record);
1086 break;
1087 case XLOG_HASH_DELETE:
1088 hash_xlog_delete(record);
1089 break;
1090 case XLOG_HASH_SPLIT_CLEANUP:
1091 hash_xlog_split_cleanup(record);
1092 break;
1093 case XLOG_HASH_UPDATE_META_PAGE:
1094 hash_xlog_update_meta_page(record);
1095 break;
1096 case XLOG_HASH_VACUUM_ONE_PAGE:
1097 hash_xlog_vacuum_one_page(record);
1098 break;
1099 default:
1100 elog(PANIC, "hash_redo: unknown op code %u", info);
1101 }
1102}
1103
1104/*
1105 * Mask a hash page before performing consistency checks on it.
1106 */
1107void
1108hash_mask(char *pagedata, BlockNumber blkno)
1109{
1110 Page page = (Page) pagedata;
1111 HashPageOpaque opaque;
1112 int pagetype;
1113
1114 mask_page_lsn_and_checksum(page);
1115
1116 mask_page_hint_bits(page);
1117 mask_unused_space(page);
1118
1119 opaque = (HashPageOpaque) PageGetSpecialPointer(page);
1120
1121 pagetype = opaque->hasho_flag & LH_PAGE_TYPE;
1122 if (pagetype == LH_UNUSED_PAGE)
1123 {
1124 /*
1125 * Mask everything on a UNUSED page.
1126 */
1127 mask_page_content(page);
1128 }
1129 else if (pagetype == LH_BUCKET_PAGE ||
1130 pagetype == LH_OVERFLOW_PAGE)
1131 {
1132 /*
1133 * In hash bucket and overflow pages, it is possible to modify the
1134 * LP_FLAGS without emitting any WAL record. Hence, mask the line
1135 * pointer flags. See hashgettuple(), _hash_kill_items() for details.
1136 */
1137 mask_lp_flags(page);
1138 }
1139
1140 /*
1141 * It is possible that the hint bit LH_PAGE_HAS_DEAD_TUPLES may remain
1142 * unlogged. So, mask it. See _hash_kill_items() for details.
1143 */
1144 opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
1145}
1146