1/*-------------------------------------------------------------------------
2 *
3 * hashinsert.c
4 * Item insertion in hash tables for Postgres.
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/access/hash/hashinsert.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres.h"
17
18#include "access/hash.h"
19#include "access/hash_xlog.h"
20#include "miscadmin.h"
21#include "utils/rel.h"
22#include "storage/lwlock.h"
23#include "storage/buf_internals.h"
24#include "storage/predicate.h"
25
26static void _hash_vacuum_one_page(Relation rel, Relation hrel,
27 Buffer metabuf, Buffer buf);
28
29/*
30 * _hash_doinsert() -- Handle insertion of a single index tuple.
31 *
32 * This routine is called by the public interface routines, hashbuild
33 * and hashinsert. By here, itup is completely filled in.
34 */
35void
36_hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel)
37{
38 Buffer buf = InvalidBuffer;
39 Buffer bucket_buf;
40 Buffer metabuf;
41 HashMetaPage metap;
42 HashMetaPage usedmetap = NULL;
43 Page metapage;
44 Page page;
45 HashPageOpaque pageopaque;
46 Size itemsz;
47 bool do_expand;
48 uint32 hashkey;
49 Bucket bucket;
50 OffsetNumber itup_off;
51
52 /*
53 * Get the hash key for the item (it's stored in the index tuple itself).
54 */
55 hashkey = _hash_get_indextuple_hashkey(itup);
56
57 /* compute item size too */
58 itemsz = IndexTupleSize(itup);
59 itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
60 * need to be consistent */
61
62restart_insert:
63
64 /*
65 * Read the metapage. We don't lock it yet; HashMaxItemSize() will
66 * examine pd_pagesize_version, but that can't change so we can examine it
67 * without a lock.
68 */
69 metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);
70 metapage = BufferGetPage(metabuf);
71
72 /*
73 * Check whether the item can fit on a hash page at all. (Eventually, we
74 * ought to try to apply TOAST methods if not.) Note that at this point,
75 * itemsz doesn't include the ItemId.
76 *
77 * XXX this is useless code if we are only storing hash keys.
78 */
79 if (itemsz > HashMaxItemSize(metapage))
80 ereport(ERROR,
81 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
82 errmsg("index row size %zu exceeds hash maximum %zu",
83 itemsz, HashMaxItemSize(metapage)),
84 errhint("Values larger than a buffer page cannot be indexed.")));
85
86 /* Lock the primary bucket page for the target bucket. */
87 buf = _hash_getbucketbuf_from_hashkey(rel, hashkey, HASH_WRITE,
88 &usedmetap);
89 Assert(usedmetap != NULL);
90
91 CheckForSerializableConflictIn(rel, NULL, buf);
92
93 /* remember the primary bucket buffer to release the pin on it at end. */
94 bucket_buf = buf;
95
96 page = BufferGetPage(buf);
97 pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
98 bucket = pageopaque->hasho_bucket;
99
100 /*
101 * If this bucket is in the process of being split, try to finish the
102 * split before inserting, because that might create room for the
103 * insertion to proceed without allocating an additional overflow page.
104 * It's only interesting to finish the split if we're trying to insert
105 * into the bucket from which we're removing tuples (the "old" bucket),
106 * not if we're trying to insert into the bucket into which tuples are
107 * being moved (the "new" bucket).
108 */
109 if (H_BUCKET_BEING_SPLIT(pageopaque) && IsBufferCleanupOK(buf))
110 {
111 /* release the lock on bucket buffer, before completing the split. */
112 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
113
114 _hash_finish_split(rel, metabuf, buf, bucket,
115 usedmetap->hashm_maxbucket,
116 usedmetap->hashm_highmask,
117 usedmetap->hashm_lowmask);
118
119 /* release the pin on old and meta buffer. retry for insert. */
120 _hash_dropbuf(rel, buf);
121 _hash_dropbuf(rel, metabuf);
122 goto restart_insert;
123 }
124
125 /* Do the insertion */
126 while (PageGetFreeSpace(page) < itemsz)
127 {
128 BlockNumber nextblkno;
129
130 /*
131 * Check if current page has any DEAD tuples. If yes, delete these
132 * tuples and see if we can get a space for the new item to be
133 * inserted before moving to the next page in the bucket chain.
134 */
135 if (H_HAS_DEAD_TUPLES(pageopaque))
136 {
137
138 if (IsBufferCleanupOK(buf))
139 {
140 _hash_vacuum_one_page(rel, heapRel, metabuf, buf);
141
142 if (PageGetFreeSpace(page) >= itemsz)
143 break; /* OK, now we have enough space */
144 }
145 }
146
147 /*
148 * no space on this page; check for an overflow page
149 */
150 nextblkno = pageopaque->hasho_nextblkno;
151
152 if (BlockNumberIsValid(nextblkno))
153 {
154 /*
155 * ovfl page exists; go get it. if it doesn't have room, we'll
156 * find out next pass through the loop test above. we always
157 * release both the lock and pin if this is an overflow page, but
158 * only the lock if this is the primary bucket page, since the pin
159 * on the primary bucket must be retained throughout the scan.
160 */
161 if (buf != bucket_buf)
162 _hash_relbuf(rel, buf);
163 else
164 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
165 buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
166 page = BufferGetPage(buf);
167 }
168 else
169 {
170 /*
171 * we're at the end of the bucket chain and we haven't found a
172 * page with enough room. allocate a new overflow page.
173 */
174
175 /* release our write lock without modifying buffer */
176 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
177
178 /* chain to a new overflow page */
179 buf = _hash_addovflpage(rel, metabuf, buf, (buf == bucket_buf) ? true : false);
180 page = BufferGetPage(buf);
181
182 /* should fit now, given test above */
183 Assert(PageGetFreeSpace(page) >= itemsz);
184 }
185 pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
186 Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
187 Assert(pageopaque->hasho_bucket == bucket);
188 }
189
190 /*
191 * Write-lock the metapage so we can increment the tuple count. After
192 * incrementing it, check to see if it's time for a split.
193 */
194 LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
195
196 /* Do the update. No ereport(ERROR) until changes are logged */
197 START_CRIT_SECTION();
198
199 /* found page with enough space, so add the item here */
200 itup_off = _hash_pgaddtup(rel, buf, itemsz, itup);
201 MarkBufferDirty(buf);
202
203 /* metapage operations */
204 metap = HashPageGetMeta(metapage);
205 metap->hashm_ntuples += 1;
206
207 /* Make sure this stays in sync with _hash_expandtable() */
208 do_expand = metap->hashm_ntuples >
209 (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
210
211 MarkBufferDirty(metabuf);
212
213 /* XLOG stuff */
214 if (RelationNeedsWAL(rel))
215 {
216 xl_hash_insert xlrec;
217 XLogRecPtr recptr;
218
219 xlrec.offnum = itup_off;
220
221 XLogBeginInsert();
222 XLogRegisterData((char *) &xlrec, SizeOfHashInsert);
223
224 XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
225
226 XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
227 XLogRegisterBufData(0, (char *) itup, IndexTupleSize(itup));
228
229 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);
230
231 PageSetLSN(BufferGetPage(buf), recptr);
232 PageSetLSN(BufferGetPage(metabuf), recptr);
233 }
234
235 END_CRIT_SECTION();
236
237 /* drop lock on metapage, but keep pin */
238 LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
239
240 /*
241 * Release the modified page and ensure to release the pin on primary
242 * page.
243 */
244 _hash_relbuf(rel, buf);
245 if (buf != bucket_buf)
246 _hash_dropbuf(rel, bucket_buf);
247
248 /* Attempt to split if a split is needed */
249 if (do_expand)
250 _hash_expandtable(rel, metabuf);
251
252 /* Finally drop our pin on the metapage */
253 _hash_dropbuf(rel, metabuf);
254}
255
256/*
257 * _hash_pgaddtup() -- add a tuple to a particular page in the index.
258 *
259 * This routine adds the tuple to the page as requested; it does not write out
260 * the page. It is an error to call pgaddtup() without pin and write lock on
261 * the target buffer.
262 *
263 * Returns the offset number at which the tuple was inserted. This function
264 * is responsible for preserving the condition that tuples in a hash index
265 * page are sorted by hashkey value.
266 */
267OffsetNumber
268_hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
269{
270 OffsetNumber itup_off;
271 Page page;
272 uint32 hashkey;
273
274 _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
275 page = BufferGetPage(buf);
276
277 /* Find where to insert the tuple (preserving page's hashkey ordering) */
278 hashkey = _hash_get_indextuple_hashkey(itup);
279 itup_off = _hash_binsearch(page, hashkey);
280
281 if (PageAddItem(page, (Item) itup, itemsize, itup_off, false, false)
282 == InvalidOffsetNumber)
283 elog(ERROR, "failed to add index item to \"%s\"",
284 RelationGetRelationName(rel));
285
286 return itup_off;
287}
288
289/*
290 * _hash_pgaddmultitup() -- add a tuple vector to a particular page in the
291 * index.
292 *
293 * This routine has same requirements for locking and tuple ordering as
294 * _hash_pgaddtup().
295 *
296 * Returns the offset number array at which the tuples were inserted.
297 */
298void
299_hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
300 OffsetNumber *itup_offsets, uint16 nitups)
301{
302 OffsetNumber itup_off;
303 Page page;
304 uint32 hashkey;
305 int i;
306
307 _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
308 page = BufferGetPage(buf);
309
310 for (i = 0; i < nitups; i++)
311 {
312 Size itemsize;
313
314 itemsize = IndexTupleSize(itups[i]);
315 itemsize = MAXALIGN(itemsize);
316
317 /* Find where to insert the tuple (preserving page's hashkey ordering) */
318 hashkey = _hash_get_indextuple_hashkey(itups[i]);
319 itup_off = _hash_binsearch(page, hashkey);
320
321 itup_offsets[i] = itup_off;
322
323 if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false)
324 == InvalidOffsetNumber)
325 elog(ERROR, "failed to add index item to \"%s\"",
326 RelationGetRelationName(rel));
327 }
328}
329
330/*
331 * _hash_vacuum_one_page - vacuum just one index page.
332 *
333 * Try to remove LP_DEAD items from the given page. We must acquire cleanup
334 * lock on the page being modified before calling this function.
335 */
336
337static void
338_hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
339{
340 OffsetNumber deletable[MaxOffsetNumber];
341 int ndeletable = 0;
342 OffsetNumber offnum,
343 maxoff;
344 Page page = BufferGetPage(buf);
345 HashPageOpaque pageopaque;
346 HashMetaPage metap;
347
348 /* Scan each tuple in page to see if it is marked as LP_DEAD */
349 maxoff = PageGetMaxOffsetNumber(page);
350 for (offnum = FirstOffsetNumber;
351 offnum <= maxoff;
352 offnum = OffsetNumberNext(offnum))
353 {
354 ItemId itemId = PageGetItemId(page, offnum);
355
356 if (ItemIdIsDead(itemId))
357 deletable[ndeletable++] = offnum;
358 }
359
360 if (ndeletable > 0)
361 {
362 TransactionId latestRemovedXid;
363
364 latestRemovedXid =
365 index_compute_xid_horizon_for_tuples(rel, hrel, buf,
366 deletable, ndeletable);
367
368 /*
369 * Write-lock the meta page so that we can decrement tuple count.
370 */
371 LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
372
373 /* No ereport(ERROR) until changes are logged */
374 START_CRIT_SECTION();
375
376 PageIndexMultiDelete(page, deletable, ndeletable);
377
378 /*
379 * Mark the page as not containing any LP_DEAD items. This is not
380 * certainly true (there might be some that have recently been marked,
381 * but weren't included in our target-item list), but it will almost
382 * always be true and it doesn't seem worth an additional page scan to
383 * check it. Remember that LH_PAGE_HAS_DEAD_TUPLES is only a hint
384 * anyway.
385 */
386 pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
387 pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
388
389 metap = HashPageGetMeta(BufferGetPage(metabuf));
390 metap->hashm_ntuples -= ndeletable;
391
392 MarkBufferDirty(buf);
393 MarkBufferDirty(metabuf);
394
395 /* XLOG stuff */
396 if (RelationNeedsWAL(rel))
397 {
398 xl_hash_vacuum_one_page xlrec;
399 XLogRecPtr recptr;
400
401 xlrec.latestRemovedXid = latestRemovedXid;
402 xlrec.ntuples = ndeletable;
403
404 XLogBeginInsert();
405 XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
406 XLogRegisterData((char *) &xlrec, SizeOfHashVacuumOnePage);
407
408 /*
409 * We need the target-offsets array whether or not we store the
410 * whole buffer, to allow us to find the latestRemovedXid on a
411 * standby server.
412 */
413 XLogRegisterData((char *) deletable,
414 ndeletable * sizeof(OffsetNumber));
415
416 XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
417
418 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);
419
420 PageSetLSN(BufferGetPage(buf), recptr);
421 PageSetLSN(BufferGetPage(metabuf), recptr);
422 }
423
424 END_CRIT_SECTION();
425
426 /*
427 * Releasing write lock on meta page as we have updated the tuple
428 * count.
429 */
430 LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
431 }
432}
433