1/*
2 * brin.c
3 * Implementation of BRIN indexes for Postgres
4 *
5 * See src/backend/access/brin/README for details.
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/brin/brin.c
12 *
13 * TODO
14 * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15 */
16#include "postgres.h"
17
18#include "access/brin.h"
19#include "access/brin_page.h"
20#include "access/brin_pageops.h"
21#include "access/brin_xlog.h"
22#include "access/relation.h"
23#include "access/reloptions.h"
24#include "access/relscan.h"
25#include "access/table.h"
26#include "access/tableam.h"
27#include "access/xloginsert.h"
28#include "catalog/index.h"
29#include "catalog/pg_am.h"
30#include "miscadmin.h"
31#include "pgstat.h"
32#include "postmaster/autovacuum.h"
33#include "storage/bufmgr.h"
34#include "storage/freespace.h"
35#include "utils/builtins.h"
36#include "utils/index_selfuncs.h"
37#include "utils/memutils.h"
38#include "utils/rel.h"
39
40
41/*
42 * We use a BrinBuildState during initial construction of a BRIN index.
43 * The running state is kept in a BrinMemTuple.
44 */
45typedef struct BrinBuildState
46{
47 Relation bs_irel;
48 int bs_numtuples;
49 Buffer bs_currentInsertBuf;
50 BlockNumber bs_pagesPerRange;
51 BlockNumber bs_currRangeStart;
52 BrinRevmap *bs_rmAccess;
53 BrinDesc *bs_bdesc;
54 BrinMemTuple *bs_dtuple;
55} BrinBuildState;
56
57/*
58 * Struct used as "opaque" during index scans
59 */
60typedef struct BrinOpaque
61{
62 BlockNumber bo_pagesPerRange;
63 BrinRevmap *bo_rmAccess;
64 BrinDesc *bo_bdesc;
65} BrinOpaque;
66
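/* pageRange value meaning "process all block ranges in the table" */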
67#define BRIN_ALL_BLOCKRANGES InvalidBlockNumber
68
69static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
70 BrinRevmap *revmap, BlockNumber pagesPerRange);
71static void terminate_brin_buildstate(BrinBuildState *state);
72static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
73 bool include_partial, double *numSummarized, double *numExisting);
74static void form_and_insert_tuple(BrinBuildState *state);
75static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
76 BrinTuple *b);
77static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
78
79
80/*
81 * BRIN handler function: return IndexAmRoutine with access method parameters
82 * and callbacks.
83 */
84Datum
85brinhandler(PG_FUNCTION_ARGS)
86{
87 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
88
89 amroutine->amstrategies = 0;
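	/*
	 * Note: support procedures 1-4 (opcinfo, addvalue, consistent, union)
	 * are mandatory for every BRIN opclass; the higher numbers, up to
	 * BRIN_LAST_OPTIONAL_PROCNUM, are optional, opclass-defined support
	 * functions.
	 */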
90 amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
91 amroutine->amcanorder = false;
92 amroutine->amcanorderbyop = false;
93 amroutine->amcanbackward = false;
94 amroutine->amcanunique = false;
95 amroutine->amcanmulticol = true;
96 amroutine->amoptionalkey = true;
97 amroutine->amsearcharray = false;
98 amroutine->amsearchnulls = true;
99 amroutine->amstorage = true;
100 amroutine->amclusterable = false;
101 amroutine->ampredlocks = false;
102 amroutine->amcanparallel = false;
103 amroutine->amcaninclude = false;
104 amroutine->amkeytype = InvalidOid;
105
106 amroutine->ambuild = brinbuild;
107 amroutine->ambuildempty = brinbuildempty;
108 amroutine->aminsert = brininsert;
109 amroutine->ambulkdelete = brinbulkdelete;
110 amroutine->amvacuumcleanup = brinvacuumcleanup;
111 amroutine->amcanreturn = NULL;
112 amroutine->amcostestimate = brincostestimate;
113 amroutine->amoptions = brinoptions;
114 amroutine->amproperty = NULL;
115 amroutine->ambuildphasename = NULL;
116 amroutine->amvalidate = brinvalidate;
117 amroutine->ambeginscan = brinbeginscan;
118 amroutine->amrescan = brinrescan;
119 amroutine->amgettuple = NULL;
120 amroutine->amgetbitmap = bringetbitmap;
121 amroutine->amendscan = brinendscan;
122 amroutine->ammarkpos = NULL;
123 amroutine->amrestrpos = NULL;
124 amroutine->amestimateparallelscan = NULL;
125 amroutine->aminitparallelscan = NULL;
126 amroutine->amparallelrescan = NULL;
127
128 PG_RETURN_POINTER(amroutine);
129}
130
131/*
132 * A tuple in the heap is being inserted. To keep a brin index up to date,
133 * we need to obtain the relevant index tuple and compare its stored values
134 * with those of the new tuple. If the tuple values are not consistent with
135 * the summary tuple, we need to update the index tuple.
136 *
137 * If autosummarization is enabled, check if we need to summarize the previous
138 * page range.
139 *
140 * If the range is not currently summarized (i.e. the revmap returns NULL for
141 * it), there's nothing to do for this tuple.
142 */
143bool
144brininsert(Relation idxRel, Datum *values, bool *nulls,
145 ItemPointer heaptid, Relation heapRel,
146 IndexUniqueCheck checkUnique,
147 IndexInfo *indexInfo)
148{
149 BlockNumber pagesPerRange;
150 BlockNumber origHeapBlk;
151 BlockNumber heapBlk;
152 BrinDesc *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
153 BrinRevmap *revmap;
154 Buffer buf = InvalidBuffer;
155 MemoryContext tupcxt = NULL;
156 MemoryContext oldcxt = CurrentMemoryContext;
157 bool autosummarize = BrinGetAutoSummarize(idxRel);
158
159 revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
160
161 /*
162 * origHeapBlk is the block number where the insertion occurred. heapBlk
163 * is the first block in the corresponding page range.
164 */
165 origHeapBlk = ItemPointerGetBlockNumber(heaptid);
166 heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
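	/*
	 * For example, with the default pages_per_range of 128, an insertion
	 * into heap block 300 belongs to the range starting at heap block 256.
	 */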
167
168 for (;;)
169 {
170 bool need_insert = false;
171 OffsetNumber off;
172 BrinTuple *brtup;
173 BrinMemTuple *dtup;
174 int keyno;
175
176 CHECK_FOR_INTERRUPTS();
177
178 /*
179 * If auto-summarization is enabled and we just inserted the first
180 * tuple into the first block of a new non-first page range, request a
181 * summarization run of the previous range.
182 */
183 if (autosummarize &&
184 heapBlk > 0 &&
185 heapBlk == origHeapBlk &&
186 ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
187 {
188 BlockNumber lastPageRange = heapBlk - 1;
189 BrinTuple *lastPageTuple;
190
191 lastPageTuple =
192 brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
193 NULL, BUFFER_LOCK_SHARE, NULL);
194 if (!lastPageTuple)
195 {
196 bool recorded;
197
198 recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
199 RelationGetRelid(idxRel),
200 lastPageRange);
201 if (!recorded)
202 ereport(LOG,
203 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
204 errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
205 RelationGetRelationName(idxRel),
206 lastPageRange)));
207 }
208 else
209 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
210 }
211
212 brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
213 NULL, BUFFER_LOCK_SHARE, NULL);
214
215 /* if range is unsummarized, there's nothing to do */
216 if (!brtup)
217 break;
218
219 /* First time through in this statement? */
220 if (bdesc == NULL)
221 {
222 MemoryContextSwitchTo(indexInfo->ii_Context);
223 bdesc = brin_build_desc(idxRel);
224 indexInfo->ii_AmCache = (void *) bdesc;
225 MemoryContextSwitchTo(oldcxt);
226 }
227 /* First time through in this brininsert call? */
228 if (tupcxt == NULL)
229 {
230 tupcxt = AllocSetContextCreate(CurrentMemoryContext,
231 "brininsert cxt",
232 ALLOCSET_DEFAULT_SIZES);
233 MemoryContextSwitchTo(tupcxt);
234 }
235
236 dtup = brin_deform_tuple(bdesc, brtup, NULL);
237
238 /*
239 * Compare the key values of the new tuple to the stored index values;
240 * our deformed tuple will get updated if the new tuple doesn't fit
241 * the original range (note this means we can't break out of the loop
242 * early). Make a note of whether this happens, so that we know to
243 * insert the modified tuple later.
244 */
245 for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
246 {
247 Datum result;
248 BrinValues *bval;
249 FmgrInfo *addValue;
250
251 bval = &dtup->bt_columns[keyno];
252 addValue = index_getprocinfo(idxRel, keyno + 1,
253 BRIN_PROCNUM_ADDVALUE);
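			/*
			 * The addvalue support function receives (bdesc, bval, newval,
			 * isnull) and returns true if it had to modify the stored
			 * summary to accommodate the new value.
			 */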
254 result = FunctionCall4Coll(addValue,
255 idxRel->rd_indcollation[keyno],
256 PointerGetDatum(bdesc),
257 PointerGetDatum(bval),
258 values[keyno],
259 nulls[keyno]);
260 /* if that returned true, we need to insert the updated tuple */
261 need_insert |= DatumGetBool(result);
262 }
263
264 if (!need_insert)
265 {
266 /*
267 * The tuple is consistent with the new values, so there's nothing
268 * to do.
269 */
270 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
271 }
272 else
273 {
274 Page page = BufferGetPage(buf);
275 ItemId lp = PageGetItemId(page, off);
276 Size origsz;
277 BrinTuple *origtup;
278 Size newsz;
279 BrinTuple *newtup;
280 bool samepage;
281
282 /*
283 * Make a copy of the old tuple, so that we can compare it after
284 * re-acquiring the lock.
285 */
286 origsz = ItemIdGetLength(lp);
287 origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
288
289 /*
290 * Before releasing the lock, check if we can attempt a same-page
291 * update. Another process could insert a tuple concurrently in
292 * the same page though, so downstream we must be prepared to cope
 * if this turns out not to be possible after all.
294 */
295 newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
296 samepage = brin_can_do_samepage_update(buf, origsz, newsz);
297 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
298
299 /*
300 * Try to update the tuple. If this doesn't work for whatever
301 * reason, we need to restart from the top; the revmap might be
302 * pointing at a different tuple for this block now, so we need to
303 * recompute to ensure both our new heap tuple and the other
304 * inserter's are covered by the combined tuple. It might be that
305 * we don't need to update at all.
306 */
307 if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
308 buf, off, origtup, origsz, newtup, newsz,
309 samepage))
310 {
311 /* no luck; start over */
312 MemoryContextResetAndDeleteChildren(tupcxt);
313 continue;
314 }
315 }
316
317 /* success! */
318 break;
319 }
320
321 brinRevmapTerminate(revmap);
322 if (BufferIsValid(buf))
323 ReleaseBuffer(buf);
324 MemoryContextSwitchTo(oldcxt);
325 if (tupcxt != NULL)
326 MemoryContextDelete(tupcxt);
327
328 return false;
329}
330
331/*
332 * Initialize state for a BRIN index scan.
333 *
334 * We read the metapage here to determine the pages-per-range number that this
 * index was built with.  Note that since this cannot be changed while we're
 * holding a lock on the index, there's no need to recompute it during
 * brinrescan.
337 */
338IndexScanDesc
339brinbeginscan(Relation r, int nkeys, int norderbys)
340{
341 IndexScanDesc scan;
342 BrinOpaque *opaque;
343
344 scan = RelationGetIndexScan(r, nkeys, norderbys);
345
346 opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
347 opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
348 scan->xs_snapshot);
349 opaque->bo_bdesc = brin_build_desc(r);
350 scan->opaque = opaque;
351
352 return scan;
353}
354
355/*
356 * Execute the index scan.
357 *
358 * This works by reading index TIDs from the revmap, and obtaining the index
359 * tuples pointed to by them; the summary values in the index tuples are
360 * compared to the scan keys. We return into the TID bitmap all the pages in
361 * ranges corresponding to index tuples that match the scan keys.
362 *
363 * If a TID from the revmap is read as InvalidTID, we know that range is
364 * unsummarized. Pages in those ranges need to be returned regardless of scan
365 * keys.
366 */
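/*
 * For instance, with a minmax opclass and a scan key "col > 1000", only
 * ranges whose stored maximum exceeds 1000 (plus any unsummarized or
 * placeholder ranges) have their heap pages added to the bitmap.
 */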
367int64
368bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
369{
370 Relation idxRel = scan->indexRelation;
371 Buffer buf = InvalidBuffer;
372 BrinDesc *bdesc;
373 Oid heapOid;
374 Relation heapRel;
375 BrinOpaque *opaque;
376 BlockNumber nblocks;
377 BlockNumber heapBlk;
378 int totalpages = 0;
379 FmgrInfo *consistentFn;
380 MemoryContext oldcxt;
381 MemoryContext perRangeCxt;
382 BrinMemTuple *dtup;
383 BrinTuple *btup = NULL;
384 Size btupsz = 0;
385
386 opaque = (BrinOpaque *) scan->opaque;
387 bdesc = opaque->bo_bdesc;
388 pgstat_count_index_scan(idxRel);
389
390 /*
391 * We need to know the size of the table so that we know how long to
392 * iterate on the revmap.
393 */
394 heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
395 heapRel = table_open(heapOid, AccessShareLock);
396 nblocks = RelationGetNumberOfBlocks(heapRel);
397 table_close(heapRel, AccessShareLock);
398
399 /*
400 * Make room for the consistent support procedures of indexed columns. We
401 * don't look them up here; we do that lazily the first time we see a scan
402 * key reference each of them. We rely on zeroing fn_oid to InvalidOid.
403 */
404 consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
405
406 /* allocate an initial in-memory tuple, out of the per-range memcxt */
407 dtup = brin_new_memtuple(bdesc);
408
409 /*
410 * Setup and use a per-range memory context, which is reset every time we
411 * loop below. This avoids having to free the tuples within the loop.
412 */
413 perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
414 "bringetbitmap cxt",
415 ALLOCSET_DEFAULT_SIZES);
416 oldcxt = MemoryContextSwitchTo(perRangeCxt);
417
418 /*
419 * Now scan the revmap. We start by querying for heap page 0,
420 * incrementing by the number of pages per range; this gives us a full
421 * view of the table.
422 */
423 for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
424 {
425 bool addrange;
426 bool gottuple = false;
427 BrinTuple *tup;
428 OffsetNumber off;
429 Size size;
430
431 CHECK_FOR_INTERRUPTS();
432
433 MemoryContextResetAndDeleteChildren(perRangeCxt);
434
435 tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
436 &off, &size, BUFFER_LOCK_SHARE,
437 scan->xs_snapshot);
438 if (tup)
439 {
440 gottuple = true;
441 btup = brin_copy_tuple(tup, size, btup, &btupsz);
442 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
443 }
444
445 /*
446 * For page ranges with no indexed tuple, we must return the whole
447 * range; otherwise, compare it to the scan keys.
448 */
449 if (!gottuple)
450 {
451 addrange = true;
452 }
453 else
454 {
455 dtup = brin_deform_tuple(bdesc, btup, dtup);
456 if (dtup->bt_placeholder)
457 {
458 /*
459 * Placeholder tuples are always returned, regardless of the
460 * values stored in them.
461 */
462 addrange = true;
463 }
464 else
465 {
466 int keyno;
467
468 /*
469 * Compare scan keys with summary values stored for the range.
470 * If scan keys are matched, the page range must be added to
471 * the bitmap. We initially assume the range needs to be
472 * added; in particular this serves the case where there are
473 * no keys.
474 */
475 addrange = true;
476 for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
477 {
478 ScanKey key = &scan->keyData[keyno];
479 AttrNumber keyattno = key->sk_attno;
480 BrinValues *bval = &dtup->bt_columns[keyattno - 1];
481 Datum add;
482
483 /*
484 * The collation of the scan key must match the collation
485 * used in the index column (but only if the search is not
 * IS NULL / IS NOT NULL).  Otherwise we shouldn't be using
487 * this index ...
488 */
489 Assert((key->sk_flags & SK_ISNULL) ||
490 (key->sk_collation ==
491 TupleDescAttr(bdesc->bd_tupdesc,
492 keyattno - 1)->attcollation));
493
494 /* First time this column? look up consistent function */
495 if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
496 {
497 FmgrInfo *tmp;
498
499 tmp = index_getprocinfo(idxRel, keyattno,
500 BRIN_PROCNUM_CONSISTENT);
501 fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
502 CurrentMemoryContext);
503 }
504
505 /*
506 * Check whether the scan key is consistent with the page
507 * range values; if so, have the pages in the range added
508 * to the output bitmap.
509 *
510 * When there are multiple scan keys, failure to meet the
511 * criteria for a single one of them is enough to discard
512 * the range as a whole, so break out of the loop as soon
513 * as a false return value is obtained.
514 */
515 add = FunctionCall3Coll(&consistentFn[keyattno - 1],
516 key->sk_collation,
517 PointerGetDatum(bdesc),
518 PointerGetDatum(bval),
519 PointerGetDatum(key));
520 addrange = DatumGetBool(add);
521 if (!addrange)
522 break;
523 }
524 }
525 }
526
527 /* add the pages in the range to the output bitmap, if needed */
528 if (addrange)
529 {
530 BlockNumber pageno;
531
532 for (pageno = heapBlk;
533 pageno <= heapBlk + opaque->bo_pagesPerRange - 1;
534 pageno++)
535 {
536 MemoryContextSwitchTo(oldcxt);
537 tbm_add_page(tbm, pageno);
538 totalpages++;
539 MemoryContextSwitchTo(perRangeCxt);
540 }
541 }
542 }
543
544 MemoryContextSwitchTo(oldcxt);
545 MemoryContextDelete(perRangeCxt);
546
547 if (buf != InvalidBuffer)
548 ReleaseBuffer(buf);
549
550 /*
551 * XXX We have an approximation of the number of *pages* that our scan
552 * returns, but we don't have a precise idea of the number of heap tuples
553 * involved.
554 */
555 return totalpages * 10;
556}
557
558/*
559 * Re-initialize state for a BRIN index scan
560 */
561void
562brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
563 ScanKey orderbys, int norderbys)
564{
565 /*
566 * Other index AMs preprocess the scan keys at this point, or sometime
567 * early during the scan; this lets them optimize by removing redundant
568 * keys, or doing early returns when they are impossible to satisfy; see
569 * _bt_preprocess_keys for an example. Something like that could be added
570 * here someday, too.
571 */
572
573 if (scankey && scan->numberOfKeys > 0)
574 memmove(scan->keyData, scankey,
575 scan->numberOfKeys * sizeof(ScanKeyData));
576}
577
578/*
579 * Close down a BRIN index scan
580 */
581void
582brinendscan(IndexScanDesc scan)
583{
584 BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
585
586 brinRevmapTerminate(opaque->bo_rmAccess);
587 brin_free_desc(opaque->bo_bdesc);
588 pfree(opaque);
589}
590
591/*
592 * Per-heap-tuple callback for table_index_build_scan.
593 *
594 * Note we don't worry about the page range at the end of the table here; it is
595 * present in the build state struct after we're called the last time, but not
 * inserted into the index.  The caller must insert it, if appropriate.
597 */
598static void
599brinbuildCallback(Relation index,
600 HeapTuple htup,
601 Datum *values,
602 bool *isnull,
603 bool tupleIsAlive,
604 void *brstate)
605{
606 BrinBuildState *state = (BrinBuildState *) brstate;
607 BlockNumber thisblock;
608 int i;
609
610 thisblock = ItemPointerGetBlockNumber(&htup->t_self);
611
612 /*
613 * If we're in a block that belongs to a future range, summarize what
614 * we've got and start afresh. Note the scan might have skipped many
615 * pages, if they were devoid of live tuples; make sure to insert index
616 * tuples for those too.
617 */
618 while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
619 {
620
621 BRIN_elog((DEBUG2,
622 "brinbuildCallback: completed a range: %u--%u",
623 state->bs_currRangeStart,
624 state->bs_currRangeStart + state->bs_pagesPerRange));
625
626 /* create the index tuple and insert it */
627 form_and_insert_tuple(state);
628
629 /* set state to correspond to the next range */
630 state->bs_currRangeStart += state->bs_pagesPerRange;
631
632 /* re-initialize state for it */
633 brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
634 }
635
636 /* Accumulate the current tuple into the running state */
637 for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
638 {
639 FmgrInfo *addValue;
640 BrinValues *col;
641 Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);
642
643 col = &state->bs_dtuple->bt_columns[i];
644 addValue = index_getprocinfo(index, i + 1,
645 BRIN_PROCNUM_ADDVALUE);
646
647 /*
648 * Update dtuple state, if and as necessary.
649 */
650 FunctionCall4Coll(addValue,
651 attr->attcollation,
652 PointerGetDatum(state->bs_bdesc),
653 PointerGetDatum(col),
654 values[i], isnull[i]);
655 }
656}
657
658/*
659 * brinbuild() -- build a new BRIN index.
660 */
661IndexBuildResult *
662brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
663{
664 IndexBuildResult *result;
665 double reltuples;
666 double idxtuples;
667 BrinRevmap *revmap;
668 BrinBuildState *state;
669 Buffer meta;
670 BlockNumber pagesPerRange;
671
672 /*
673 * We expect to be called exactly once for any index relation.
674 */
675 if (RelationGetNumberOfBlocks(index) != 0)
676 elog(ERROR, "index \"%s\" already contains data",
677 RelationGetRelationName(index));
678
679 /*
680 * Critical section not required, because on error the creation of the
681 * whole relation will be rolled back.
682 */
683
684 meta = ReadBuffer(index, P_NEW);
685 Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
686 LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
687
688 brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
689 BRIN_CURRENT_VERSION);
690 MarkBufferDirty(meta);
691
692 if (RelationNeedsWAL(index))
693 {
694 xl_brin_createidx xlrec;
695 XLogRecPtr recptr;
696 Page page;
697
698 xlrec.version = BRIN_CURRENT_VERSION;
699 xlrec.pagesPerRange = BrinGetPagesPerRange(index);
700
701 XLogBeginInsert();
702 XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
703 XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
704
705 recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
706
707 page = BufferGetPage(meta);
708 PageSetLSN(page, recptr);
709 }
710
711 UnlockReleaseBuffer(meta);
712
713 /*
714 * Initialize our state, including the deformed tuple state.
715 */
716 revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
717 state = initialize_brin_buildstate(index, revmap, pagesPerRange);
718
719 /*
720 * Now scan the relation. No syncscan allowed here because we want the
721 * heap blocks in physical order.
722 */
723 reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
724 brinbuildCallback, (void *) state, NULL);
725
726 /* process the final batch */
727 form_and_insert_tuple(state);
728
729 /* release resources */
730 idxtuples = state->bs_numtuples;
731 brinRevmapTerminate(state->bs_rmAccess);
732 terminate_brin_buildstate(state);
733
734 /*
735 * Return statistics
736 */
737 result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
738
739 result->heap_tuples = reltuples;
740 result->index_tuples = idxtuples;
741
742 return result;
743}
744
745void
746brinbuildempty(Relation index)
747{
748 Buffer metabuf;
749
750 /* An empty BRIN index has a metapage only. */
751 metabuf =
752 ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
753 LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
754
755 /* Initialize and xlog metabuffer. */
756 START_CRIT_SECTION();
757 brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
758 BRIN_CURRENT_VERSION);
759 MarkBufferDirty(metabuf);
760 log_newpage_buffer(metabuf, true);
761 END_CRIT_SECTION();
762
763 UnlockReleaseBuffer(metabuf);
764}
765
766/*
767 * brinbulkdelete
768 * Since there are no per-heap-tuple index tuples in BRIN indexes,
769 * there's not a lot we can do here.
770 *
771 * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
772 * tuple is deleted), meaning the need to re-run summarization on the affected
773 * range. Would need to add an extra flag in brintuples for that.
774 */
775IndexBulkDeleteResult *
776brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
777 IndexBulkDeleteCallback callback, void *callback_state)
778{
779 /* allocate stats if first time through, else re-use existing struct */
780 if (stats == NULL)
781 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
782
783 return stats;
784}
785
786/*
787 * This routine is in charge of "vacuuming" a BRIN index: we just summarize
788 * ranges that are currently unsummarized.
789 */
790IndexBulkDeleteResult *
791brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
792{
793 Relation heapRel;
794
795 /* No-op in ANALYZE ONLY mode */
796 if (info->analyze_only)
797 return stats;
798
799 if (!stats)
800 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
801 stats->num_pages = RelationGetNumberOfBlocks(info->index);
802 /* rest of stats is initialized by zeroing */
803
804 heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
805 AccessShareLock);
806
807 brin_vacuum_scan(info->index, info->strategy);
808
809 brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
810 &stats->num_index_tuples, &stats->num_index_tuples);
811
812 table_close(heapRel, AccessShareLock);
813
814 return stats;
815}
816
817/*
818 * reloptions processor for BRIN indexes
819 */
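/*
 * These reloptions can be set, for example, with:
 *		CREATE INDEX ... USING brin (col) WITH (pages_per_range = 64, autosummarize = on);
 */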
820bytea *
821brinoptions(Datum reloptions, bool validate)
822{
823 relopt_value *options;
824 BrinOptions *rdopts;
825 int numoptions;
826 static const relopt_parse_elt tab[] = {
827 {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
828 {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
829 };
830
831 options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
832 &numoptions);
833
834 /* if none set, we're done */
835 if (numoptions == 0)
836 return NULL;
837
838 rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);
839
840 fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
841 validate, tab, lengthof(tab));
842
843 pfree(options);
844
845 return (bytea *) rdopts;
846}
847
848/*
849 * SQL-callable function to scan through an index and summarize all ranges
850 * that are not currently summarized.
851 */
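/*
 * A typical call, with an illustrative index name, is:
 *		SELECT brin_summarize_new_values('some_brin_index'::regclass);
 */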
852Datum
853brin_summarize_new_values(PG_FUNCTION_ARGS)
854{
855 Datum relation = PG_GETARG_DATUM(0);
856
857 return DirectFunctionCall2(brin_summarize_range,
858 relation,
859 Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
860}
861
862/*
863 * SQL-callable function to summarize the indicated page range, if not already
864 * summarized. If the second argument is BRIN_ALL_BLOCKRANGES, all
865 * unsummarized ranges are summarized.
866 */
867Datum
868brin_summarize_range(PG_FUNCTION_ARGS)
869{
870 Oid indexoid = PG_GETARG_OID(0);
871 int64 heapBlk64 = PG_GETARG_INT64(1);
872 BlockNumber heapBlk;
873 Oid heapoid;
874 Relation indexRel;
875 Relation heapRel;
876 double numSummarized = 0;
877
878 if (RecoveryInProgress())
879 ereport(ERROR,
880 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
881 errmsg("recovery is in progress"),
882 errhint("BRIN control functions cannot be executed during recovery.")));
883
884 if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
885 {
886 char *blk = psprintf(INT64_FORMAT, heapBlk64);
887
888 ereport(ERROR,
889 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
890 errmsg("block number out of range: %s", blk)));
891 }
892 heapBlk = (BlockNumber) heapBlk64;
893
894 /*
895 * We must lock table before index to avoid deadlocks. However, if the
896 * passed indexoid isn't an index then IndexGetRelation() will fail.
897 * Rather than emitting a not-very-helpful error message, postpone
898 * complaining, expecting that the is-it-an-index test below will fail.
899 */
900 heapoid = IndexGetRelation(indexoid, true);
901 if (OidIsValid(heapoid))
902 heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
903 else
904 heapRel = NULL;
905
906 indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
907
908 /* Must be a BRIN index */
909 if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
910 indexRel->rd_rel->relam != BRIN_AM_OID)
911 ereport(ERROR,
912 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
913 errmsg("\"%s\" is not a BRIN index",
914 RelationGetRelationName(indexRel))));
915
916 /* User must own the index (comparable to privileges needed for VACUUM) */
917 if (!pg_class_ownercheck(indexoid, GetUserId()))
918 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
919 RelationGetRelationName(indexRel));
920
921 /*
922 * Since we did the IndexGetRelation call above without any lock, it's
923 * barely possible that a race against an index drop/recreation could have
924 * netted us the wrong table. Recheck.
925 */
926 if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
927 ereport(ERROR,
928 (errcode(ERRCODE_UNDEFINED_TABLE),
929 errmsg("could not open parent table of index %s",
930 RelationGetRelationName(indexRel))));
931
932 /* OK, do it */
933 brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
934
935 relation_close(indexRel, ShareUpdateExclusiveLock);
936 relation_close(heapRel, ShareUpdateExclusiveLock);
937
938 PG_RETURN_INT32((int32) numSummarized);
939}
940
941/*
942 * SQL-callable interface to mark a range as no longer summarized
943 */
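/*
 * For example (index name and block number are illustrative):
 *		SELECT brin_desummarize_range('some_brin_index'::regclass, 0);
 */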
944Datum
945brin_desummarize_range(PG_FUNCTION_ARGS)
946{
947 Oid indexoid = PG_GETARG_OID(0);
948 int64 heapBlk64 = PG_GETARG_INT64(1);
949 BlockNumber heapBlk;
950 Oid heapoid;
951 Relation heapRel;
952 Relation indexRel;
953 bool done;
954
955 if (RecoveryInProgress())
956 ereport(ERROR,
957 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
958 errmsg("recovery is in progress"),
959 errhint("BRIN control functions cannot be executed during recovery.")));
960
961 if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
962 {
963 char *blk = psprintf(INT64_FORMAT, heapBlk64);
964
965 ereport(ERROR,
966 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
967 errmsg("block number out of range: %s", blk)));
968 }
969 heapBlk = (BlockNumber) heapBlk64;
970
971 /*
972 * We must lock table before index to avoid deadlocks. However, if the
973 * passed indexoid isn't an index then IndexGetRelation() will fail.
974 * Rather than emitting a not-very-helpful error message, postpone
975 * complaining, expecting that the is-it-an-index test below will fail.
976 */
977 heapoid = IndexGetRelation(indexoid, true);
978 if (OidIsValid(heapoid))
979 heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
980 else
981 heapRel = NULL;
982
983 indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
984
985 /* Must be a BRIN index */
986 if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
987 indexRel->rd_rel->relam != BRIN_AM_OID)
988 ereport(ERROR,
989 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
990 errmsg("\"%s\" is not a BRIN index",
991 RelationGetRelationName(indexRel))));
992
993 /* User must own the index (comparable to privileges needed for VACUUM) */
994 if (!pg_class_ownercheck(indexoid, GetUserId()))
995 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
996 RelationGetRelationName(indexRel));
997
998 /*
999 * Since we did the IndexGetRelation call above without any lock, it's
1000 * barely possible that a race against an index drop/recreation could have
1001 * netted us the wrong table. Recheck.
1002 */
1003 if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1004 ereport(ERROR,
1005 (errcode(ERRCODE_UNDEFINED_TABLE),
1006 errmsg("could not open parent table of index %s",
1007 RelationGetRelationName(indexRel))));
1008
1009 /* the revmap does the hard work */
1010 do
1011 {
1012 done = brinRevmapDesummarizeRange(indexRel, heapBlk);
1013 }
1014 while (!done);
1015
1016 relation_close(indexRel, ShareUpdateExclusiveLock);
1017 relation_close(heapRel, ShareUpdateExclusiveLock);
1018
1019 PG_RETURN_VOID();
1020}
1021
1022/*
1023 * Build a BrinDesc used to create or scan a BRIN index
1024 */
1025BrinDesc *
1026brin_build_desc(Relation rel)
1027{
1028 BrinOpcInfo **opcinfo;
1029 BrinDesc *bdesc;
1030 TupleDesc tupdesc;
1031 int totalstored = 0;
1032 int keyno;
1033 long totalsize;
1034 MemoryContext cxt;
1035 MemoryContext oldcxt;
1036
1037 cxt = AllocSetContextCreate(CurrentMemoryContext,
1038 "brin desc cxt",
1039 ALLOCSET_SMALL_SIZES);
1040 oldcxt = MemoryContextSwitchTo(cxt);
1041 tupdesc = RelationGetDescr(rel);
1042
1043 /*
1044 * Obtain BrinOpcInfo for each indexed column. While at it, accumulate
1045 * the number of columns stored, since the number is opclass-defined.
1046 */
1047 opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
1048 for (keyno = 0; keyno < tupdesc->natts; keyno++)
1049 {
1050 FmgrInfo *opcInfoFn;
1051 Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
1052
1053 opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
1054
1055 opcinfo[keyno] = (BrinOpcInfo *)
1056 DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
1057 totalstored += opcinfo[keyno]->oi_nstored;
1058 }
1059
1060 /* Allocate our result struct and fill it in */
1061 totalsize = offsetof(BrinDesc, bd_info) +
1062 sizeof(BrinOpcInfo *) * tupdesc->natts;
1063
1064 bdesc = palloc(totalsize);
1065 bdesc->bd_context = cxt;
1066 bdesc->bd_index = rel;
1067 bdesc->bd_tupdesc = tupdesc;
1068 bdesc->bd_disktdesc = NULL; /* generated lazily */
1069 bdesc->bd_totalstored = totalstored;
1070
1071 for (keyno = 0; keyno < tupdesc->natts; keyno++)
1072 bdesc->bd_info[keyno] = opcinfo[keyno];
1073 pfree(opcinfo);
1074
1075 MemoryContextSwitchTo(oldcxt);
1076
1077 return bdesc;
1078}
1079
1080void
1081brin_free_desc(BrinDesc *bdesc)
1082{
1083 /* make sure the tupdesc is still valid */
1084 Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
1085 /* no need for retail pfree */
1086 MemoryContextDelete(bdesc->bd_context);
1087}
1088
1089/*
1090 * Fetch index's statistical data into *stats
1091 */
1092void
1093brinGetStats(Relation index, BrinStatsData *stats)
1094{
1095 Buffer metabuffer;
1096 Page metapage;
1097 BrinMetaPageData *metadata;
1098
1099 metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1100 LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1101 metapage = BufferGetPage(metabuffer);
1102 metadata = (BrinMetaPageData *) PageGetContents(metapage);
1103
1104 stats->pagesPerRange = metadata->pagesPerRange;
1105 stats->revmapNumPages = metadata->lastRevmapPage - 1;
1106
1107 UnlockReleaseBuffer(metabuffer);
1108}
1109
1110/*
1111 * Initialize a BrinBuildState appropriate to create tuples on the given index.
1112 */
1113static BrinBuildState *
1114initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1115 BlockNumber pagesPerRange)
1116{
1117 BrinBuildState *state;
1118
1119 state = palloc(sizeof(BrinBuildState));
1120
1121 state->bs_irel = idxRel;
1122 state->bs_numtuples = 0;
1123 state->bs_currentInsertBuf = InvalidBuffer;
1124 state->bs_pagesPerRange = pagesPerRange;
1125 state->bs_currRangeStart = 0;
1126 state->bs_rmAccess = revmap;
1127 state->bs_bdesc = brin_build_desc(idxRel);
1128 state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1129
1130 brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1131
1132 return state;
1133}
1134
1135/*
1136 * Release resources associated with a BrinBuildState.
1137 */
1138static void
1139terminate_brin_buildstate(BrinBuildState *state)
1140{
1141 /*
1142 * Release the last index buffer used. We might as well ensure that
1143 * whatever free space remains in that page is available in FSM, too.
1144 */
1145 if (!BufferIsInvalid(state->bs_currentInsertBuf))
1146 {
1147 Page page;
1148 Size freespace;
1149 BlockNumber blk;
1150
1151 page = BufferGetPage(state->bs_currentInsertBuf);
1152 freespace = PageGetFreeSpace(page);
1153 blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
1154 ReleaseBuffer(state->bs_currentInsertBuf);
1155 RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
1156 FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
1157 }
1158
1159 brin_free_desc(state->bs_bdesc);
1160 pfree(state->bs_dtuple);
1161 pfree(state);
1162}
1163
1164/*
1165 * On the given BRIN index, summarize the heap page range that corresponds
1166 * to the heap block number given.
1167 *
1168 * This routine can run in parallel with insertions into the heap. To avoid
1169 * missing those values from the summary tuple, we first insert a placeholder
1170 * index tuple into the index, then execute the heap scan; transactions
1171 * concurrent with the scan update the placeholder tuple. After the scan, we
1172 * union the placeholder tuple with the one computed by this routine. The
1173 * update of the index value happens in a loop, so that if somebody updates
1174 * the placeholder tuple after we read it, we detect the case and try again.
1175 * This ensures that the concurrently inserted tuples are not lost.
1176 *
1177 * A further corner case is this routine being asked to summarize the partial
1178 * range at the end of the table. heapNumBlocks is the (possibly outdated)
1179 * table size; if we notice that the requested range lies beyond that size,
1180 * we re-compute the table size after inserting the placeholder tuple, to
1181 * avoid missing pages that were appended recently.
1182 */
1183static void
1184summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
1185 BlockNumber heapBlk, BlockNumber heapNumBlks)
1186{
1187 Buffer phbuf;
1188 BrinTuple *phtup;
1189 Size phsz;
1190 OffsetNumber offset;
1191 BlockNumber scanNumBlks;
1192
1193 /*
1194 * Insert the placeholder tuple
1195 */
1196 phbuf = InvalidBuffer;
1197 phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
1198 offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
1199 state->bs_rmAccess, &phbuf,
1200 heapBlk, phtup, phsz);
1201
1202 /*
1203 * Compute range end. We hold ShareUpdateExclusive lock on table, so it
1204 * cannot shrink concurrently (but it can grow).
1205 */
1206 Assert(heapBlk % state->bs_pagesPerRange == 0);
1207 if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
1208 {
1209 /*
1210 * If we're asked to scan what we believe to be the final range on the
1211 * table (i.e. a range that might be partial) we need to recompute our
1212 * idea of what the latest page is after inserting the placeholder
1213 * tuple. Anyone that grows the table later will update the
1214 * placeholder tuple, so it doesn't matter that we won't scan these
1215 * pages ourselves. Careful: the table might have been extended
1216 * beyond the current range, so clamp our result.
1217 *
1218 * Fortunately, this should occur infrequently.
1219 */
1220 scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
1221 state->bs_pagesPerRange);
1222 }
1223 else
1224 {
1225 /* Easy case: range is known to be complete */
1226 scanNumBlks = state->bs_pagesPerRange;
1227 }
1228
1229 /*
1230 * Execute the partial heap scan covering the heap blocks in the specified
1231 * page range, summarizing the heap tuples in it. This scan stops just
1232 * short of brinbuildCallback creating the new index entry.
1233 *
1234 * Note that it is critical we use the "any visible" mode of
1235 * table_index_build_range_scan here: otherwise, we would miss tuples
1236 * inserted by transactions that are still in progress, among other corner
1237 * cases.
1238 */
1239 state->bs_currRangeStart = heapBlk;
1240 table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
1241 heapBlk, scanNumBlks,
1242 brinbuildCallback, (void *) state, NULL);
1243
1244 /*
1245 * Now we update the values obtained by the scan with the placeholder
1246 * tuple. We do this in a loop which only terminates if we're able to
1247 * update the placeholder tuple successfully; if we are not, this means
1248 * somebody else modified the placeholder tuple after we read it.
1249 */
1250 for (;;)
1251 {
1252 BrinTuple *newtup;
1253 Size newsize;
1254 bool didupdate;
1255 bool samepage;
1256
1257 CHECK_FOR_INTERRUPTS();
1258
1259 /*
1260 * Update the summary tuple and try to update.
1261 */
1262 newtup = brin_form_tuple(state->bs_bdesc,
1263 heapBlk, state->bs_dtuple, &newsize);
1264 samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
1265 didupdate =
1266 brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
1267 state->bs_rmAccess, heapBlk, phbuf, offset,
1268 phtup, phsz, newtup, newsize, samepage);
1269 brin_free_tuple(phtup);
1270 brin_free_tuple(newtup);
1271
1272 /* If the update succeeded, we're done. */
1273 if (didupdate)
1274 break;
1275
1276 /*
1277 * If the update didn't work, it might be because somebody updated the
1278 * placeholder tuple concurrently. Extract the new version, union it
1279 * with the values we have from the scan, and start over. (There are
1280 * other reasons for the update to fail, but it's simple to treat them
1281 * the same.)
1282 */
1283 phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
1284 &offset, &phsz, BUFFER_LOCK_SHARE,
1285 NULL);
1286 /* the placeholder tuple must exist */
1287 if (phtup == NULL)
1288 elog(ERROR, "missing placeholder tuple");
1289 phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
1290 LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
1291
1292 /* merge it into the tuple from the heap scan */
1293 union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
1294 }
1295
1296 ReleaseBuffer(phbuf);
1297}
1298
1299/*
1300 * Summarize page ranges that are not already summarized. If pageRange is
1301 * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1302 * page range containing the given heap page number is scanned.
1303 * If include_partial is true, then the partial range at the end of the table
1304 * is summarized, otherwise not.
1305 *
1306 * For each new index tuple inserted, *numSummarized (if not NULL) is
1307 * incremented; for each existing tuple, *numExisting (if not NULL) is
1308 * incremented.
1309 */
1310static void
1311brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
1312 bool include_partial, double *numSummarized, double *numExisting)
1313{
1314 BrinRevmap *revmap;
1315 BrinBuildState *state = NULL;
1316 IndexInfo *indexInfo = NULL;
1317 BlockNumber heapNumBlocks;
1318 BlockNumber pagesPerRange;
1319 Buffer buf;
1320 BlockNumber startBlk;
1321
1322 revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
1323
1324 /* determine range of pages to process */
1325 heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
1326 if (pageRange == BRIN_ALL_BLOCKRANGES)
1327 startBlk = 0;
1328 else
1329 {
1330 startBlk = (pageRange / pagesPerRange) * pagesPerRange;
1331 heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
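		/*
		 * For example, pageRange 300 with pagesPerRange 128 gives a
		 * startBlk of 256 and limits the scan to blocks below 384.
		 */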
1332 }
1333 if (startBlk > heapNumBlocks)
1334 {
1335 /* Nothing to do if start point is beyond end of table */
1336 brinRevmapTerminate(revmap);
1337 return;
1338 }
1339
1340 /*
1341 * Scan the revmap to find unsummarized items.
1342 */
1343 buf = InvalidBuffer;
1344 for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
1345 {
1346 BrinTuple *tup;
1347 OffsetNumber off;
1348
1349 /*
1350 * Unless requested to summarize even a partial range, go away now if
 * we think the next range is partial.  The caller passes true when the call
 * is typically made once bulk data loading is done
 * (brin_summarize_new_values), and false when it is the result of an
 * arbitrarily-scheduled maintenance command (vacuuming).
1355 */
1356 if (!include_partial &&
1357 (startBlk + pagesPerRange > heapNumBlocks))
1358 break;
1359
1360 CHECK_FOR_INTERRUPTS();
1361
1362 tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
1363 BUFFER_LOCK_SHARE, NULL);
1364 if (tup == NULL)
1365 {
1366 /* no revmap entry for this heap range. Summarize it. */
1367 if (state == NULL)
1368 {
1369 /* first time through */
1370 Assert(!indexInfo);
1371 state = initialize_brin_buildstate(index, revmap,
1372 pagesPerRange);
1373 indexInfo = BuildIndexInfo(index);
1374 }
1375 summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
1376
1377 /* and re-initialize state for the next range */
1378 brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1379
1380 if (numSummarized)
1381 *numSummarized += 1.0;
1382 }
1383 else
1384 {
1385 if (numExisting)
1386 *numExisting += 1.0;
1387 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1388 }
1389 }
1390
1391 if (BufferIsValid(buf))
1392 ReleaseBuffer(buf);
1393
1394 /* free resources */
1395 brinRevmapTerminate(revmap);
1396 if (state)
1397 {
1398 terminate_brin_buildstate(state);
1399 pfree(indexInfo);
1400 }
1401}
1402
1403/*
1404 * Given a deformed tuple in the build state, convert it into the on-disk
1405 * format and insert it into the index, making the revmap point to it.
1406 */
1407static void
1408form_and_insert_tuple(BrinBuildState *state)
1409{
1410 BrinTuple *tup;
1411 Size size;
1412
1413 tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1414 state->bs_dtuple, &size);
1415 brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1416 &state->bs_currentInsertBuf, state->bs_currRangeStart,
1417 tup, size);
1418 state->bs_numtuples++;
1419
1420 pfree(tup);
1421}
1422
1423/*
1424 * Given two deformed tuples, adjust the first one so that it's consistent
1425 * with the summary values in both.
1426 */
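/*
 * With a minmax opclass, for instance, unioning a column summarized as
 * [1, 10] with one summarized as [5, 20] leaves [1, 20] in the first tuple.
 */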
1427static void
1428union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
1429{
1430 int keyno;
1431 BrinMemTuple *db;
1432 MemoryContext cxt;
1433 MemoryContext oldcxt;
1434
1435 /* Use our own memory context to avoid retail pfree */
1436 cxt = AllocSetContextCreate(CurrentMemoryContext,
1437 "brin union",
1438 ALLOCSET_DEFAULT_SIZES);
1439 oldcxt = MemoryContextSwitchTo(cxt);
1440 db = brin_deform_tuple(bdesc, b, NULL);
1441 MemoryContextSwitchTo(oldcxt);
1442
1443 for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
1444 {
1445 FmgrInfo *unionFn;
1446 BrinValues *col_a = &a->bt_columns[keyno];
1447 BrinValues *col_b = &db->bt_columns[keyno];
1448
1449 unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
1450 BRIN_PROCNUM_UNION);
1451 FunctionCall3Coll(unionFn,
1452 bdesc->bd_index->rd_indcollation[keyno],
1453 PointerGetDatum(bdesc),
1454 PointerGetDatum(col_a),
1455 PointerGetDatum(col_b));
1456 }
1457
1458 MemoryContextDelete(cxt);
1459}
1460
1461/*
1462 * brin_vacuum_scan
1463 * Do a complete scan of the index during VACUUM.
1464 *
1465 * This routine scans the complete index looking for uncatalogued index pages,
1466 * i.e. those that might have been lost due to a crash after index extension
1467 * and such.
1468 */
1469static void
1470brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
1471{
1472 BlockNumber nblocks;
1473 BlockNumber blkno;
1474
1475 /*
1476 * Scan the index in physical order, and clean up any possible mess in
1477 * each page.
1478 */
1479 nblocks = RelationGetNumberOfBlocks(idxrel);
1480 for (blkno = 0; blkno < nblocks; blkno++)
1481 {
1482 Buffer buf;
1483
1484 CHECK_FOR_INTERRUPTS();
1485
1486 buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
1487 RBM_NORMAL, strategy);
1488
1489 brin_page_cleanup(idxrel, buf);
1490
1491 ReleaseBuffer(buf);
1492 }
1493
1494 /*
1495 * Update all upper pages in the index's FSM, as well. This ensures not
1496 * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
1497 * but also that any pre-existing damage or out-of-dateness is repaired.
1498 */
1499 FreeSpaceMapVacuum(idxrel);
1500}
1501