/*-------------------------------------------------------------------------
 *
 * ginfast.c
 *    Fast insert routines for the Postgres inverted index access method.
 *    Pending entries are stored in a linear list of pages.  Later on
 *    (typically during VACUUM), ginInsertCleanup() will be invoked to
 *    transfer pending entries into the regular index structure.  This
 *    wins because bulk insertion is much more efficient than retail.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/gin/ginfast.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/gin_private.h"
#include "access/ginxlog.h"
#include "access/xloginsert.h"
#include "access/xlog.h"
#include "commands/vacuum.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/acl.h"
#include "postmaster/autovacuum.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/builtins.h"

/* GUC parameter */
int         gin_pending_list_limit = 0;

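/*
 * Note: gin_pending_list_limit is expressed in kilobytes.  It can also be
 * overridden per index via the reloption of the same name; for example
 * (illustrative only; "some_gin_index" is a placeholder name):
 *
 *     ALTER INDEX some_gin_index SET (gin_pending_list_limit = 4096);
 */

/*
 * Usable space on a pending-list page: a whole block, less the page header
 * and the GIN opaque area.
 */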
#define GIN_PAGE_FREESIZE \
    ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )

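/*
 * Accumulator for a batch of key datums belonging to one heap tuple and
 * index column; used as scratch space by processPendingPage().
 */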
typedef struct KeyArray
{
    Datum      *keys;           /* expansible array */
    GinNullCategory *categories;    /* another expansible array */
    int32       nvalues;        /* current number of valid entries */
    int32       maxvalues;      /* allocated size of arrays */
} KeyArray;


/*
 * Build a pending-list page from the given array of tuples, and write it out.
 *
 * Returns amount of free space left on the page.
 */
static int32
writeListPage(Relation index, Buffer buffer,
              IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
    Page        page = BufferGetPage(buffer);
    int32       i,
                freesize,
                size = 0;
    OffsetNumber l,
                off;
    PGAlignedBlock workspace;
    char       *ptr;

    START_CRIT_SECTION();

    GinInitBuffer(buffer, GIN_LIST);

    off = FirstOffsetNumber;
    ptr = workspace.data;

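    /*
     * Copy each tuple into the page, and also into the local workspace
     * buffer so that the same bytes can be attached to the WAL record below.
     */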
    for (i = 0; i < ntuples; i++)
    {
        int         this_size = IndexTupleSize(tuples[i]);

        memcpy(ptr, tuples[i], this_size);
        ptr += this_size;
        size += this_size;

        l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

        if (l == InvalidOffsetNumber)
            elog(ERROR, "failed to add item to index page in \"%s\"",
                 RelationGetRelationName(index));

        off++;
    }

    Assert(size <= BLCKSZ);     /* else we overran workspace */

    GinPageGetOpaque(page)->rightlink = rightlink;

    /*
     * The tail page may contain only whole row(s), or the final part of a
     * row started on previous pages (a "row" here meaning all the index
     * tuples generated for one heap tuple).
     */
    if (rightlink == InvalidBlockNumber)
    {
        GinPageSetFullRow(page);
        GinPageGetOpaque(page)->maxoff = 1;
    }
    else
    {
        GinPageGetOpaque(page)->maxoff = 0;
    }

    MarkBufferDirty(buffer);

    if (RelationNeedsWAL(index))
    {
        ginxlogInsertListPage data;
        XLogRecPtr  recptr;

        data.rightlink = rightlink;
        data.ntuples = ntuples;

        XLogBeginInsert();
        XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

        XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
        XLogRegisterBufData(0, workspace.data, size);

        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
        PageSetLSN(page, recptr);
    }

    /* get free space before releasing buffer */
    freesize = PageGetExactFreeSpace(page);

    UnlockReleaseBuffer(buffer);

    END_CRIT_SECTION();

    return freesize;
}

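/*
 * Build a chain of pending-list pages ("sublist") holding the given tuples,
 * writing each page out as it fills.  On return, *res describes the sublist:
 * head and tail block numbers, number of pages, and free space remaining on
 * the tail page.  All the tuples are assumed to come from one heap tuple.
 */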
static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
            GinMetaPageData *res)
{
    Buffer      curBuffer = InvalidBuffer;
    Buffer      prevBuffer = InvalidBuffer;
    int         i,
                size = 0,
                tupsize;
    int         startTuple = 0;

    Assert(ntuples > 0);

    /*
     * Split tuples into pages
     */
    for (i = 0; i < ntuples; i++)
    {
        if (curBuffer == InvalidBuffer)
        {
            curBuffer = GinNewBuffer(index);

            if (prevBuffer != InvalidBuffer)
            {
                res->nPendingPages++;
                writeListPage(index, prevBuffer,
                              tuples + startTuple,
                              i - startTuple,
                              BufferGetBlockNumber(curBuffer));
            }
            else
            {
                res->head = BufferGetBlockNumber(curBuffer);
            }

            prevBuffer = curBuffer;
            startTuple = i;
            size = 0;
        }

        tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

        if (size + tupsize > GinListPageSize)
        {
            /* won't fit, force a new page and reprocess */
            i--;
            curBuffer = InvalidBuffer;
        }
        else
        {
            size += tupsize;
        }
    }

    /*
     * Write last page
     */
    res->tail = BufferGetBlockNumber(curBuffer);
    res->tailFreeSize = writeListPage(index, curBuffer,
                                      tuples + startTuple,
                                      ntuples - startTuple,
                                      InvalidBlockNumber);
    res->nPendingPages++;
    /* that was only one heap tuple */
    res->nPendingHeapTuples = 1;
}

/*
 * Write the index tuples contained in *collector into the index's
 * pending list.
 *
 * This function guarantees that all these tuples will be inserted
 * consecutively, preserving order.
 */
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
    Relation    index = ginstate->index;
    Buffer      metabuffer;
    Page        metapage;
    GinMetaPageData *metadata = NULL;
    Buffer      buffer = InvalidBuffer;
    Page        page = NULL;
    ginxlogUpdateMeta data;
    bool        separateList = false;
    bool        needCleanup = false;
    int         cleanupSize;
    bool        needWal;

    if (collector->ntuples == 0)
        return;

    needWal = RelationNeedsWAL(index);

    data.node = index->rd_node;
    data.ntuples = 0;
    data.newRightlink = data.prevTail = InvalidBlockNumber;

    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
    metapage = BufferGetPage(metabuffer);

    /*
     * An insertion to the pending list could logically belong anywhere in
     * the tree, so it conflicts with all serializable scans.  All scans
     * acquire a predicate lock on the metabuffer to represent that.
     */
    CheckForSerializableConflictIn(index, NULL, metabuffer);

    if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
    {
        /*
         * Total size is greater than one page => make sublist
         */
        separateList = true;
    }
    else
    {
        LockBuffer(metabuffer, GIN_EXCLUSIVE);
        metadata = GinPageGetMeta(metapage);

        if (metadata->head == InvalidBlockNumber ||
            collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
        {
            /*
             * The pending list is empty, or the total size is greater than
             * the free space on the tail page => make a sublist.
             *
             * We unlock the metabuffer to keep concurrency high.
             */
            separateList = true;
            LockBuffer(metabuffer, GIN_UNLOCK);
        }
    }

    if (separateList)
    {
        /*
         * Build the sublist separately, then append it to the tail of the
         * existing list.
         */
        GinMetaPageData sublist;

        memset(&sublist, 0, sizeof(GinMetaPageData));
        makeSublist(index, collector->tuples, collector->ntuples, &sublist);

        if (needWal)
            XLogBeginInsert();

        /*
         * metapage was unlocked, see above
         */
        LockBuffer(metabuffer, GIN_EXCLUSIVE);
        metadata = GinPageGetMeta(metapage);

        if (metadata->head == InvalidBlockNumber)
        {
            /*
             * Main list is empty, so just insert sublist as main list
             */
            START_CRIT_SECTION();

            metadata->head = sublist.head;
            metadata->tail = sublist.tail;
            metadata->tailFreeSize = sublist.tailFreeSize;

            metadata->nPendingPages = sublist.nPendingPages;
            metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
        }
        else
        {
            /*
             * Merge lists
             */
            data.prevTail = metadata->tail;
            data.newRightlink = sublist.head;

            buffer = ReadBuffer(index, metadata->tail);
            LockBuffer(buffer, GIN_EXCLUSIVE);
            page = BufferGetPage(buffer);

            Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

            START_CRIT_SECTION();

            GinPageGetOpaque(page)->rightlink = sublist.head;

            MarkBufferDirty(buffer);

            metadata->tail = sublist.tail;
            metadata->tailFreeSize = sublist.tailFreeSize;

            metadata->nPendingPages += sublist.nPendingPages;
            metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

            if (needWal)
                XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
        }
    }
    else
    {
        /*
         * Insert into the tail page.  The metapage is already locked.
         */
        OffsetNumber l,
                    off;
        int         i,
                    tupsize;
        char       *ptr;
        char       *collectordata;

        buffer = ReadBuffer(index, metadata->tail);
        LockBuffer(buffer, GIN_EXCLUSIVE);
        page = BufferGetPage(buffer);

        off = (PageIsEmpty(page)) ? FirstOffsetNumber :
            OffsetNumberNext(PageGetMaxOffsetNumber(page));

        collectordata = ptr = (char *) palloc(collector->sumsize);

        data.ntuples = collector->ntuples;

        if (needWal)
            XLogBeginInsert();

        START_CRIT_SECTION();

        /*
         * Increase counter of heap tuples
         */
        Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
        GinPageGetOpaque(page)->maxoff++;
        metadata->nPendingHeapTuples++;

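        /*
         * Add each collected tuple to the tail page, and also copy it into
         * collectordata so that the same bytes can be attached to the WAL
         * record below.
         */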
        for (i = 0; i < collector->ntuples; i++)
        {
            tupsize = IndexTupleSize(collector->tuples[i]);
            l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

            if (l == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index page in \"%s\"",
                     RelationGetRelationName(index));

            memcpy(ptr, collector->tuples[i], tupsize);
            ptr += tupsize;

            off++;
        }

        Assert((ptr - collectordata) <= collector->sumsize);
        if (needWal)
        {
            XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
            XLogRegisterBufData(1, collectordata, collector->sumsize);
        }

        metadata->tailFreeSize = PageGetExactFreeSpace(page);

        MarkBufferDirty(buffer);
    }

    /*
     * Set pd_lower just past the end of the metadata.  This is essential,
     * because without doing so, metadata will be lost if xlog.c compresses
     * the page.  (We must do this here because pre-v11 versions of PG did
     * not set the metapage's pd_lower correctly, so a pg_upgraded index
     * might contain the wrong value.)
     */
    ((PageHeader) metapage)->pd_lower =
        ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

    /*
     * Write metabuffer, make xlog entry
     */
    MarkBufferDirty(metabuffer);

    if (needWal)
    {
        XLogRecPtr  recptr;

        memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

        XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
        XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
        PageSetLSN(metapage, recptr);

        if (buffer != InvalidBuffer)
        {
            PageSetLSN(page, recptr);
        }
    }

    if (buffer != InvalidBuffer)
        UnlockReleaseBuffer(buffer);

    /*
     * Force pending list cleanup when it becomes too long.  And,
     * ginInsertCleanup could take a significant amount of time, so we prefer
     * to call it when it can do all the work in a single collection cycle.
     * In non-vacuum mode, it shouldn't require maintenance_work_mem, so fire
     * it while the pending list is still small enough to fit within
     * gin_pending_list_limit (which is expressed in kilobytes).
     *
     * ginInsertCleanup() should not be called inside our CRIT_SECTION.
     */
    cleanupSize = GinGetPendingListCleanupSize(index);
    if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
        needCleanup = true;

    UnlockReleaseBuffer(metabuffer);

    END_CRIT_SECTION();

    /*
     * Since this could contend with a concurrent cleanup process, we don't
     * force the cleanup here; if someone else is already cleaning the
     * pending list, we simply skip it.
     */
    if (needCleanup)
        ginInsertCleanup(ginstate, false, true, false, NULL);
}

/*
 * Create temporary index tuples for a single indexable item (one index column
 * for the heap tuple specified by ht_ctid), and append them to the array
 * in *collector.  They will subsequently be written out using
 * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
 * temp tuples for a given heap tuple must be written in one call to
 * ginHeapTupleFastInsert.
 */
void
ginHeapTupleFastCollect(GinState *ginstate,
                        GinTupleCollector *collector,
                        OffsetNumber attnum, Datum value, bool isNull,
                        ItemPointer ht_ctid)
{
    Datum      *entries;
    GinNullCategory *categories;
    int32       i,
                nentries;

    /*
     * Extract the key values that need to be inserted in the index
     */
    entries = ginExtractEntries(ginstate, attnum, value, isNull,
                                &nentries, &categories);

    /*
     * Protect against integer overflow in allocation calculations
     */
    if (nentries < 0 ||
        collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
        elog(ERROR, "too many entries for GIN index");

    /*
     * Allocate/reallocate memory for storing collected tuples
     */
    if (collector->tuples == NULL)
    {
        /*
         * Determine the number of elements to allocate in the tuples array
         * initially.  Make it a power of 2 to avoid wasting memory when
         * resizing (since palloc likes powers of 2).
         */
        collector->lentuples = 16;
        while (collector->lentuples < nentries)
            collector->lentuples *= 2;

        collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
    }
    else if (collector->lentuples < collector->ntuples + nentries)
    {
        /*
         * Advance lentuples to the next suitable power of 2.  This won't
         * overflow, though we could get to a value that exceeds
         * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
         */
        do
        {
            collector->lentuples *= 2;
        } while (collector->lentuples < collector->ntuples + nentries);

        collector->tuples = (IndexTuple *) repalloc(collector->tuples,
                                                    sizeof(IndexTuple) * collector->lentuples);
    }

    /*
     * Build an index tuple for each key value, and add to array.  In pending
     * tuples we just stick the heap TID into t_tid.
     */
    for (i = 0; i < nentries; i++)
    {
        IndexTuple  itup;

        itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
                            NULL, 0, 0, true);
        itup->t_tid = *ht_ctid;
        collector->tuples[collector->ntuples++] = itup;
        collector->sumsize += IndexTupleSize(itup);
    }
}

/*
 * Delete pending-list pages up to (but not including) the newHead page.
 * If newHead == InvalidBlockNumber then the whole list is dropped.
 *
 * The metapage is pinned and exclusive-locked throughout this function.
 */
static void
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
          bool fill_fsm, IndexBulkDeleteResult *stats)
{
    Page        metapage;
    GinMetaPageData *metadata;
    BlockNumber blknoToDelete;

    metapage = BufferGetPage(metabuffer);
    metadata = GinPageGetMeta(metapage);
    blknoToDelete = metadata->head;

    do
    {
        Page        page;
        int         i;
        int64       nDeletedHeapTuples = 0;
        ginxlogDeleteListPages data;
        Buffer      buffers[GIN_NDELETE_AT_ONCE];
        BlockNumber freespace[GIN_NDELETE_AT_ONCE];

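        /*
         * Read and exclusive-lock the next batch of up to
         * GIN_NDELETE_AT_ONCE pages, remembering their block numbers in
         * freespace[] so they can be reported to the FSM after the critical
         * section.
         */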
        data.ndeleted = 0;
        while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
        {
            freespace[data.ndeleted] = blknoToDelete;
            buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
            LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
            page = BufferGetPage(buffers[data.ndeleted]);

            data.ndeleted++;

            Assert(!GinPageIsDeleted(page));

            nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
            blknoToDelete = GinPageGetOpaque(page)->rightlink;
        }

        if (stats)
            stats->pages_deleted += data.ndeleted;

        /*
         * This operation touches an unusually large number of pages, so
         * prepare the XLogInsert machinery for that before entering the
         * critical section.
         */
        if (RelationNeedsWAL(index))
            XLogEnsureRecordSpace(data.ndeleted, 0);

        START_CRIT_SECTION();

        metadata->head = blknoToDelete;

        Assert(metadata->nPendingPages >= data.ndeleted);
        metadata->nPendingPages -= data.ndeleted;
        Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
        metadata->nPendingHeapTuples -= nDeletedHeapTuples;

        if (blknoToDelete == InvalidBlockNumber)
        {
            metadata->tail = InvalidBlockNumber;
            metadata->tailFreeSize = 0;
            metadata->nPendingPages = 0;
            metadata->nPendingHeapTuples = 0;
        }

        /*
         * Set pd_lower just past the end of the metadata.  This is
         * essential, because without doing so, metadata will be lost if
         * xlog.c compresses the page.  (We must do this here because pre-v11
         * versions of PG did not set the metapage's pd_lower correctly, so a
         * pg_upgraded index might contain the wrong value.)
         */
        ((PageHeader) metapage)->pd_lower =
            ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;

        MarkBufferDirty(metabuffer);

        for (i = 0; i < data.ndeleted; i++)
        {
            page = BufferGetPage(buffers[i]);
            GinPageGetOpaque(page)->flags = GIN_DELETED;
            MarkBufferDirty(buffers[i]);
        }

        if (RelationNeedsWAL(index))
        {
            XLogRecPtr  recptr;

            XLogBeginInsert();
            XLogRegisterBuffer(0, metabuffer,
                               REGBUF_WILL_INIT | REGBUF_STANDARD);
            for (i = 0; i < data.ndeleted; i++)
                XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

            memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

            XLogRegisterData((char *) &data,
                             sizeof(ginxlogDeleteListPages));

            recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
            PageSetLSN(metapage, recptr);

            for (i = 0; i < data.ndeleted; i++)
            {
                page = BufferGetPage(buffers[i]);
                PageSetLSN(page, recptr);
            }
        }

        for (i = 0; i < data.ndeleted; i++)
            UnlockReleaseBuffer(buffers[i]);

        END_CRIT_SECTION();

        for (i = 0; fill_fsm && i < data.ndeleted; i++)
            RecordFreeIndexPage(index, freespace[i]);

    } while (blknoToDelete != newHead);
}

/* Initialize empty KeyArray */
static void
initKeyArray(KeyArray *keys, int32 maxvalues)
{
    keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
    keys->categories = (GinNullCategory *)
        palloc(sizeof(GinNullCategory) * maxvalues);
    keys->nvalues = 0;
    keys->maxvalues = maxvalues;
}

/* Add datum to KeyArray, resizing if needed */
static void
addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
{
    if (keys->nvalues >= keys->maxvalues)
    {
        keys->maxvalues *= 2;
        keys->keys = (Datum *)
            repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
        keys->categories = (GinNullCategory *)
            repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
    }

    keys->keys[keys->nvalues] = datum;
    keys->categories[keys->nvalues] = category;
    keys->nvalues++;
}

/*
 * Collect data from a pending-list page in preparation for insertion into
 * the main index.
 *
 * Go through all tuples >= startoff on page and collect values in accum
 *
 * Note that ka is just workspace --- it does not carry any state across
 * calls.
 */
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
                   Page page, OffsetNumber startoff)
{
    ItemPointerData heapptr;
    OffsetNumber i,
                maxoff;
    OffsetNumber attrnum;

    /* reset *ka to empty */
    ka->nvalues = 0;

    maxoff = PageGetMaxOffsetNumber(page);
    Assert(maxoff >= FirstOffsetNumber);
    ItemPointerSetInvalid(&heapptr);
    attrnum = 0;

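    /*
     * Walk the tuples in order, batching up keys that share the same heap
     * TID and attribute number so they can be handed to ginInsertBAEntries
     * in one call per group.
     */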
    for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
    {
        IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
        OffsetNumber curattnum;
        Datum       curkey;
        GinNullCategory curcategory;

        /* Check for change of heap TID or attnum */
        curattnum = gintuple_get_attrnum(accum->ginstate, itup);

        if (!ItemPointerIsValid(&heapptr))
        {
            heapptr = itup->t_tid;
            attrnum = curattnum;
        }
        else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
                   curattnum == attrnum))
        {
            /*
             * ginInsertBAEntries can insert several datums per call, but
             * only for one heap tuple and one column.  So call it at a
             * boundary, and reset ka.
             */
            ginInsertBAEntries(accum, &heapptr, attrnum,
                               ka->keys, ka->categories, ka->nvalues);
            ka->nvalues = 0;
            heapptr = itup->t_tid;
            attrnum = curattnum;
        }

        /* Add key to KeyArray */
        curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
        addDatum(ka, curkey, curcategory);
    }

    /* Dump out all remaining keys */
    ginInsertBAEntries(accum, &heapptr, attrnum,
                       ka->keys, ka->categories, ka->nvalues);
}

/*
 * Move tuples from pending pages into regular GIN structure.
 *
 * At first glance this looks completely non-crash-safe.  But if we crash
 * after posting entries to the main index and before removing them from the
 * pending list, it's okay: when we redo the posting later on, nothing bad
 * will happen.
 *
 * fill_fsm indicates that ginInsertCleanup should add deleted pages to the
 * FSM; otherwise, the caller is responsible for putting deleted pages into
 * the FSM.
 *
 * If stats isn't null, we count deleted pending pages into the counts.
 */
void
ginInsertCleanup(GinState *ginstate, bool full_clean,
                 bool fill_fsm, bool forceCleanup,
                 IndexBulkDeleteResult *stats)
{
    Relation    index = ginstate->index;
    Buffer      metabuffer,
                buffer;
    Page        metapage,
                page;
    GinMetaPageData *metadata;
    MemoryContext opCtx,
                oldCtx;
    BuildAccumulator accum;
    KeyArray    datums;
    BlockNumber blkno,
                blknoFinish;
    bool        cleanupFinish = false;
    bool        fsm_vac = false;
    Size        workMemory;

    /*
     * We would like to prevent concurrent cleanup processes.  For that we
     * lock the metapage in exclusive mode using LockPage().  Nothing else
     * uses that lock on the metapage, so concurrent insertion into the
     * pending list remains possible.
     */

    if (forceCleanup)
    {
        /*
         * We are called from [auto]vacuum/analyze or
         * gin_clean_pending_list(), and want to wait for any concurrent
         * cleanup to finish.
         */
        LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
        workMemory =
            (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
            autovacuum_work_mem : maintenance_work_mem;
    }
    else
    {
        /*
         * We are called from a regular insert; if we see a concurrent
         * cleanup, just exit in the hope that the concurrent process will
         * clean up the pending list.
         */
        if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
            return;
        workMemory = work_mem;
    }

    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
    LockBuffer(metabuffer, GIN_SHARE);
    metapage = BufferGetPage(metabuffer);
    metadata = GinPageGetMeta(metapage);

    if (metadata->head == InvalidBlockNumber)
    {
        /* Nothing to do */
        UnlockReleaseBuffer(metabuffer);
        UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
        return;
    }

    /*
     * Remember a tail page to prevent infinite cleanup if other backends add
     * new tuples faster than we can clean them up.
     */
    blknoFinish = metadata->tail;

    /*
     * Read and lock head of pending list
     */
    blkno = metadata->head;
    buffer = ReadBuffer(index, blkno);
    LockBuffer(buffer, GIN_SHARE);
    page = BufferGetPage(buffer);

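    /* Release the metapage lock, but keep the pin, while we work. */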
    LockBuffer(metabuffer, GIN_UNLOCK);

    /*
     * Initialize.  All temporary space will be in opCtx
     */
    opCtx = AllocSetContextCreate(CurrentMemoryContext,
                                  "GIN insert cleanup temporary context",
                                  ALLOCSET_DEFAULT_SIZES);

    oldCtx = MemoryContextSwitchTo(opCtx);

    initKeyArray(&datums, 128);
    ginInitBA(&accum);
    accum.ginstate = ginstate;

    /*
     * At the top of this loop, we have pin and lock on the current page of
     * the pending list.  However, we'll release that before exiting the
     * loop.  Note we also have pin but not lock on the metapage.
     */
    for (;;)
    {
        Assert(!GinPageIsDeleted(page));

        /*
         * Have we reached the page that was the tail when we started this
         * cleanup?  If the caller asked us to clean up the whole pending
         * list, ignore the old tail; we will work until the list becomes
         * empty.
         */
        if (blkno == blknoFinish && full_clean == false)
            cleanupFinish = true;

        /*
         * read page's datums into accum
         */
        processPendingPage(&accum, &datums, page, FirstOffsetNumber);

        vacuum_delay_point();

        /*
         * Is it time to flush memory to disk?  Flush if we are at the end of
         * the pending list, or if we have a full row and memory is getting
         * full.
         */
        if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
            (GinPageHasFullRow(page) &&
             (accum.allocatedMemory >= workMemory * 1024L)))
        {
            ItemPointerData *list;
            uint32      nlist;
            Datum       key;
            GinNullCategory category;
            OffsetNumber maxoff,
                        attnum;

            /*
             * Unlock the current page for performance; changes to the page
             * will be detected later by comparing maxoff after the memory
             * flush completes.
             */
            maxoff = PageGetMaxOffsetNumber(page);
            LockBuffer(buffer, GIN_UNLOCK);

            /*
             * Moving the collected data into the regular structure can take
             * a significant amount of time, so run it without holding the
             * pending-list lock.
             */
            ginBeginBAScan(&accum);
            while ((list = ginGetBAEntry(&accum,
                                         &attnum, &key, &category, &nlist)) != NULL)
            {
                ginEntryInsert(ginstate, attnum, key, category,
                               list, nlist, NULL);
                vacuum_delay_point();
            }

            /*
             * Lock the whole list to remove pages
             */
            LockBuffer(metabuffer, GIN_EXCLUSIVE);
            LockBuffer(buffer, GIN_SHARE);

            Assert(!GinPageIsDeleted(page));

            /*
             * While we left the page unlocked, more stuff might have gotten
             * added to it.  If so, process those entries immediately.  There
             * shouldn't be very many, so we don't worry about the fact that
             * we're doing this with exclusive lock.  The insertion algorithm
             * guarantees that inserted row(s) will not continue onto the
             * next page.  NOTE: intentionally no vacuum_delay_point in this
             * loop.
             */
            if (PageGetMaxOffsetNumber(page) != maxoff)
            {
                ginInitBA(&accum);
                processPendingPage(&accum, &datums, page, maxoff + 1);

                ginBeginBAScan(&accum);
                while ((list = ginGetBAEntry(&accum,
                                             &attnum, &key, &category, &nlist)) != NULL)
                    ginEntryInsert(ginstate, attnum, key, category,
                                   list, nlist, NULL);
            }

            /*
             * Remember next page - it will become the new list head
             */
            blkno = GinPageGetOpaque(page)->rightlink;
            UnlockReleaseBuffer(buffer);    /* shiftList will do exclusive
                                             * locking */

            /*
             * Remove the processed pages from the pending list; at this
             * point all their contents have been moved into the regular
             * structure.
             */
            shiftList(index, metabuffer, blkno, fill_fsm, stats);

            /* At this point, some pending pages have been freed up */
            fsm_vac = true;

            Assert(blkno == metadata->head);
            LockBuffer(metabuffer, GIN_UNLOCK);

            /*
             * If we removed the whole pending list, or we reached the tail
             * page that we remembered when the cleanup started, just exit.
             */
            if (blkno == InvalidBlockNumber || cleanupFinish)
                break;

            /*
             * release memory used so far and reinit state
             */
            MemoryContextReset(opCtx);
            initKeyArray(&datums, datums.maxvalues);
            ginInitBA(&accum);
        }
        else
        {
            blkno = GinPageGetOpaque(page)->rightlink;
            UnlockReleaseBuffer(buffer);
        }

        /*
         * Read next page in pending list
         */
        vacuum_delay_point();
        buffer = ReadBuffer(index, blkno);
        LockBuffer(buffer, GIN_SHARE);
        page = BufferGetPage(buffer);
    }

    UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
    ReleaseBuffer(metabuffer);

    /*
     * As pending list pages can have a high churn rate, it is desirable to
     * recycle them immediately to the FreeSpace Map when ordinary backends
     * clean the list.
     */
    if (fsm_vac && fill_fsm)
        IndexFreeSpaceMapVacuum(index);

    /* Clean up temporary space */
    MemoryContextSwitchTo(oldCtx);
    MemoryContextDelete(opCtx);
}

/*
 * SQL-callable function to clean the insert pending list
 */
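/*
 * Typical usage looks something like the following (illustrative only;
 * "some_gin_index" is a placeholder name):
 *
 *     SELECT gin_clean_pending_list('some_gin_index'::regclass);
 */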
Datum
gin_clean_pending_list(PG_FUNCTION_ARGS)
{
    Oid         indexoid = PG_GETARG_OID(0);
    Relation    indexRel = index_open(indexoid, RowExclusiveLock);
    IndexBulkDeleteResult stats;
    GinState    ginstate;

    if (RecoveryInProgress())
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("recovery is in progress"),
                 errhint("GIN pending list cannot be cleaned up during recovery.")));

    /* Must be a GIN index */
    if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
        indexRel->rd_rel->relam != GIN_AM_OID)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is not a GIN index",
                        RelationGetRelationName(indexRel))));

    /*
     * Reject attempts to read non-local temporary relations; we would be
     * likely to get wrong data since we have no visibility into the owning
     * session's local buffers.
     */
    if (RELATION_IS_OTHER_TEMP(indexRel))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot access temporary indexes of other sessions")));

    /* User must own the index (comparable to privileges needed for VACUUM) */
    if (!pg_class_ownercheck(indexoid, GetUserId()))
        aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
                       RelationGetRelationName(indexRel));

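    /*
     * Run a full, forced cleanup (waiting for any concurrent cleanup to
     * finish), adding freed pages to the FSM, and report how many
     * pending-list pages were deleted.
     */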
    memset(&stats, 0, sizeof(stats));
    initGinState(&ginstate, indexRel);
    ginInsertCleanup(&ginstate, true, true, true, &stats);

    index_close(indexRel, RowExclusiveLock);

    PG_RETURN_INT64((int64) stats.pages_deleted);
}