1/*-------------------------------------------------------------------------
2 *
3 * rangetypes_spgist.c
4 * implementation of quad tree over ranges mapped to 2d-points for SP-GiST.
5 *
6 * Quad tree is a data structure similar to a binary tree, but is adapted to
7 * 2d data. Each inner node of a quad tree contains a point (centroid) which
8 * divides the 2d-space into 4 quadrants. Each quadrant is associated with a
9 * child node.
10 *
11 * Ranges are mapped to 2d-points so that the lower bound is one dimension,
12 * and the upper bound is another. By convention, we visualize the lower bound
13 * to be the horizontal axis, and upper bound the vertical axis.
14 *
15 * One quirk with this mapping is the handling of empty ranges. An empty range
16 * doesn't have lower and upper bounds, so it cannot be mapped to 2d space in
17 * a straightforward way. To cope with that, the root node can have a 5th
18 * quadrant, which is reserved for empty ranges. Furthermore, there can be
19 * inner nodes in the tree with no centroid. They contain only two child nodes,
20 * one for empty ranges and another for non-empty ones. Such a node can appear
21 * as the root node, or in the tree under the 5th child of the root node (in
22 * which case it will only contain empty nodes).
23 *
24 * The SP-GiST picksplit function uses medians along both axes as the centroid.
25 * This implementation only uses the comparison function of the range element
26 * datatype, therefore it works for any range type.
27 *
28 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
29 * Portions Copyright (c) 1994, Regents of the University of California
30 *
31 * IDENTIFICATION
32 * src/backend/utils/adt/rangetypes_spgist.c
33 *
34 *-------------------------------------------------------------------------
35 */
36
37#include "postgres.h"
38
39#include "access/spgist.h"
40#include "access/stratnum.h"
41#include "catalog/pg_type.h"
42#include "utils/builtins.h"
43#include "utils/datum.h"
44#include "utils/rangetypes.h"
45
46static int16 getQuadrant(TypeCacheEntry *typcache, RangeType *centroid,
47 RangeType *tst);
48static int bound_cmp(const void *a, const void *b, void *arg);
49
50static int adjacent_inner_consistent(TypeCacheEntry *typcache,
51 RangeBound *arg, RangeBound *centroid,
52 RangeBound *prev);
53static int adjacent_cmp_bounds(TypeCacheEntry *typcache, RangeBound *arg,
54 RangeBound *centroid);
55
56/*
57 * SP-GiST 'config' interface function.
58 */
59Datum
60spg_range_quad_config(PG_FUNCTION_ARGS)
61{
62 /* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
63 spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
64
65 cfg->prefixType = ANYRANGEOID;
66 cfg->labelType = VOIDOID; /* we don't need node labels */
67 cfg->canReturnData = true;
68 cfg->longValuesOK = false;
69 PG_RETURN_VOID();
70}
71
72/*----------
73 * Determine which quadrant a 2d-mapped range falls into, relative to the
74 * centroid.
75 *
76 * Quadrants are numbered like this:
77 *
78 * 4 | 1
79 * ----+----
80 * 3 | 2
81 *
82 * Where the lower bound of range is the horizontal axis and upper bound the
83 * vertical axis.
84 *
85 * Ranges on one of the axes are taken to lie in the quadrant with higher value
86 * along perpendicular axis. That is, a value on the horizontal axis is taken
87 * to belong to quadrant 1 or 4, and a value on the vertical axis is taken to
88 * belong to quadrant 1 or 2. A range equal to centroid is taken to lie in
89 * quadrant 1.
90 *
91 * Empty ranges are taken to lie in the special quadrant 5.
92 *----------
93 */
94static int16
95getQuadrant(TypeCacheEntry *typcache, RangeType *centroid, RangeType *tst)
96{
97 RangeBound centroidLower,
98 centroidUpper;
99 bool centroidEmpty;
100 RangeBound lower,
101 upper;
102 bool empty;
103
104 range_deserialize(typcache, centroid, &centroidLower, &centroidUpper,
105 &centroidEmpty);
106 range_deserialize(typcache, tst, &lower, &upper, &empty);
107
108 if (empty)
109 return 5;
110
111 if (range_cmp_bounds(typcache, &lower, &centroidLower) >= 0)
112 {
113 if (range_cmp_bounds(typcache, &upper, &centroidUpper) >= 0)
114 return 1;
115 else
116 return 2;
117 }
118 else
119 {
120 if (range_cmp_bounds(typcache, &upper, &centroidUpper) >= 0)
121 return 4;
122 else
123 return 3;
124 }
125}
126
127/*
128 * Choose SP-GiST function: choose path for addition of new range.
129 */
130Datum
131spg_range_quad_choose(PG_FUNCTION_ARGS)
132{
133 spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
134 spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
135 RangeType *inRange = DatumGetRangeTypeP(in->datum),
136 *centroid;
137 int16 quadrant;
138 TypeCacheEntry *typcache;
139
140 if (in->allTheSame)
141 {
142 out->resultType = spgMatchNode;
143 /* nodeN will be set by core */
144 out->result.matchNode.levelAdd = 0;
145 out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
146 PG_RETURN_VOID();
147 }
148
149 typcache = range_get_typcache(fcinfo, RangeTypeGetOid(inRange));
150
151 /*
152 * A node with no centroid divides ranges purely on whether they're empty
153 * or not. All empty ranges go to child node 0, all non-empty ranges go to
154 * node 1.
155 */
156 if (!in->hasPrefix)
157 {
158 out->resultType = spgMatchNode;
159 if (RangeIsEmpty(inRange))
160 out->result.matchNode.nodeN = 0;
161 else
162 out->result.matchNode.nodeN = 1;
163 out->result.matchNode.levelAdd = 1;
164 out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
165 PG_RETURN_VOID();
166 }
167
168 centroid = DatumGetRangeTypeP(in->prefixDatum);
169 quadrant = getQuadrant(typcache, centroid, inRange);
170
171 Assert(quadrant <= in->nNodes);
172
173 /* Select node matching to quadrant number */
174 out->resultType = spgMatchNode;
175 out->result.matchNode.nodeN = quadrant - 1;
176 out->result.matchNode.levelAdd = 1;
177 out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
178
179 PG_RETURN_VOID();
180}
181
182/*
183 * Bound comparison for sorting.
184 */
185static int
186bound_cmp(const void *a, const void *b, void *arg)
187{
188 RangeBound *ba = (RangeBound *) a;
189 RangeBound *bb = (RangeBound *) b;
190 TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
191
192 return range_cmp_bounds(typcache, ba, bb);
193}
194
195/*
196 * Picksplit SP-GiST function: split ranges into nodes. Select "centroid"
197 * range and distribute ranges according to quadrants.
198 */
199Datum
200spg_range_quad_picksplit(PG_FUNCTION_ARGS)
201{
202 spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
203 spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
204 int i;
205 int j;
206 int nonEmptyCount;
207 RangeType *centroid;
208 bool empty;
209 TypeCacheEntry *typcache;
210
211 /* Use the median values of lower and upper bounds as the centroid range */
212 RangeBound *lowerBounds,
213 *upperBounds;
214
215 typcache = range_get_typcache(fcinfo,
216 RangeTypeGetOid(DatumGetRangeTypeP(in->datums[0])));
217
218 /* Allocate memory for bounds */
219 lowerBounds = palloc(sizeof(RangeBound) * in->nTuples);
220 upperBounds = palloc(sizeof(RangeBound) * in->nTuples);
221 j = 0;
222
223 /* Deserialize bounds of ranges, count non-empty ranges */
224 for (i = 0; i < in->nTuples; i++)
225 {
226 range_deserialize(typcache, DatumGetRangeTypeP(in->datums[i]),
227 &lowerBounds[j], &upperBounds[j], &empty);
228 if (!empty)
229 j++;
230 }
231 nonEmptyCount = j;
232
233 /*
234 * All the ranges are empty. The best we can do is to construct an inner
235 * node with no centroid, and put all ranges into node 0. If non-empty
236 * ranges are added later, they will be routed to node 1.
237 */
238 if (nonEmptyCount == 0)
239 {
240 out->nNodes = 2;
241 out->hasPrefix = false;
242 /* Prefix is empty */
243 out->prefixDatum = PointerGetDatum(NULL);
244 out->nodeLabels = NULL;
245
246 out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
247 out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
248
249 /* Place all ranges into node 0 */
250 for (i = 0; i < in->nTuples; i++)
251 {
252 RangeType *range = DatumGetRangeTypeP(in->datums[i]);
253
254 out->leafTupleDatums[i] = RangeTypePGetDatum(range);
255 out->mapTuplesToNodes[i] = 0;
256 }
257 PG_RETURN_VOID();
258 }
259
260 /* Sort range bounds in order to find medians */
261 qsort_arg(lowerBounds, nonEmptyCount, sizeof(RangeBound),
262 bound_cmp, typcache);
263 qsort_arg(upperBounds, nonEmptyCount, sizeof(RangeBound),
264 bound_cmp, typcache);
265
266 /* Construct "centroid" range from medians of lower and upper bounds */
267 centroid = range_serialize(typcache, &lowerBounds[nonEmptyCount / 2],
268 &upperBounds[nonEmptyCount / 2], false);
269 out->hasPrefix = true;
270 out->prefixDatum = RangeTypePGetDatum(centroid);
271
272 /* Create node for empty ranges only if it is a root node */
273 out->nNodes = (in->level == 0) ? 5 : 4;
274 out->nodeLabels = NULL; /* we don't need node labels */
275
276 out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
277 out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
278
279 /*
280 * Assign ranges to corresponding nodes according to quadrants relative to
281 * "centroid" range.
282 */
283 for (i = 0; i < in->nTuples; i++)
284 {
285 RangeType *range = DatumGetRangeTypeP(in->datums[i]);
286 int16 quadrant = getQuadrant(typcache, centroid, range);
287
288 out->leafTupleDatums[i] = RangeTypePGetDatum(range);
289 out->mapTuplesToNodes[i] = quadrant - 1;
290 }
291
292 PG_RETURN_VOID();
293}
294
295/*
296 * SP-GiST consistent function for inner nodes: check which nodes are
297 * consistent with given set of queries.
298 */
299Datum
300spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
301{
302 spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
303 spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
304 int which;
305 int i;
306 MemoryContext oldCtx;
307
308 /*
309 * For adjacent search we need also previous centroid (if any) to improve
310 * the precision of the consistent check. In this case needPrevious flag
311 * is set and centroid is passed into traversalValue.
312 */
313 bool needPrevious = false;
314
315 if (in->allTheSame)
316 {
317 /* Report that all nodes should be visited */
318 out->nNodes = in->nNodes;
319 out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
320 for (i = 0; i < in->nNodes; i++)
321 out->nodeNumbers[i] = i;
322 PG_RETURN_VOID();
323 }
324
325 if (!in->hasPrefix)
326 {
327 /*
328 * No centroid on this inner node. Such a node has two child nodes,
329 * the first for empty ranges, and the second for non-empty ones.
330 */
331 Assert(in->nNodes == 2);
332
333 /*
334 * Nth bit of which variable means that (N - 1)th node should be
335 * visited. Initially all bits are set. Bits of nodes which should be
336 * skipped will be unset.
337 */
338 which = (1 << 1) | (1 << 2);
339 for (i = 0; i < in->nkeys; i++)
340 {
341 StrategyNumber strategy = in->scankeys[i].sk_strategy;
342 bool empty;
343
344 /*
345 * The only strategy when second argument of operator is not range
346 * is RANGESTRAT_CONTAINS_ELEM.
347 */
348 if (strategy != RANGESTRAT_CONTAINS_ELEM)
349 empty = RangeIsEmpty(
350 DatumGetRangeTypeP(in->scankeys[i].sk_argument));
351 else
352 empty = false;
353
354 switch (strategy)
355 {
356 case RANGESTRAT_BEFORE:
357 case RANGESTRAT_OVERLEFT:
358 case RANGESTRAT_OVERLAPS:
359 case RANGESTRAT_OVERRIGHT:
360 case RANGESTRAT_AFTER:
361 case RANGESTRAT_ADJACENT:
362 /* These strategies return false if any argument is empty */
363 if (empty)
364 which = 0;
365 else
366 which &= (1 << 2);
367 break;
368
369 case RANGESTRAT_CONTAINS:
370
371 /*
372 * All ranges contain an empty range. Only non-empty
373 * ranges can contain a non-empty range.
374 */
375 if (!empty)
376 which &= (1 << 2);
377 break;
378
379 case RANGESTRAT_CONTAINED_BY:
380
381 /*
382 * Only an empty range is contained by an empty range.
383 * Both empty and non-empty ranges can be contained by a
384 * non-empty range.
385 */
386 if (empty)
387 which &= (1 << 1);
388 break;
389
390 case RANGESTRAT_CONTAINS_ELEM:
391 which &= (1 << 2);
392 break;
393
394 case RANGESTRAT_EQ:
395 if (empty)
396 which &= (1 << 1);
397 else
398 which &= (1 << 2);
399 break;
400
401 default:
402 elog(ERROR, "unrecognized range strategy: %d", strategy);
403 break;
404 }
405 if (which == 0)
406 break; /* no need to consider remaining conditions */
407 }
408 }
409 else
410 {
411 RangeBound centroidLower,
412 centroidUpper;
413 bool centroidEmpty;
414 TypeCacheEntry *typcache;
415 RangeType *centroid;
416
417 /* This node has a centroid. Fetch it. */
418 centroid = DatumGetRangeTypeP(in->prefixDatum);
419 typcache = range_get_typcache(fcinfo,
420 RangeTypeGetOid(DatumGetRangeTypeP(centroid)));
421 range_deserialize(typcache, centroid, &centroidLower, &centroidUpper,
422 &centroidEmpty);
423
424 Assert(in->nNodes == 4 || in->nNodes == 5);
425
426 /*
427 * Nth bit of which variable means that (N - 1)th node (Nth quadrant)
428 * should be visited. Initially all bits are set. Bits of nodes which
429 * can be skipped will be unset.
430 */
431 which = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
432
433 for (i = 0; i < in->nkeys; i++)
434 {
435 StrategyNumber strategy;
436 RangeBound lower,
437 upper;
438 bool empty;
439 RangeType *range = NULL;
440
441 RangeType *prevCentroid = NULL;
442 RangeBound prevLower,
443 prevUpper;
444 bool prevEmpty;
445
446 /* Restrictions on range bounds according to scan strategy */
447 RangeBound *minLower = NULL,
448 *maxLower = NULL,
449 *minUpper = NULL,
450 *maxUpper = NULL;
451
452 /* Are the restrictions on range bounds inclusive? */
453 bool inclusive = true;
454 bool strictEmpty = true;
455 int cmp,
456 which1,
457 which2;
458
459 strategy = in->scankeys[i].sk_strategy;
460
461 /*
462 * RANGESTRAT_CONTAINS_ELEM is just like RANGESTRAT_CONTAINS, but
463 * the argument is a single element. Expand the single element to
464 * a range containing only the element, and treat it like
465 * RANGESTRAT_CONTAINS.
466 */
467 if (strategy == RANGESTRAT_CONTAINS_ELEM)
468 {
469 lower.inclusive = true;
470 lower.infinite = false;
471 lower.lower = true;
472 lower.val = in->scankeys[i].sk_argument;
473
474 upper.inclusive = true;
475 upper.infinite = false;
476 upper.lower = false;
477 upper.val = in->scankeys[i].sk_argument;
478
479 empty = false;
480
481 strategy = RANGESTRAT_CONTAINS;
482 }
483 else
484 {
485 range = DatumGetRangeTypeP(in->scankeys[i].sk_argument);
486 range_deserialize(typcache, range, &lower, &upper, &empty);
487 }
488
489 /*
490 * Most strategies are handled by forming a bounding box from the
491 * search key, defined by a minLower, maxLower, minUpper,
492 * maxUpper. Some modify 'which' directly, to specify exactly
493 * which quadrants need to be visited.
494 *
495 * For most strategies, nothing matches an empty search key, and
496 * an empty range never matches a non-empty key. If a strategy
497 * does not behave like that wrt. empty ranges, set strictEmpty to
498 * false.
499 */
500 switch (strategy)
501 {
502 case RANGESTRAT_BEFORE:
503
504 /*
505 * Range A is before range B if upper bound of A is lower
506 * than lower bound of B.
507 */
508 maxUpper = &lower;
509 inclusive = false;
510 break;
511
512 case RANGESTRAT_OVERLEFT:
513
514 /*
515 * Range A is overleft to range B if upper bound of A is
516 * less or equal to upper bound of B.
517 */
518 maxUpper = &upper;
519 break;
520
521 case RANGESTRAT_OVERLAPS:
522
523 /*
524 * Non-empty ranges overlap, if lower bound of each range
525 * is lower or equal to upper bound of the other range.
526 */
527 maxLower = &upper;
528 minUpper = &lower;
529 break;
530
531 case RANGESTRAT_OVERRIGHT:
532
533 /*
534 * Range A is overright to range B if lower bound of A is
535 * greater or equal to lower bound of B.
536 */
537 minLower = &lower;
538 break;
539
540 case RANGESTRAT_AFTER:
541
542 /*
543 * Range A is after range B if lower bound of A is greater
544 * than upper bound of B.
545 */
546 minLower = &upper;
547 inclusive = false;
548 break;
549
550 case RANGESTRAT_ADJACENT:
551 if (empty)
552 break; /* Skip to strictEmpty check. */
553
554 /*
555 * Previously selected quadrant could exclude possibility
556 * for lower or upper bounds to be adjacent. Deserialize
557 * previous centroid range if present for checking this.
558 */
559 if (in->traversalValue)
560 {
561 prevCentroid = DatumGetRangeTypeP(in->traversalValue);
562 range_deserialize(typcache, prevCentroid,
563 &prevLower, &prevUpper, &prevEmpty);
564 }
565
566 /*
567 * For a range's upper bound to be adjacent to the
568 * argument's lower bound, it will be found along the line
569 * adjacent to (and just below) Y=lower. Therefore, if the
570 * argument's lower bound is less than the centroid's
571 * upper bound, the line falls in quadrants 2 and 3; if
572 * greater, the line falls in quadrants 1 and 4. (see
573 * adjacent_cmp_bounds for description of edge cases).
574 */
575 cmp = adjacent_inner_consistent(typcache, &lower,
576 &centroidUpper,
577 prevCentroid ? &prevUpper : NULL);
578 if (cmp > 0)
579 which1 = (1 << 1) | (1 << 4);
580 else if (cmp < 0)
581 which1 = (1 << 2) | (1 << 3);
582 else
583 which1 = 0;
584
585 /*
586 * Also search for ranges's adjacent to argument's upper
587 * bound. They will be found along the line adjacent to
588 * (and just right of) X=upper, which falls in quadrants 3
589 * and 4, or 1 and 2.
590 */
591 cmp = adjacent_inner_consistent(typcache, &upper,
592 &centroidLower,
593 prevCentroid ? &prevLower : NULL);
594 if (cmp > 0)
595 which2 = (1 << 1) | (1 << 2);
596 else if (cmp < 0)
597 which2 = (1 << 3) | (1 << 4);
598 else
599 which2 = 0;
600
601 /* We must chase down ranges adjacent to either bound. */
602 which &= which1 | which2;
603
604 needPrevious = true;
605 break;
606
607 case RANGESTRAT_CONTAINS:
608
609 /*
610 * Non-empty range A contains non-empty range B if lower
611 * bound of A is lower or equal to lower bound of range B
612 * and upper bound of range A is greater or equal to upper
613 * bound of range A.
614 *
615 * All non-empty ranges contain an empty range.
616 */
617 strictEmpty = false;
618 if (!empty)
619 {
620 which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
621 maxLower = &lower;
622 minUpper = &upper;
623 }
624 break;
625
626 case RANGESTRAT_CONTAINED_BY:
627 /* The opposite of contains. */
628 strictEmpty = false;
629 if (empty)
630 {
631 /* An empty range is only contained by an empty range */
632 which &= (1 << 5);
633 }
634 else
635 {
636 minLower = &lower;
637 maxUpper = &upper;
638 }
639 break;
640
641 case RANGESTRAT_EQ:
642
643 /*
644 * Equal range can be only in the same quadrant where
645 * argument would be placed to.
646 */
647 strictEmpty = false;
648 which &= (1 << getQuadrant(typcache, centroid, range));
649 break;
650
651 default:
652 elog(ERROR, "unrecognized range strategy: %d", strategy);
653 break;
654 }
655
656 if (strictEmpty)
657 {
658 if (empty)
659 {
660 /* Scan key is empty, no branches are satisfying */
661 which = 0;
662 break;
663 }
664 else
665 {
666 /* Shouldn't visit tree branch with empty ranges */
667 which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
668 }
669 }
670
671 /*
672 * Using the bounding box, see which quadrants we have to descend
673 * into.
674 */
675 if (minLower)
676 {
677 /*
678 * If the centroid's lower bound is less than or equal to the
679 * minimum lower bound, anything in the 3rd and 4th quadrants
680 * will have an even smaller lower bound, and thus can't
681 * match.
682 */
683 if (range_cmp_bounds(typcache, &centroidLower, minLower) <= 0)
684 which &= (1 << 1) | (1 << 2) | (1 << 5);
685 }
686 if (maxLower)
687 {
688 /*
689 * If the centroid's lower bound is greater than the maximum
690 * lower bound, anything in the 1st and 2nd quadrants will
691 * also have a greater than or equal lower bound, and thus
692 * can't match. If the centroid's lower bound is equal to the
693 * maximum lower bound, we can still exclude the 1st and 2nd
694 * quadrants if we're looking for a value strictly greater
695 * than the maximum.
696 */
697 int cmp;
698
699 cmp = range_cmp_bounds(typcache, &centroidLower, maxLower);
700 if (cmp > 0 || (!inclusive && cmp == 0))
701 which &= (1 << 3) | (1 << 4) | (1 << 5);
702 }
703 if (minUpper)
704 {
705 /*
706 * If the centroid's upper bound is less than or equal to the
707 * minimum upper bound, anything in the 2nd and 3rd quadrants
708 * will have an even smaller upper bound, and thus can't
709 * match.
710 */
711 if (range_cmp_bounds(typcache, &centroidUpper, minUpper) <= 0)
712 which &= (1 << 1) | (1 << 4) | (1 << 5);
713 }
714 if (maxUpper)
715 {
716 /*
717 * If the centroid's upper bound is greater than the maximum
718 * upper bound, anything in the 1st and 4th quadrants will
719 * also have a greater than or equal upper bound, and thus
720 * can't match. If the centroid's upper bound is equal to the
721 * maximum upper bound, we can still exclude the 1st and 4th
722 * quadrants if we're looking for a value strictly greater
723 * than the maximum.
724 */
725 int cmp;
726
727 cmp = range_cmp_bounds(typcache, &centroidUpper, maxUpper);
728 if (cmp > 0 || (!inclusive && cmp == 0))
729 which &= (1 << 2) | (1 << 3) | (1 << 5);
730 }
731
732 if (which == 0)
733 break; /* no need to consider remaining conditions */
734 }
735 }
736
737 /* We must descend into the quadrant(s) identified by 'which' */
738 out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
739 if (needPrevious)
740 out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
741 out->nNodes = 0;
742
743 /*
744 * Elements of traversalValues should be allocated in
745 * traversalMemoryContext
746 */
747 oldCtx = MemoryContextSwitchTo(in->traversalMemoryContext);
748
749 for (i = 1; i <= in->nNodes; i++)
750 {
751 if (which & (1 << i))
752 {
753 /* Save previous prefix if needed */
754 if (needPrevious)
755 {
756 Datum previousCentroid;
757
758 /*
759 * We know, that in->prefixDatum in this place is varlena,
760 * because it's range
761 */
762 previousCentroid = datumCopy(in->prefixDatum, false, -1);
763 out->traversalValues[out->nNodes] = (void *) previousCentroid;
764 }
765 out->nodeNumbers[out->nNodes] = i - 1;
766 out->nNodes++;
767 }
768 }
769
770 MemoryContextSwitchTo(oldCtx);
771
772 PG_RETURN_VOID();
773}
774
775/*
776 * adjacent_cmp_bounds
777 *
778 * Given an argument and centroid bound, this function determines if any
779 * bounds that are adjacent to the argument are smaller than, or greater than
780 * or equal to centroid. For brevity, we call the arg < centroid "left", and
781 * arg >= centroid case "right". This corresponds to how the quadrants are
782 * arranged, if you imagine that "left" is equivalent to "down" and "right"
783 * is equivalent to "up".
784 *
785 * For the "left" case, returns -1, and for the "right" case, returns 1.
786 */
787static int
788adjacent_cmp_bounds(TypeCacheEntry *typcache, RangeBound *arg,
789 RangeBound *centroid)
790{
791 int cmp;
792
793 Assert(arg->lower != centroid->lower);
794
795 cmp = range_cmp_bounds(typcache, arg, centroid);
796
797 if (centroid->lower)
798 {
799 /*------
800 * The argument is an upper bound, we are searching for adjacent lower
801 * bounds. A matching adjacent lower bound must be *larger* than the
802 * argument, but only just.
803 *
804 * The following table illustrates the desired result with a fixed
805 * argument bound, and different centroids. The CMP column shows
806 * the value of 'cmp' variable, and ADJ shows whether the argument
807 * and centroid are adjacent, per bounds_adjacent(). (N) means we
808 * don't need to check for that case, because it's implied by CMP.
809 * With the argument range [..., 500), the adjacent range we're
810 * searching for is [500, ...):
811 *
812 * ARGUMENT CENTROID CMP ADJ
813 * [..., 500) [498, ...) > (N) [500, ...) is to the right
814 * [..., 500) [499, ...) = (N) [500, ...) is to the right
815 * [..., 500) [500, ...) < Y [500, ...) is to the right
816 * [..., 500) [501, ...) < N [500, ...) is to the left
817 *
818 * So, we must search left when the argument is smaller than, and not
819 * adjacent, to the centroid. Otherwise search right.
820 *------
821 */
822 if (cmp < 0 && !bounds_adjacent(typcache, *arg, *centroid))
823 return -1;
824 else
825 return 1;
826 }
827 else
828 {
829 /*------
830 * The argument is a lower bound, we are searching for adjacent upper
831 * bounds. A matching adjacent upper bound must be *smaller* than the
832 * argument, but only just.
833 *
834 * ARGUMENT CENTROID CMP ADJ
835 * [500, ...) [..., 499) > (N) [..., 500) is to the right
836 * [500, ...) [..., 500) > (Y) [..., 500) is to the right
837 * [500, ...) [..., 501) = (N) [..., 500) is to the left
838 * [500, ...) [..., 502) < (N) [..., 500) is to the left
839 *
840 * We must search left when the argument is smaller than or equal to
841 * the centroid. Otherwise search right. We don't need to check
842 * whether the argument is adjacent with the centroid, because it
843 * doesn't matter.
844 *------
845 */
846 if (cmp <= 0)
847 return -1;
848 else
849 return 1;
850 }
851}
852
853/*----------
854 * adjacent_inner_consistent
855 *
856 * Like adjacent_cmp_bounds, but also takes into account the previous
857 * level's centroid. We might've traversed left (or right) at the previous
858 * node, in search for ranges adjacent to the other bound, even though we
859 * already ruled out the possibility for any matches in that direction for
860 * this bound. By comparing the argument with the previous centroid, and
861 * the previous centroid with the current centroid, we can determine which
862 * direction we should've moved in at previous level, and which direction we
863 * actually moved.
864 *
865 * If there can be any matches to the left, returns -1. If to the right,
866 * returns 1. If there can be no matches below this centroid, because we
867 * already ruled them out at the previous level, returns 0.
868 *
869 * XXX: Comparing just the previous and current level isn't foolproof; we
870 * might still search some branches unnecessarily. For example, imagine that
871 * we are searching for value 15, and we traverse the following centroids
872 * (only considering one bound for the moment):
873 *
874 * Level 1: 20
875 * Level 2: 50
876 * Level 3: 25
877 *
878 * At this point, previous centroid is 50, current centroid is 25, and the
879 * target value is to the left. But because we already moved right from
880 * centroid 20 to 50 in the first level, there cannot be any values < 20 in
881 * the current branch. But we don't know that just by looking at the previous
882 * and current centroid, so we traverse left, unnecessarily. The reason we are
883 * down this branch is that we're searching for matches with the *other*
884 * bound. If we kept track of which bound we are searching for explicitly,
885 * instead of deducing that from the previous and current centroid, we could
886 * avoid some unnecessary work.
887 *----------
888 */
889static int
890adjacent_inner_consistent(TypeCacheEntry *typcache, RangeBound *arg,
891 RangeBound *centroid, RangeBound *prev)
892{
893 if (prev)
894 {
895 int prevcmp;
896 int cmp;
897
898 /*
899 * Which direction were we supposed to traverse at previous level,
900 * left or right?
901 */
902 prevcmp = adjacent_cmp_bounds(typcache, arg, prev);
903
904 /* and which direction did we actually go? */
905 cmp = range_cmp_bounds(typcache, centroid, prev);
906
907 /* if the two don't agree, there's nothing to see here */
908 if ((prevcmp < 0 && cmp >= 0) || (prevcmp > 0 && cmp < 0))
909 return 0;
910 }
911
912 return adjacent_cmp_bounds(typcache, arg, centroid);
913}
914
915/*
916 * SP-GiST consistent function for leaf nodes: check leaf value against query
917 * using corresponding function.
918 */
919Datum
920spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)
921{
922 spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
923 spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
924 RangeType *leafRange = DatumGetRangeTypeP(in->leafDatum);
925 TypeCacheEntry *typcache;
926 bool res;
927 int i;
928
929 /* all tests are exact */
930 out->recheck = false;
931
932 /* leafDatum is what it is... */
933 out->leafValue = in->leafDatum;
934
935 typcache = range_get_typcache(fcinfo, RangeTypeGetOid(leafRange));
936
937 /* Perform the required comparison(s) */
938 res = true;
939 for (i = 0; i < in->nkeys; i++)
940 {
941 Datum keyDatum = in->scankeys[i].sk_argument;
942
943 /* Call the function corresponding to the scan strategy */
944 switch (in->scankeys[i].sk_strategy)
945 {
946 case RANGESTRAT_BEFORE:
947 res = range_before_internal(typcache, leafRange,
948 DatumGetRangeTypeP(keyDatum));
949 break;
950 case RANGESTRAT_OVERLEFT:
951 res = range_overleft_internal(typcache, leafRange,
952 DatumGetRangeTypeP(keyDatum));
953 break;
954 case RANGESTRAT_OVERLAPS:
955 res = range_overlaps_internal(typcache, leafRange,
956 DatumGetRangeTypeP(keyDatum));
957 break;
958 case RANGESTRAT_OVERRIGHT:
959 res = range_overright_internal(typcache, leafRange,
960 DatumGetRangeTypeP(keyDatum));
961 break;
962 case RANGESTRAT_AFTER:
963 res = range_after_internal(typcache, leafRange,
964 DatumGetRangeTypeP(keyDatum));
965 break;
966 case RANGESTRAT_ADJACENT:
967 res = range_adjacent_internal(typcache, leafRange,
968 DatumGetRangeTypeP(keyDatum));
969 break;
970 case RANGESTRAT_CONTAINS:
971 res = range_contains_internal(typcache, leafRange,
972 DatumGetRangeTypeP(keyDatum));
973 break;
974 case RANGESTRAT_CONTAINED_BY:
975 res = range_contained_by_internal(typcache, leafRange,
976 DatumGetRangeTypeP(keyDatum));
977 break;
978 case RANGESTRAT_CONTAINS_ELEM:
979 res = range_contains_elem_internal(typcache, leafRange,
980 keyDatum);
981 break;
982 case RANGESTRAT_EQ:
983 res = range_eq_internal(typcache, leafRange,
984 DatumGetRangeTypeP(keyDatum));
985 break;
986 default:
987 elog(ERROR, "unrecognized range strategy: %d",
988 in->scankeys[i].sk_strategy);
989 break;
990 }
991
992 /*
993 * If leaf datum doesn't match to a query key, no need to check
994 * subsequent keys.
995 */
996 if (!res)
997 break;
998 }
999
1000 PG_RETURN_BOOL(res);
1001}
1002