1/*-------------------------------------------------------------------------
2 *
3 * spgutils.c
4 * various support functions for SP-GiST
5 *
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/spgist/spgutils.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres.h"
17
18#include "access/amvalidate.h"
19#include "access/htup_details.h"
20#include "access/reloptions.h"
21#include "access/spgist_private.h"
22#include "access/transam.h"
23#include "access/xact.h"
24#include "catalog/pg_amop.h"
25#include "storage/bufmgr.h"
26#include "storage/indexfsm.h"
27#include "storage/lmgr.h"
28#include "utils/builtins.h"
29#include "utils/catcache.h"
30#include "utils/index_selfuncs.h"
31#include "utils/lsyscache.h"
32#include "utils/syscache.h"
33
34
35/*
36 * SP-GiST handler function: return IndexAmRoutine with access method parameters
37 * and callbacks.
38 */
39Datum
40spghandler(PG_FUNCTION_ARGS)
41{
42 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
43
44 amroutine->amstrategies = 0;
45 amroutine->amsupport = SPGISTNProc;
46 amroutine->amcanorder = false;
47 amroutine->amcanorderbyop = true;
48 amroutine->amcanbackward = false;
49 amroutine->amcanunique = false;
50 amroutine->amcanmulticol = false;
51 amroutine->amoptionalkey = true;
52 amroutine->amsearcharray = false;
53 amroutine->amsearchnulls = true;
54 amroutine->amstorage = false;
55 amroutine->amclusterable = false;
56 amroutine->ampredlocks = false;
57 amroutine->amcanparallel = false;
58 amroutine->amcaninclude = false;
59 amroutine->amkeytype = InvalidOid;
60
61 amroutine->ambuild = spgbuild;
62 amroutine->ambuildempty = spgbuildempty;
63 amroutine->aminsert = spginsert;
64 amroutine->ambulkdelete = spgbulkdelete;
65 amroutine->amvacuumcleanup = spgvacuumcleanup;
66 amroutine->amcanreturn = spgcanreturn;
67 amroutine->amcostestimate = spgcostestimate;
68 amroutine->amoptions = spgoptions;
69 amroutine->amproperty = spgproperty;
70 amroutine->ambuildphasename = NULL;
71 amroutine->amvalidate = spgvalidate;
72 amroutine->ambeginscan = spgbeginscan;
73 amroutine->amrescan = spgrescan;
74 amroutine->amgettuple = spggettuple;
75 amroutine->amgetbitmap = spggetbitmap;
76 amroutine->amendscan = spgendscan;
77 amroutine->ammarkpos = NULL;
78 amroutine->amrestrpos = NULL;
79 amroutine->amestimateparallelscan = NULL;
80 amroutine->aminitparallelscan = NULL;
81 amroutine->amparallelrescan = NULL;
82
83 PG_RETURN_POINTER(amroutine);
84}
85
86/* Fill in a SpGistTypeDesc struct with info about the specified data type */
87static void
88fillTypeDesc(SpGistTypeDesc *desc, Oid type)
89{
90 desc->type = type;
91 get_typlenbyval(type, &desc->attlen, &desc->attbyval);
92}
93
/*
 * Fetch local cache of AM-specific info about the index, initializing it
 * if necessary
 *
 * The cache holds the opclass config, type descriptors, and a local copy of
 * the metapage's lastUsedPages data.  It is stored in the relcache entry
 * (rd_amcache), so it persists until the relcache entry is flushed.
 */
SpGistCache *
spgGetCache(Relation index)
{
	SpGistCache *cache;

	if (index->rd_amcache == NULL)
	{
		Oid			atttype;
		spgConfigIn in;
		FmgrInfo   *procinfo;
		Buffer		metabuffer;
		SpGistMetaPageData *metadata;

		/*
		 * Allocate in the index's relcache context so the cache lives as
		 * long as the relcache entry itself.
		 */
		cache = MemoryContextAllocZero(index->rd_indexcxt,
									   sizeof(SpGistCache));

		/* SPGiST doesn't support multi-column indexes */
		Assert(index->rd_att->natts == 1);

		/*
		 * Get the actual data type of the indexed column from the index
		 * tupdesc.  We pass this to the opclass config function so that
		 * polymorphic opclasses are possible.
		 */
		atttype = TupleDescAttr(index->rd_att, 0)->atttypid;

		/* Call the config function to get config info for the opclass */
		in.attType = atttype;

		procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
		FunctionCall2Coll(procinfo,
						  index->rd_indcollation[0],
						  PointerGetDatum(&in),
						  PointerGetDatum(&cache->config));

		/* Get the information we need about each relevant datatype */
		fillTypeDesc(&cache->attType, atttype);

		if (OidIsValid(cache->config.leafType) &&
			cache->config.leafType != atttype)
		{
			/*
			 * A leaf type differing from the input type is only legal when
			 * the opclass provides a compress function to do the conversion.
			 */
			if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("compress method must be defined when leaf type is different from input type")));

			fillTypeDesc(&cache->attLeafType, cache->config.leafType);
		}
		else
		{
			cache->attLeafType = cache->attType;
		}

		fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
		fillTypeDesc(&cache->attLabelType, cache->config.labelType);

		/* Last, get the lastUsedPages data from the metapage */
		metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
		LockBuffer(metabuffer, BUFFER_LOCK_SHARE);

		metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));

		/* The magic number distinguishes a valid SP-GiST metapage */
		if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
			elog(ERROR, "index \"%s\" is not an SP-GiST index",
				 RelationGetRelationName(index));

		cache->lastUsedPages = metadata->lastUsedPages;

		UnlockReleaseBuffer(metabuffer);

		/* Publish the cache only after it is fully initialized */
		index->rd_amcache = (void *) cache;
	}
	else
	{
		/* assume it's up to date */
		cache = (SpGistCache *) index->rd_amcache;
	}

	return cache;
}
178
179/* Initialize SpGistState for working with the given index */
180void
181initSpGistState(SpGistState *state, Relation index)
182{
183 SpGistCache *cache;
184
185 /* Get cached static information about index */
186 cache = spgGetCache(index);
187
188 state->config = cache->config;
189 state->attType = cache->attType;
190 state->attLeafType = cache->attLeafType;
191 state->attPrefixType = cache->attPrefixType;
192 state->attLabelType = cache->attLabelType;
193
194 /* Make workspace for constructing dead tuples */
195 state->deadTupleStorage = palloc0(SGDTSIZE);
196
197 /* Set XID to use in redirection tuples */
198 state->myXid = GetTopTransactionIdIfAny();
199
200 /* Assume we're not in an index build (spgbuild will override) */
201 state->isBuild = false;
202}
203
/*
 * Allocate a new page (either by recycling, or by extending the index file).
 *
 * The returned buffer is already pinned and exclusive-locked.
 * Caller is responsible for initializing the page by calling SpGistInitBuffer.
 */
Buffer
SpGistNewBuffer(Relation index)
{
	Buffer		buffer;
	bool		needLock;

	/* First, try to get a page from FSM */
	for (;;)
	{
		BlockNumber blkno = GetFreeIndexPage(index);

		if (blkno == InvalidBlockNumber)
			break;				/* nothing known to FSM */

		/*
		 * The fixed pages shouldn't ever be listed in FSM, but just in case
		 * one is, ignore it.
		 */
		if (SpGistBlockIsFixed(blkno))
			continue;

		buffer = ReadBuffer(index, blkno);

		/*
		 * We have to guard against the possibility that someone else already
		 * recycled this page; the buffer may be locked if so.  Use a
		 * conditional lock so we never block here; on failure we just move
		 * on to the next FSM candidate (or extend the file).
		 */
		if (ConditionalLockBuffer(buffer))
		{
			Page		page = BufferGetPage(buffer);

			if (PageIsNew(page))
				return buffer;	/* OK to use, if never initialized */

			if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
				return buffer;	/* OK to use */

			/* Page is in use after all; unlock and fall through to retry */
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		}

		/* Can't use it, so release buffer and try again */
		ReleaseBuffer(buffer);
	}

	/*
	 * Must extend the file.  Temp/local relations can't be accessed
	 * concurrently, so they don't need the relation extension lock.
	 */
	needLock = !RELATION_IS_LOCAL(index);
	if (needLock)
		LockRelationForExtension(index, ExclusiveLock);

	buffer = ReadBuffer(index, P_NEW);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	/* Release the extension lock as soon as the new page is locked */
	if (needLock)
		UnlockRelationForExtension(index, ExclusiveLock);

	return buffer;
}
267
268/*
269 * Update index metapage's lastUsedPages info from local cache, if possible
270 *
271 * Updating meta page isn't critical for index working, so
272 * 1 use ConditionalLockBuffer to improve concurrency
273 * 2 don't WAL-log metabuffer changes to decrease WAL traffic
274 */
275void
276SpGistUpdateMetaPage(Relation index)
277{
278 SpGistCache *cache = (SpGistCache *) index->rd_amcache;
279
280 if (cache != NULL)
281 {
282 Buffer metabuffer;
283
284 metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
285
286 if (ConditionalLockBuffer(metabuffer))
287 {
288 Page metapage = BufferGetPage(metabuffer);
289 SpGistMetaPageData *metadata = SpGistPageGetMeta(metapage);
290
291 metadata->lastUsedPages = cache->lastUsedPages;
292
293 /*
294 * Set pd_lower just past the end of the metadata. This is
295 * essential, because without doing so, metadata will be lost if
296 * xlog.c compresses the page. (We must do this here because
297 * pre-v11 versions of PG did not set the metapage's pd_lower
298 * correctly, so a pg_upgraded index might contain the wrong
299 * value.)
300 */
301 ((PageHeader) metapage)->pd_lower =
302 ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) metapage;
303
304 MarkBufferDirty(metabuffer);
305 UnlockReleaseBuffer(metabuffer);
306 }
307 else
308 {
309 ReleaseBuffer(metabuffer);
310 }
311 }
312}
313
/* Macro to select proper element of lastUsedPages cache depending on flags */
/* Masking flags with SPGIST_CACHED_PAGES is just for paranoia's sake */
#define GET_LUP(c, f) (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])

/*
 * Allocate and initialize a new buffer of the type and parity specified by
 * flags.  The returned buffer is already pinned and exclusive-locked.
 *
 * When requesting an inner page, if we get one with the wrong parity,
 * we just release the buffer and try again.  We will get a different page
 * because GetFreeIndexPage will have marked the page used in FSM.  The page
 * is entered in our local lastUsedPages cache, so there's some hope of
 * making use of it later in this session, but otherwise we rely on VACUUM
 * to eventually re-enter the page in FSM, making it available for recycling.
 * Note that such a page does not get marked dirty here, so unless it's used
 * fairly soon, the buffer will just get discarded and the page will remain
 * as it was on disk.
 *
 * When we return a buffer to the caller, the page is *not* entered into
 * the lastUsedPages cache; we expect the caller will do so after it's taken
 * whatever space it will use.  This is because after the caller has used up
 * some space, the page might have less space than whatever was cached already
 * so we'd rather not trash the old cache entry.
 */
static Buffer
allocNewBuffer(Relation index, int flags)
{
	SpGistCache *cache = spgGetCache(index);
	uint16		pageflags = 0;

	/* Translate the GBUF_* request flags into SP-GiST page flags */
	if (GBUF_REQ_LEAF(flags))
		pageflags |= SPGIST_LEAF;
	if (GBUF_REQ_NULLS(flags))
		pageflags |= SPGIST_NULLS;

	for (;;)
	{
		Buffer		buffer;

		buffer = SpGistNewBuffer(index);
		SpGistInitBuffer(buffer, pageflags);

		if (pageflags & SPGIST_LEAF)
		{
			/* Leaf pages have no parity concerns, so just use it */
			return buffer;
		}
		else
		{
			BlockNumber blkno = BufferGetBlockNumber(buffer);
			int			blkFlags = GBUF_INNER_PARITY(blkno);

			if ((flags & GBUF_PARITY_MASK) == blkFlags)
			{
				/* Page has right parity, use it */
				return buffer;
			}
			else
			{
				/* Page has wrong parity, record it in cache and try again */
				if (pageflags & SPGIST_NULLS)
					blkFlags |= GBUF_NULLS;
				cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
				cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
					PageGetExactFreeSpace(BufferGetPage(buffer));
				UnlockReleaseBuffer(buffer);
			}
		}
	}
}
384
/*
 * Get a buffer of the type and parity specified by flags, having at least
 * as much free space as indicated by needSpace.  We use the lastUsedPages
 * cache to assign the same buffer previously requested when possible.
 * The returned buffer is already pinned and exclusive-locked.
 *
 * *isNew is set true if the page was initialized here, false if it was
 * already valid.
 */
Buffer
SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
{
	SpGistCache *cache = spgGetCache(index);
	SpGistLastUsedPage *lup;

	/* Bail out if even an empty page wouldn't meet the demand */
	if (needSpace > SPGIST_PAGE_CAPACITY)
		elog(ERROR, "desired SPGiST tuple size is too big");

	/*
	 * If possible, increase the space request to include relation's
	 * fillfactor.  This ensures that when we add unrelated tuples to a page,
	 * we try to keep 100-fillfactor% available for adding tuples that are
	 * related to the ones already on it.  But fillfactor mustn't cause an
	 * error for requests that would otherwise be legal.
	 */
	needSpace += RelationGetTargetPageFreeSpace(index,
												SPGIST_DEFAULT_FILLFACTOR);
	needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);

	/* Get the cache entry for this flags setting */
	lup = GET_LUP(cache, flags);

	/* If we have nothing cached, just turn it over to allocNewBuffer */
	if (lup->blkno == InvalidBlockNumber)
	{
		*isNew = true;
		return allocNewBuffer(index, flags);
	}

	/* fixed pages should never be in cache */
	Assert(!SpGistBlockIsFixed(lup->blkno));

	/* If cached freeSpace isn't enough, don't bother looking at the page */
	if (lup->freeSpace >= needSpace)
	{
		Buffer		buffer;
		Page		page;

		buffer = ReadBuffer(index, lup->blkno);

		if (!ConditionalLockBuffer(buffer))
		{
			/*
			 * buffer is locked by another process, so return a new buffer
			 * rather than waiting
			 */
			ReleaseBuffer(buffer);
			*isNew = true;
			return allocNewBuffer(index, flags);
		}

		page = BufferGetPage(buffer);

		if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
		{
			/* OK to initialize the page (it may have been recycled) */
			uint16		pageflags = 0;

			if (GBUF_REQ_LEAF(flags))
				pageflags |= SPGIST_LEAF;
			if (GBUF_REQ_NULLS(flags))
				pageflags |= SPGIST_NULLS;
			SpGistInitBuffer(buffer, pageflags);
			/* discount the space the caller is about to consume */
			lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
			*isNew = true;
			return buffer;
		}

		/*
		 * Check that page is of right type and has enough space.  We must
		 * recheck this since our cache isn't necessarily up to date.
		 */
		if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
			(GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
		{
			int			freeSpace = PageGetExactFreeSpace(page);

			if (freeSpace >= needSpace)
			{
				/* Success, update freespace info and return the buffer */
				lup->freeSpace = freeSpace - needSpace;
				*isNew = false;
				return buffer;
			}
		}

		/*
		 * fallback to allocation of new buffer
		 */
		UnlockReleaseBuffer(buffer);
	}

	/* No success with cache, so return a new buffer */
	*isNew = true;
	return allocNewBuffer(index, flags);
}
491
492/*
493 * Update lastUsedPages cache when done modifying a page.
494 *
495 * We update the appropriate cache entry if it already contained this page
496 * (its freeSpace is likely obsolete), or if this page has more space than
497 * whatever we had cached.
498 */
499void
500SpGistSetLastUsedPage(Relation index, Buffer buffer)
501{
502 SpGistCache *cache = spgGetCache(index);
503 SpGistLastUsedPage *lup;
504 int freeSpace;
505 Page page = BufferGetPage(buffer);
506 BlockNumber blkno = BufferGetBlockNumber(buffer);
507 int flags;
508
509 /* Never enter fixed pages (root pages) in cache, though */
510 if (SpGistBlockIsFixed(blkno))
511 return;
512
513 if (SpGistPageIsLeaf(page))
514 flags = GBUF_LEAF;
515 else
516 flags = GBUF_INNER_PARITY(blkno);
517 if (SpGistPageStoresNulls(page))
518 flags |= GBUF_NULLS;
519
520 lup = GET_LUP(cache, flags);
521
522 freeSpace = PageGetExactFreeSpace(page);
523 if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
524 lup->freeSpace < freeSpace)
525 {
526 lup->blkno = blkno;
527 lup->freeSpace = freeSpace;
528 }
529}
530
531/*
532 * Initialize an SPGiST page to empty, with specified flags
533 */
534void
535SpGistInitPage(Page page, uint16 f)
536{
537 SpGistPageOpaque opaque;
538
539 PageInit(page, BLCKSZ, MAXALIGN(sizeof(SpGistPageOpaqueData)));
540 opaque = SpGistPageGetOpaque(page);
541 memset(opaque, 0, sizeof(SpGistPageOpaqueData));
542 opaque->flags = f;
543 opaque->spgist_page_id = SPGIST_PAGE_ID;
544}
545
546/*
547 * Initialize a buffer's page to empty, with specified flags
548 */
549void
550SpGistInitBuffer(Buffer b, uint16 f)
551{
552 Assert(BufferGetPageSize(b) == BLCKSZ);
553 SpGistInitPage(BufferGetPage(b), f);
554}
555
556/*
557 * Initialize metadata page
558 */
559void
560SpGistInitMetapage(Page page)
561{
562 SpGistMetaPageData *metadata;
563 int i;
564
565 SpGistInitPage(page, SPGIST_META);
566 metadata = SpGistPageGetMeta(page);
567 memset(metadata, 0, sizeof(SpGistMetaPageData));
568 metadata->magicNumber = SPGIST_MAGIC_NUMBER;
569
570 /* initialize last-used-page cache to empty */
571 for (i = 0; i < SPGIST_CACHED_PAGES; i++)
572 metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
573
574 /*
575 * Set pd_lower just past the end of the metadata. This is essential,
576 * because without doing so, metadata will be lost if xlog.c compresses
577 * the page.
578 */
579 ((PageHeader) page)->pd_lower =
580 ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) page;
581}
582
/*
 * reloptions processing for SPGiST
 *
 * Simply delegates to the generic reloptions parser with the SP-GiST
 * option kind (covers fillfactor).
 */
bytea *
spgoptions(Datum reloptions, bool validate)
{
	return default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
}
591
592/*
593 * Get the space needed to store a non-null datum of the indicated type.
594 * Note the result is already rounded up to a MAXALIGN boundary.
595 * Also, we follow the SPGiST convention that pass-by-val types are
596 * just stored in their Datum representation (compare memcpyDatum).
597 */
598unsigned int
599SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
600{
601 unsigned int size;
602
603 if (att->attbyval)
604 size = sizeof(Datum);
605 else if (att->attlen > 0)
606 size = att->attlen;
607 else
608 size = VARSIZE_ANY(datum);
609
610 return MAXALIGN(size);
611}
612
613/*
614 * Copy the given non-null datum to *target
615 */
616static void
617memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
618{
619 unsigned int size;
620
621 if (att->attbyval)
622 {
623 memcpy(target, &datum, sizeof(Datum));
624 }
625 else
626 {
627 size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
628 memcpy(target, DatumGetPointer(datum), size);
629 }
630}
631
632/*
633 * Construct a leaf tuple containing the given heap TID and datum value
634 */
635SpGistLeafTuple
636spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
637 Datum datum, bool isnull)
638{
639 SpGistLeafTuple tup;
640 unsigned int size;
641
642 /* compute space needed (note result is already maxaligned) */
643 size = SGLTHDRSZ;
644 if (!isnull)
645 size += SpGistGetTypeSize(&state->attLeafType, datum);
646
647 /*
648 * Ensure that we can replace the tuple with a dead tuple later. This
649 * test is unnecessary when !isnull, but let's be safe.
650 */
651 if (size < SGDTSIZE)
652 size = SGDTSIZE;
653
654 /* OK, form the tuple */
655 tup = (SpGistLeafTuple) palloc0(size);
656
657 tup->size = size;
658 tup->nextOffset = InvalidOffsetNumber;
659 tup->heapPtr = *heapPtr;
660 if (!isnull)
661 memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum);
662
663 return tup;
664}
665
666/*
667 * Construct a node (to go into an inner tuple) containing the given label
668 *
669 * Note that the node's downlink is just set invalid here. Caller will fill
670 * it in later.
671 */
672SpGistNodeTuple
673spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
674{
675 SpGistNodeTuple tup;
676 unsigned int size;
677 unsigned short infomask = 0;
678
679 /* compute space needed (note result is already maxaligned) */
680 size = SGNTHDRSZ;
681 if (!isnull)
682 size += SpGistGetTypeSize(&state->attLabelType, label);
683
684 /*
685 * Here we make sure that the size will fit in the field reserved for it
686 * in t_info.
687 */
688 if ((size & INDEX_SIZE_MASK) != size)
689 ereport(ERROR,
690 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
691 errmsg("index row requires %zu bytes, maximum size is %zu",
692 (Size) size, (Size) INDEX_SIZE_MASK)));
693
694 tup = (SpGistNodeTuple) palloc0(size);
695
696 if (isnull)
697 infomask |= INDEX_NULL_MASK;
698 /* we don't bother setting the INDEX_VAR_MASK bit */
699 infomask |= size;
700 tup->t_info = infomask;
701
702 /* The TID field will be filled in later */
703 ItemPointerSetInvalid(&tup->t_tid);
704
705 if (!isnull)
706 memcpyDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
707
708 return tup;
709}
710
711/*
712 * Construct an inner tuple containing the given prefix and node array
713 */
714SpGistInnerTuple
715spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
716 int nNodes, SpGistNodeTuple *nodes)
717{
718 SpGistInnerTuple tup;
719 unsigned int size;
720 unsigned int prefixSize;
721 int i;
722 char *ptr;
723
724 /* Compute size needed */
725 if (hasPrefix)
726 prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
727 else
728 prefixSize = 0;
729
730 size = SGITHDRSZ + prefixSize;
731
732 /* Note: we rely on node tuple sizes to be maxaligned already */
733 for (i = 0; i < nNodes; i++)
734 size += IndexTupleSize(nodes[i]);
735
736 /*
737 * Ensure that we can replace the tuple with a dead tuple later. This
738 * test is unnecessary given current tuple layouts, but let's be safe.
739 */
740 if (size < SGDTSIZE)
741 size = SGDTSIZE;
742
743 /*
744 * Inner tuple should be small enough to fit on a page
745 */
746 if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
747 ereport(ERROR,
748 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
749 errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
750 (Size) size,
751 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
752 errhint("Values larger than a buffer page cannot be indexed.")));
753
754 /*
755 * Check for overflow of header fields --- probably can't fail if the
756 * above succeeded, but let's be paranoid
757 */
758 if (size > SGITMAXSIZE ||
759 prefixSize > SGITMAXPREFIXSIZE ||
760 nNodes > SGITMAXNNODES)
761 elog(ERROR, "SPGiST inner tuple header field is too small");
762
763 /* OK, form the tuple */
764 tup = (SpGistInnerTuple) palloc0(size);
765
766 tup->nNodes = nNodes;
767 tup->prefixSize = prefixSize;
768 tup->size = size;
769
770 if (hasPrefix)
771 memcpyDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
772
773 ptr = (char *) SGITNODEPTR(tup);
774
775 for (i = 0; i < nNodes; i++)
776 {
777 SpGistNodeTuple node = nodes[i];
778
779 memcpy(ptr, node, IndexTupleSize(node));
780 ptr += IndexTupleSize(node);
781 }
782
783 return tup;
784}
785
786/*
787 * Construct a "dead" tuple to replace a tuple being deleted.
788 *
789 * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
790 * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
791 * the xid field is filled in automatically.
792 *
793 * This is called in critical sections, so we don't use palloc; the tuple
794 * is built in preallocated storage. It should be copied before another
795 * call with different parameters can occur.
796 */
797SpGistDeadTuple
798spgFormDeadTuple(SpGistState *state, int tupstate,
799 BlockNumber blkno, OffsetNumber offnum)
800{
801 SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
802
803 tuple->tupstate = tupstate;
804 tuple->size = SGDTSIZE;
805 tuple->nextOffset = InvalidOffsetNumber;
806
807 if (tupstate == SPGIST_REDIRECT)
808 {
809 ItemPointerSet(&tuple->pointer, blkno, offnum);
810 Assert(TransactionIdIsValid(state->myXid));
811 tuple->xid = state->myXid;
812 }
813 else
814 {
815 ItemPointerSetInvalid(&tuple->pointer);
816 tuple->xid = InvalidTransactionId;
817 }
818
819 return tuple;
820}
821
822/*
823 * Extract the label datums of the nodes within innerTuple
824 *
825 * Returns NULL if label datums are NULLs
826 */
827Datum *
828spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
829{
830 Datum *nodeLabels;
831 int i;
832 SpGistNodeTuple node;
833
834 /* Either all the labels must be NULL, or none. */
835 node = SGITNODEPTR(innerTuple);
836 if (IndexTupleHasNulls(node))
837 {
838 SGITITERATE(innerTuple, i, node)
839 {
840 if (!IndexTupleHasNulls(node))
841 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
842 }
843 /* They're all null, so just return NULL */
844 return NULL;
845 }
846 else
847 {
848 nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
849 SGITITERATE(innerTuple, i, node)
850 {
851 if (IndexTupleHasNulls(node))
852 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
853 nodeLabels[i] = SGNTDATUM(node, state);
854 }
855 return nodeLabels;
856 }
857}
858
/*
 * Add a new item to the page, replacing a PLACEHOLDER item if possible.
 * Return the location it's inserted at, or InvalidOffsetNumber on failure.
 *
 * If startOffset isn't NULL, we start searching for placeholders at
 * *startOffset, and update that to the next place to search.  This is just
 * an optimization for repeated insertions.
 *
 * If errorOK is false, we throw error when there's not enough room,
 * rather than returning InvalidOffsetNumber.
 */
OffsetNumber
SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
					 OffsetNumber *startOffset, bool errorOK)
{
	SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
	OffsetNumber i,
				maxoff,
				offnum;

	/*
	 * Only attempt placeholder replacement if the page claims to have
	 * placeholders, and deleting one (freeing SGDTSIZE bytes) would yield
	 * enough total room for the new item.
	 */
	if (opaque->nPlaceholder > 0 &&
		PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
	{
		/* Try to replace a placeholder */
		maxoff = PageGetMaxOffsetNumber(page);
		offnum = InvalidOffsetNumber;

		for (;;)
		{
			/* Start from the caller's hint if given, else from the top */
			if (startOffset && *startOffset != InvalidOffsetNumber)
				i = *startOffset;
			else
				i = FirstOffsetNumber;
			for (; i <= maxoff; i++)
			{
				SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
																   PageGetItemId(page, i));

				if (it->tupstate == SPGIST_PLACEHOLDER)
				{
					offnum = i;
					break;
				}
			}

			/* Done if we found a placeholder */
			if (offnum != InvalidOffsetNumber)
				break;

			if (startOffset && *startOffset != InvalidOffsetNumber)
			{
				/* Hint was no good, re-search from beginning */
				*startOffset = InvalidOffsetNumber;
				continue;
			}

			/*
			 * Hmm, no placeholder found?  The count was stale; reset it so
			 * we don't keep searching fruitlessly.
			 */
			opaque->nPlaceholder = 0;
			break;
		}

		if (offnum != InvalidOffsetNumber)
		{
			/* Replace the placeholder tuple */
			PageIndexTupleDelete(page, offnum);

			offnum = PageAddItem(page, item, size, offnum, false, false);

			/*
			 * We should not have failed given the size check at the top of
			 * the function, but test anyway.  If we did fail, we must PANIC
			 * because we've already deleted the placeholder tuple, and
			 * there's no other way to keep the damage from getting to disk.
			 */
			if (offnum != InvalidOffsetNumber)
			{
				Assert(opaque->nPlaceholder > 0);
				opaque->nPlaceholder--;
				/* advance the hint past the slot we just filled */
				if (startOffset)
					*startOffset = offnum + 1;
			}
			else
				elog(PANIC, "failed to add item of size %u to SPGiST index page",
					 (int) size);

			return offnum;
		}
	}

	/* No luck in replacing a placeholder, so just add it to the page */
	offnum = PageAddItem(page, item, size,
						 InvalidOffsetNumber, false, false);

	if (offnum == InvalidOffsetNumber && !errorOK)
		elog(ERROR, "failed to add item of size %u to SPGiST index page",
			 (int) size);

	return offnum;
}
958
959/*
960 * spgproperty() -- Check boolean properties of indexes.
961 *
962 * This is optional for most AMs, but is required for SP-GiST because the core
963 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
964 */
965bool
966spgproperty(Oid index_oid, int attno,
967 IndexAMProperty prop, const char *propname,
968 bool *res, bool *isnull)
969{
970 Oid opclass,
971 opfamily,
972 opcintype;
973 CatCList *catlist;
974 int i;
975
976 /* Only answer column-level inquiries */
977 if (attno == 0)
978 return false;
979
980 switch (prop)
981 {
982 case AMPROP_DISTANCE_ORDERABLE:
983 break;
984 default:
985 return false;
986 }
987
988 /*
989 * Currently, SP-GiST distance-ordered scans require that there be a
990 * distance operator in the opclass with the default types. So we assume
991 * that if such a operator exists, then there's a reason for it.
992 */
993
994 /* First we need to know the column's opclass. */
995 opclass = get_index_column_opclass(index_oid, attno);
996 if (!OidIsValid(opclass))
997 {
998 *isnull = true;
999 return true;
1000 }
1001
1002 /* Now look up the opclass family and input datatype. */
1003 if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
1004 {
1005 *isnull = true;
1006 return true;
1007 }
1008
1009 /* And now we can check whether the operator is provided. */
1010 catlist = SearchSysCacheList1(AMOPSTRATEGY,
1011 ObjectIdGetDatum(opfamily));
1012
1013 *res = false;
1014
1015 for (i = 0; i < catlist->n_members; i++)
1016 {
1017 HeapTuple amoptup = &catlist->members[i]->tuple;
1018 Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
1019
1020 if (amopform->amoppurpose == AMOP_ORDER &&
1021 (amopform->amoplefttype == opcintype ||
1022 amopform->amoprighttype == opcintype) &&
1023 opfamily_can_sort_type(amopform->amopsortfamily,
1024 get_op_rettype(amopform->amopopr)))
1025 {
1026 *res = true;
1027 break;
1028 }
1029 }
1030
1031 ReleaseSysCacheList(catlist);
1032
1033 *isnull = false;
1034
1035 return true;
1036}
1037