/*-------------------------------------------------------------------------
 *
 * hash.c
 *    Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/access/hash/hash.c
 *
 * NOTES
 *    This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "catalog/index.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/plancat.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"


/* Working state for hashbuild and its callback */
typedef struct
{
    HSpool     *spool;          /* NULL if not using spooling */
    double      indtuples;      /* # tuples accepted into index */
    Relation    heapRel;        /* heap relation descriptor */
} HashBuildState;

static void hashbuildCallback(Relation index,
                              HeapTuple htup,
                              Datum *values,
                              bool *isnull,
                              bool tupleIsAlive,
                              void *state);


/*
 * Hash handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
hashhandler(PG_FUNCTION_ARGS)
{
    IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

    amroutine->amstrategies = HTMaxStrategyNumber;
    amroutine->amsupport = HASHNProcs;
    amroutine->amcanorder = false;
    amroutine->amcanorderbyop = false;
    amroutine->amcanbackward = true;
    amroutine->amcanunique = false;
    amroutine->amcanmulticol = false;
    amroutine->amoptionalkey = false;
    amroutine->amsearcharray = false;
    amroutine->amsearchnulls = false;
    amroutine->amstorage = false;
    amroutine->amclusterable = false;
    amroutine->ampredlocks = true;
    amroutine->amcanparallel = false;
    amroutine->amcaninclude = false;
    amroutine->amkeytype = INT4OID;

    amroutine->ambuild = hashbuild;
    amroutine->ambuildempty = hashbuildempty;
    amroutine->aminsert = hashinsert;
    amroutine->ambulkdelete = hashbulkdelete;
    amroutine->amvacuumcleanup = hashvacuumcleanup;
    amroutine->amcanreturn = NULL;
    amroutine->amcostestimate = hashcostestimate;
    amroutine->amoptions = hashoptions;
    amroutine->amproperty = NULL;
    amroutine->ambuildphasename = NULL;
    amroutine->amvalidate = hashvalidate;
    amroutine->ambeginscan = hashbeginscan;
    amroutine->amrescan = hashrescan;
    amroutine->amgettuple = hashgettuple;
    amroutine->amgetbitmap = hashgetbitmap;
    amroutine->amendscan = hashendscan;
    amroutine->ammarkpos = NULL;
    amroutine->amrestrpos = NULL;
    amroutine->amestimateparallelscan = NULL;
    amroutine->aminitparallelscan = NULL;
    amroutine->amparallelrescan = NULL;

    PG_RETURN_POINTER(amroutine);
}
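
/*
 * Usage note: the "hash" entry in pg_am points at this handler, so the
 * callbacks registered above are exercised by statements such as
 *
 *      CREATE INDEX test_hash_idx ON test_table USING hash (some_column);
 *
 * (table, index, and column names here are illustrative only).
 */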

/*
 * hashbuild() -- build a new hash index.
 */
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
    IndexBuildResult *result;
    BlockNumber relpages;
    double      reltuples;
    double      allvisfrac;
    uint32      num_buckets;
    long        sort_threshold;
    HashBuildState buildstate;

    /*
     * We expect to be called exactly once for any index relation. If that's
     * not the case, big trouble's what we have.
     */
    if (RelationGetNumberOfBlocks(index) != 0)
        elog(ERROR, "index \"%s\" already contains data",
             RelationGetRelationName(index));

    /* Estimate the number of rows currently present in the table */
    estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);

    /* Initialize the hash index metadata page and initial buckets */
    num_buckets = _hash_init(index, reltuples, MAIN_FORKNUM);

    /*
     * If we just insert the tuples into the index in scan order, then
     * (assuming their hash codes are pretty random) there will be no locality
     * of access to the index, and if the index is bigger than available RAM
     * then we'll thrash horribly. To prevent that scenario, we can sort the
     * tuples by (expected) bucket number. However, such a sort is useless
     * overhead when the index does fit in RAM. We choose to sort if the
     * initial index size exceeds maintenance_work_mem, or the number of
     * buffers usable for the index, whichever is less. (Limiting by the
     * number of buffers should reduce thrashing between PG buffers and kernel
     * buffers, which seems useful even if no physical I/O results. Limiting
     * by maintenance_work_mem is useful to allow easy testing of the sort
     * code path, and may be useful to DBAs as an additional control knob.)
     *
     * NOTE: this test will need adjustment if a bucket is ever different from
     * one page. Also, "initial index size" accounting does not include the
     * metapage, nor the first bitmap page.
     */
    sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
    if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
        sort_threshold = Min(sort_threshold, NBuffers);
    else
        sort_threshold = Min(sort_threshold, NLocBuffer);
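
    /*
     * As a rough illustration (assuming the stock defaults of
     * maintenance_work_mem = 64MB and BLCKSZ = 8192, and ignoring the
     * NBuffers/NLocBuffer cap): (65536 kB * 1024) / 8192 = 8192, so the
     * sorted build path is taken only when the initial index would need at
     * least 8192 bucket pages, i.e. roughly 64MB.
     */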

    if (num_buckets >= (uint32) sort_threshold)
        buildstate.spool = _h_spoolinit(heap, index, num_buckets);
    else
        buildstate.spool = NULL;

    /* prepare to build the index */
    buildstate.indtuples = 0;
    buildstate.heapRel = heap;

    /* do the heap scan */
    reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
                                       hashbuildCallback,
                                       (void *) &buildstate, NULL);
    pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_TOTAL,
                                 buildstate.indtuples);

    if (buildstate.spool)
    {
        /* sort the tuples and insert them into the index */
        _h_indexbuild(buildstate.spool, buildstate.heapRel);
        _h_spooldestroy(buildstate.spool);
    }

    /*
     * Return statistics
     */
    result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

    result->heap_tuples = reltuples;
    result->index_tuples = buildstate.indtuples;

    return result;
}

/*
 * hashbuildempty() -- build an empty hash index in the initialization fork
 */
void
hashbuildempty(Relation index)
{
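    /*
     * Note: ambuildempty is used for unlogged indexes; whatever is written
     * to the init fork here replaces the main fork after a crash, leaving
     * behind an empty but structurally valid hash index.
     */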
    _hash_init(index, 0, INIT_FORKNUM);
}

/*
 * Per-tuple callback for table_index_build_scan
 */
static void
hashbuildCallback(Relation index,
                  HeapTuple htup,
                  Datum *values,
                  bool *isnull,
                  bool tupleIsAlive,
                  void *state)
{
    HashBuildState *buildstate = (HashBuildState *) state;
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* convert data to a hash key; on failure, do not insert anything */
    if (!_hash_convert_tuple(index,
                             values, isnull,
                             index_values, index_isnull))
        return;

    /* Either spool the tuple for sorting, or just put it into the index */
    if (buildstate->spool)
        _h_spool(buildstate->spool, &htup->t_self,
                 index_values, index_isnull);
    else
    {
        /* form an index tuple and point it at the heap tuple */
        itup = index_form_tuple(RelationGetDescr(index),
                                index_values, index_isnull);
        itup->t_tid = htup->t_self;
        _hash_doinsert(index, itup, buildstate->heapRel);
        pfree(itup);
    }

    buildstate->indtuples += 1;
}

/*
 * hashinsert() -- insert an index tuple into a hash table.
 *
 * Hash on the heap tuple's key, form an index tuple with hash code.
 * Find the appropriate location for the new tuple, and put it there.
 */
bool
hashinsert(Relation rel, Datum *values, bool *isnull,
           ItemPointer ht_ctid, Relation heapRel,
           IndexUniqueCheck checkUnique,
           IndexInfo *indexInfo)
{
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* convert data to a hash key; on failure, do not insert anything */
    if (!_hash_convert_tuple(rel,
                             values, isnull,
                             index_values, index_isnull))
        return false;

    /* form an index tuple and point it at the heap tuple */
    itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
    itup->t_tid = *ht_ctid;

    _hash_doinsert(rel, itup, heapRel);

    pfree(itup);

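    /*
     * Hash indexes never enforce uniqueness (amcanunique is false in
     * hashhandler), so there is no possible-conflict indication to return
     * here; checkUnique is effectively ignored.
     */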
    return false;
}


/*
 * hashgettuple() -- Get the next tuple in the scan.
 */
bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    bool        res;

    /* Hash indexes are always lossy since we store only the hash code */
    scan->xs_recheck = true;
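
    /*
     * Setting xs_recheck means the executor re-evaluates the original scan
     * quals against each heap tuple, filtering out any hash collisions.
     */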

    /*
     * If we've already initialized this scan, we can just advance it in the
     * appropriate direction. If we haven't done so yet, we call a routine to
     * get the first item in the scan.
     */
    if (!HashScanPosIsValid(so->currPos))
        res = _hash_first(scan, dir);
    else
    {
        /*
         * Check to see if we should kill the previously-fetched tuple.
         */
        if (scan->kill_prior_tuple)
        {
            /*
             * Yes, so remember it for later. (We'll deal with all such tuples
             * at once right after leaving the index page or at end of scan.)
             * If the caller reverses the indexscan direction, it is quite
             * possible that the same item might get entered multiple times.
             * But we don't detect that; instead, we just forget any excess
             * entries.
             */
            if (so->killedItems == NULL)
                so->killedItems = (int *)
                    palloc(MaxIndexTuplesPerPage * sizeof(int));

            if (so->numKilled < MaxIndexTuplesPerPage)
                so->killedItems[so->numKilled++] = so->currPos.itemIndex;
        }

        /*
         * Now continue the scan.
         */
        res = _hash_next(scan, dir);
    }

    return res;
}


/*
 * hashgetbitmap() -- get all tuples at once
 */
int64
hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    bool        res;
    int64       ntids = 0;
    HashScanPosItem *currItem;

    res = _hash_first(scan, ForwardScanDirection);

    while (res)
    {
        currItem = &so->currPos.items[so->currPos.itemIndex];

        /*
         * _hash_first and _hash_next eliminate dead index entries whenever
         * scan->ignore_killed_tuples is true. Therefore, there's nothing to
         * do here except add the results to the TIDBitmap.
         */
        tbm_add_tuples(tbm, &(currItem->heapTid), 1, true);
        ntids++;

        res = _hash_next(scan, ForwardScanDirection);
    }

    return ntids;
}


/*
 * hashbeginscan() -- start a scan on a hash index
 */
IndexScanDesc
hashbeginscan(Relation rel, int nkeys, int norderbys)
{
    IndexScanDesc scan;
    HashScanOpaque so;

    /* no order by operators allowed */
    Assert(norderbys == 0);

    scan = RelationGetIndexScan(rel, nkeys, norderbys);

    so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
    HashScanPosInvalidate(so->currPos);
    so->hashso_bucket_buf = InvalidBuffer;
    so->hashso_split_bucket_buf = InvalidBuffer;

    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;

    so->killedItems = NULL;
    so->numKilled = 0;

    scan->opaque = so;

    return scan;
}

/*
 * hashrescan() -- rescan an index relation
 */
void
hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
           ScanKey orderbys, int norderbys)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    if (HashScanPosIsValid(so->currPos))
    {
        /* Before leaving current page, deal with any killed items */
        if (so->numKilled > 0)
            _hash_kill_items(scan);
    }

    _hash_dropscanbuf(rel, so);

    /* set position invalid (this will cause _hash_first call) */
    HashScanPosInvalidate(so->currPos);

    /* Update scan key, if a new one is given */
    if (scankey && scan->numberOfKeys > 0)
    {
        memmove(scan->keyData,
                scankey,
                scan->numberOfKeys * sizeof(ScanKeyData));
    }

    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;
}

/*
 * hashendscan() -- close down a scan
 */
void
hashendscan(IndexScanDesc scan)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    if (HashScanPosIsValid(so->currPos))
    {
        /* Before leaving current page, deal with any killed items */
        if (so->numKilled > 0)
            _hash_kill_items(scan);
    }

    _hash_dropscanbuf(rel, so);

    if (so->killedItems != NULL)
        pfree(so->killedItems);
    pfree(so);
    scan->opaque = NULL;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * This function also deletes tuples that were moved to another bucket by a
 * split.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
               IndexBulkDeleteCallback callback, void *callback_state)
{
    Relation    rel = info->index;
    double      tuples_removed;
    double      num_index_tuples;
    double      orig_ntuples;
    Bucket      orig_maxbucket;
    Bucket      cur_maxbucket;
    Bucket      cur_bucket;
    Buffer      metabuf = InvalidBuffer;
    HashMetaPage metap;
    HashMetaPage cachedmetap;

    tuples_removed = 0;
    num_index_tuples = 0;

    /*
     * We need a copy of the metapage so that we can use its hashm_spares[]
     * values to compute bucket page addresses, but a cached copy should be
     * good enough. (If not, we'll detect that further down and refresh the
     * cache as necessary.)
     */
    cachedmetap = _hash_getcachedmetap(rel, &metabuf, false);
    Assert(cachedmetap != NULL);

    orig_maxbucket = cachedmetap->hashm_maxbucket;
    orig_ntuples = cachedmetap->hashm_ntuples;

    /* Scan the buckets that we know exist */
    cur_bucket = 0;
    cur_maxbucket = orig_maxbucket;

loop_top:
    while (cur_bucket <= cur_maxbucket)
    {
        BlockNumber bucket_blkno;
        BlockNumber blkno;
        Buffer      bucket_buf;
        Buffer      buf;
        HashPageOpaque bucket_opaque;
        Page        page;
        bool        split_cleanup = false;

        /* Get address of bucket's start page */
        bucket_blkno = BUCKET_TO_BLKNO(cachedmetap, cur_bucket);

        blkno = bucket_blkno;

        /*
         * We need to acquire a cleanup lock on the primary bucket page to
         * wait out concurrent scans before deleting the dead tuples.
         */
        buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
        LockBufferForCleanup(buf);
        _hash_checkpage(rel, buf, LH_BUCKET_PAGE);

        page = BufferGetPage(buf);
        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /*
         * If the bucket contains tuples that were moved by a split, then we
         * need to delete such tuples. We can't delete them while the split
         * operation on the bucket is unfinished, as scans still need them.
         */
        if (!H_BUCKET_BEING_SPLIT(bucket_opaque) &&
            H_NEEDS_SPLIT_CLEANUP(bucket_opaque))
        {
            split_cleanup = true;

            /*
             * This bucket might have been split since we last held a lock on
             * the metapage. If so, hashm_maxbucket, hashm_highmask and
             * hashm_lowmask might be old enough to cause us to fail to remove
             * tuples left behind by the most recent split. To prevent that,
             * now that the primary page of the target bucket has been locked
             * (and thus can't be further split), check whether we need to
             * update our cached metapage data.
             */
            Assert(bucket_opaque->hasho_prevblkno != InvalidBlockNumber);
            if (bucket_opaque->hasho_prevblkno > cachedmetap->hashm_maxbucket)
            {
                cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
                Assert(cachedmetap != NULL);
            }
        }

        bucket_buf = buf;

        hashbucketcleanup(rel, cur_bucket, bucket_buf, blkno, info->strategy,
                          cachedmetap->hashm_maxbucket,
                          cachedmetap->hashm_highmask,
                          cachedmetap->hashm_lowmask, &tuples_removed,
                          &num_index_tuples, split_cleanup,
                          callback, callback_state);

        _hash_dropbuf(rel, bucket_buf);

        /* Advance to next bucket */
        cur_bucket++;
    }

    if (BufferIsInvalid(metabuf))
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);

    /* Write-lock metapage and check for split since we started */
    LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));

    if (cur_maxbucket != metap->hashm_maxbucket)
    {
        /* There's been a split, so process the additional bucket(s) */
        LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
        cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
        Assert(cachedmetap != NULL);
        cur_maxbucket = cachedmetap->hashm_maxbucket;
        goto loop_top;
    }

    /* Okay, we're really done. Update tuple count in metapage. */
    START_CRIT_SECTION();

    if (orig_maxbucket == metap->hashm_maxbucket &&
        orig_ntuples == metap->hashm_ntuples)
    {
        /*
         * No one has split or inserted anything since start of scan, so
         * believe our count as gospel.
         */
        metap->hashm_ntuples = num_index_tuples;
    }
    else
    {
        /*
         * Otherwise, our count is untrustworthy since we may have
         * double-scanned tuples in split buckets. Proceed by dead-reckoning.
         * (Note: we still return estimated_count = false, because using this
         * count is better than not updating reltuples at all.)
         */
        if (metap->hashm_ntuples > tuples_removed)
            metap->hashm_ntuples -= tuples_removed;
        else
            metap->hashm_ntuples = 0;
        num_index_tuples = metap->hashm_ntuples;
    }

    MarkBufferDirty(metabuf);

    /* XLOG stuff */
    if (RelationNeedsWAL(rel))
    {
        xl_hash_update_meta_page xlrec;
        XLogRecPtr  recptr;

        xlrec.ntuples = metap->hashm_ntuples;

        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, SizeOfHashUpdateMetaPage);

        XLogRegisterBuffer(0, metabuf, REGBUF_STANDARD);

        recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE);
        PageSetLSN(BufferGetPage(metabuf), recptr);
    }

    END_CRIT_SECTION();

    _hash_relbuf(rel, metabuf);

    /* return statistics */
    if (stats == NULL)
        stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
    stats->estimated_count = false;
    stats->num_index_tuples = num_index_tuples;
    stats->tuples_removed += tuples_removed;
    /* hashvacuumcleanup will fill in num_pages */

    return stats;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
    Relation    rel = info->index;
    BlockNumber num_pages;

    /* If hashbulkdelete wasn't called, return NULL signifying no change */
    /* Note: this covers the analyze_only case too */
    if (stats == NULL)
        return NULL;

    /* update statistics */
    num_pages = RelationGetNumberOfBlocks(rel);
    stats->num_pages = num_pages;

    return stats;
}

/*
 * Helper function to perform deletion of index entries from a bucket.
 *
 * This function expects that the caller has acquired a cleanup lock on the
 * primary bucket page, and will return with a write lock again held on the
 * primary bucket page. The lock won't necessarily be held continuously,
 * though, because we'll release it when visiting overflow pages.
 *
 * There can't be any concurrent scans in progress when we first enter this
 * function because of the cleanup lock we hold on the primary bucket page,
 * but as soon as we release that lock, there might be. If those scans got
 * ahead of our cleanup scan, they might see a tuple before we kill it and
 * wake up only after VACUUM has completed and the TID has been recycled for
 * an unrelated tuple. To avoid that calamity, we prevent scans from passing
 * our cleanup scan by locking the next page in the bucket chain before
 * releasing the lock on the previous page. (This type of lock chaining is not
 * ideal, so we might want to look for a better solution at some point.)
 *
 * We need to retain a pin on the primary bucket to ensure that no concurrent
 * split can start.
 */
void
hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
                  BlockNumber bucket_blkno, BufferAccessStrategy bstrategy,
                  uint32 maxbucket, uint32 highmask, uint32 lowmask,
                  double *tuples_removed, double *num_index_tuples,
                  bool split_cleanup,
                  IndexBulkDeleteCallback callback, void *callback_state)
{
    BlockNumber blkno;
    Buffer      buf;
    Bucket      new_bucket PG_USED_FOR_ASSERTS_ONLY = InvalidBucket;
    bool        bucket_dirty = false;

    blkno = bucket_blkno;
    buf = bucket_buf;

    if (split_cleanup)
        new_bucket = _hash_get_newbucket_from_oldbucket(rel, cur_bucket,
                                                        lowmask, maxbucket);

    /* Scan each page in bucket */
    for (;;)
    {
        HashPageOpaque opaque;
        OffsetNumber offno;
        OffsetNumber maxoffno;
        Buffer      next_buf;
        Page        page;
        OffsetNumber deletable[MaxOffsetNumber];
        int         ndeletable = 0;
        bool        retain_pin = false;
        bool        clear_dead_marking = false;

        vacuum_delay_point();

        page = BufferGetPage(buf);
        opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /* Scan each tuple in page */
        maxoffno = PageGetMaxOffsetNumber(page);
        for (offno = FirstOffsetNumber;
             offno <= maxoffno;
             offno = OffsetNumberNext(offno))
        {
            ItemPointer htup;
            IndexTuple  itup;
            Bucket      bucket;
            bool        kill_tuple = false;

            itup = (IndexTuple) PageGetItem(page,
                                            PageGetItemId(page, offno));
            htup = &(itup->t_tid);

            /*
             * To remove the dead tuples, we strictly want to rely on the
             * results of the callback function; see btvacuumpage for the
             * detailed reason.
             */
            if (callback && callback(htup, callback_state))
            {
                kill_tuple = true;
                if (tuples_removed)
                    *tuples_removed += 1;
            }
            else if (split_cleanup)
            {
                /* delete the tuples that are moved by split. */
                bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
                                              maxbucket,
                                              highmask,
                                              lowmask);
                /* mark the item for deletion */
                if (bucket != cur_bucket)
                {
                    /*
                     * We expect tuples to belong either to the current bucket
                     * or to new_bucket. This is ensured because we don't
                     * allow further splits from a bucket that contains
                     * garbage. See comments in _hash_expandtable.
                     */
                    Assert(bucket == new_bucket);
                    kill_tuple = true;
                }
            }

            if (kill_tuple)
            {
                /* mark the item for deletion */
                deletable[ndeletable++] = offno;
            }
            else
            {
                /* we're keeping it, so count it */
                if (num_index_tuples)
                    *num_index_tuples += 1;
            }
        }

        /* retain the pin on primary bucket page till end of bucket scan */
        if (blkno == bucket_blkno)
            retain_pin = true;
        else
            retain_pin = false;

        blkno = opaque->hasho_nextblkno;

        /*
         * Apply deletions, advance to next page and write page if needed.
         */
        if (ndeletable > 0)
        {
            /* No ereport(ERROR) until changes are logged */
            START_CRIT_SECTION();

            PageIndexMultiDelete(page, deletable, ndeletable);
            bucket_dirty = true;

            /*
             * Mark the page as clean if vacuum removed DEAD tuples from it,
             * by clearing the LH_PAGE_HAS_DEAD_TUPLES flag.
             */
            if (tuples_removed && *tuples_removed > 0 &&
                H_HAS_DEAD_TUPLES(opaque))
            {
                opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
                clear_dead_marking = true;
            }

            MarkBufferDirty(buf);

            /* XLOG stuff */
            if (RelationNeedsWAL(rel))
            {
                xl_hash_delete xlrec;
                XLogRecPtr  recptr;

                xlrec.clear_dead_marking = clear_dead_marking;
                xlrec.is_primary_bucket_page = (buf == bucket_buf) ? true : false;

                XLogBeginInsert();
                XLogRegisterData((char *) &xlrec, SizeOfHashDelete);

                /*
                 * bucket buffer needs to be registered to ensure that we can
                 * acquire a cleanup lock on it during replay.
                 */
                if (!xlrec.is_primary_bucket_page)
                    XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);

                XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
                XLogRegisterBufData(1, (char *) deletable,
                                    ndeletable * sizeof(OffsetNumber));

                recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_DELETE);
                PageSetLSN(BufferGetPage(buf), recptr);
            }

            END_CRIT_SECTION();
        }

        /* bail out if there are no more pages to scan. */
        if (!BlockNumberIsValid(blkno))
            break;

        next_buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
                                              LH_OVERFLOW_PAGE,
                                              bstrategy);

        /*
         * release the lock on previous page after acquiring the lock on next
         * page
         */
        if (retain_pin)
            LockBuffer(buf, BUFFER_LOCK_UNLOCK);
        else
            _hash_relbuf(rel, buf);

        buf = next_buf;
    }

    /*
     * Lock the bucket page to clear the garbage flag and squeeze the bucket.
     * If the current buffer is the same as the bucket buffer, then we already
     * have a lock on the bucket page.
     */
    if (buf != bucket_buf)
    {
        _hash_relbuf(rel, buf);
        LockBuffer(bucket_buf, BUFFER_LOCK_EXCLUSIVE);
    }

    /*
     * Clear the garbage flag from the bucket after deleting the tuples that
     * were moved by a split. We purposely clear the flag before squeezing the
     * bucket, so that after a restart, vacuum won't again try to delete the
     * moved-by-split tuples.
     */
    if (split_cleanup)
    {
        HashPageOpaque bucket_opaque;
        Page        page;

        page = BufferGetPage(bucket_buf);
        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /* No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();

        bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
        MarkBufferDirty(bucket_buf);

        /* XLOG stuff */
        if (RelationNeedsWAL(rel))
        {
            XLogRecPtr  recptr;

            XLogBeginInsert();
            XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);

            recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_CLEANUP);
            PageSetLSN(page, recptr);
        }

        END_CRIT_SECTION();
    }

    /*
     * If we have deleted anything, try to compact free space. For squeezing
     * the bucket, we must have a cleanup lock, else it can impact the
     * ordering of tuples for a scan that has started before it.
     */
    if (bucket_dirty && IsBufferCleanupOK(bucket_buf))
        _hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
                            bstrategy);
    else
        LockBuffer(bucket_buf, BUFFER_LOCK_UNLOCK);
}