/*-------------------------------------------------------------------------
 *
 * rangetypes_gist.c
 *	  GiST support for range types.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/utils/adt/rangetypes_gist.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/gist.h"
#include "access/stratnum.h"
#include "utils/float.h"
#include "utils/fmgrprotos.h"
#include "utils/datum.h"
#include "utils/rangetypes.h"


/*
 * Range class properties used to segregate different classes of ranges in
 * GiST.  Each unique combination of properties is a class.  CLS_EMPTY cannot
 * be combined with anything else.
 */
#define CLS_NORMAL          0   /* Ordinary finite range (no bits set) */
#define CLS_LOWER_INF       1   /* Lower bound is infinity */
#define CLS_UPPER_INF       2   /* Upper bound is infinity */
#define CLS_CONTAIN_EMPTY   4   /* Contains underlying empty ranges */
#define CLS_EMPTY           8   /* Special class for empty ranges */

#define CLS_COUNT           9   /* # of classes; includes all combinations of
                                 * properties.  CLS_EMPTY doesn't combine with
                                 * anything else, so it's only 2^3 + 1. */
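/*
 * For example, a range with an infinite lower bound that also contains
 * empty ranges is class CLS_LOWER_INF | CLS_CONTAIN_EMPTY = 5, while an
 * empty range is always exactly CLS_EMPTY = 8.
 */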

/*
 * Minimum accepted ratio of split for items of the same class.  If the items
 * are of different classes, we will separate along those lines regardless of
 * the ratio.
 */
#define LIMIT_RATIO 0.3
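/*
 * For instance, when splitting 100 entries of the same class, a candidate
 * split is accepted only if the smaller side receives more than 30 entries
 * (see range_gist_consider_split).
 */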

/* Constants for fixed penalty values */
#define INFINITE_BOUND_PENALTY      2.0
#define CONTAIN_EMPTY_PENALTY       1.0
#define DEFAULT_SUBTYPE_DIFF_PENALTY 1.0

/*
 * Per-item data for range_gist_single_sorting_split.
 */
typedef struct
{
    int         index;
    RangeBound  bound;
} SingleBoundSortItem;

/* place on left or right side of split? */
typedef enum
{
    SPLIT_LEFT = 0,             /* makes initialization to SPLIT_LEFT easier */
    SPLIT_RIGHT
} SplitLR;

/*
 * Context for range_gist_consider_split.
 */
typedef struct
{
    TypeCacheEntry *typcache;   /* typcache for range type */
    bool        has_subtype_diff;   /* does it have subtype_diff? */
    int         entries_count;  /* total number of entries being split */

    /* Information about currently selected split follows */

    bool        first;          /* true if no split was selected yet */

    RangeBound *left_upper;     /* upper bound of left interval */
    RangeBound *right_lower;    /* lower bound of right interval */

    float4      ratio;          /* split ratio */
    float4      overlap;        /* overlap between left and right predicate */
    int         common_left;    /* # common entries destined for each side */
    int         common_right;
} ConsiderSplitContext;

/*
 * Bounds extracted from a non-empty range, for use in
 * range_gist_double_sorting_split.
 */
typedef struct
{
    RangeBound  lower;
    RangeBound  upper;
} NonEmptyRange;

/*
 * Represents information about an entry that can be placed in either group
 * without affecting overlap over selected axis ("common entry").
 */
typedef struct
{
    /* Index of entry in the initial array */
    int         index;
    /* Delta between closeness of range to each of the two groups */
    double      delta;
} CommonEntry;

/* Helper macros to place an entry in the left or right group during split */
/* Note direct access to variables v, typcache, left_range, right_range */
#define PLACE_LEFT(range, off)                  \
    do {                                        \
        if (v->spl_nleft > 0)                   \
            left_range = range_super_union(typcache, left_range, range); \
        else                                    \
            left_range = (range);               \
        v->spl_left[v->spl_nleft++] = (off);    \
    } while(0)

#define PLACE_RIGHT(range, off)                 \
    do {                                        \
        if (v->spl_nright > 0)                  \
            right_range = range_super_union(typcache, right_range, range); \
        else                                    \
            right_range = (range);              \
        v->spl_right[v->spl_nright++] = (off);  \
    } while(0)

/* Copy a RangeType datum (hardwires typbyval and typlen for ranges...) */
#define rangeCopy(r) \
    ((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
                                             false, -1)))

static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType *r1,
                                    RangeType *r2);
static bool range_gist_consistent_int(TypeCacheEntry *typcache,
                                      StrategyNumber strategy, RangeType *key,
                                      Datum query);
static bool range_gist_consistent_leaf(TypeCacheEntry *typcache,
                                       StrategyNumber strategy, RangeType *key,
                                       Datum query);
static void range_gist_fallback_split(TypeCacheEntry *typcache,
                                      GistEntryVector *entryvec,
                                      GIST_SPLITVEC *v);
static void range_gist_class_split(TypeCacheEntry *typcache,
                                   GistEntryVector *entryvec,
                                   GIST_SPLITVEC *v,
                                   SplitLR *classes_groups);
static void range_gist_single_sorting_split(TypeCacheEntry *typcache,
                                            GistEntryVector *entryvec,
                                            GIST_SPLITVEC *v,
                                            bool use_upper_bound);
static void range_gist_double_sorting_split(TypeCacheEntry *typcache,
                                            GistEntryVector *entryvec,
                                            GIST_SPLITVEC *v);
static void range_gist_consider_split(ConsiderSplitContext *context,
                                      RangeBound *right_lower, int min_left_count,
                                      RangeBound *left_upper, int max_left_count);
static int  get_gist_range_class(RangeType *range);
static int  single_bound_cmp(const void *a, const void *b, void *arg);
static int  interval_cmp_lower(const void *a, const void *b, void *arg);
static int  interval_cmp_upper(const void *a, const void *b, void *arg);
static int  common_entry_cmp(const void *i1, const void *i2);
static float8 call_subtype_diff(TypeCacheEntry *typcache,
                                Datum val1, Datum val2);


/* GiST query consistency check */
Datum
range_gist_consistent(PG_FUNCTION_ARGS)
{
    GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
    Datum       query = PG_GETARG_DATUM(1);
    StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);

    /* Oid subtype = PG_GETARG_OID(3); */
    bool       *recheck = (bool *) PG_GETARG_POINTER(4);
    RangeType  *key = DatumGetRangeTypeP(entry->key);
    TypeCacheEntry *typcache;

    /* All operators served by this function are exact */
    *recheck = false;

    typcache = range_get_typcache(fcinfo, RangeTypeGetOid(key));

    if (GIST_LEAF(entry))
        PG_RETURN_BOOL(range_gist_consistent_leaf(typcache, strategy,
                                                  key, query));
    else
        PG_RETURN_BOOL(range_gist_consistent_int(typcache, strategy,
                                                 key, query));
}

/* form union range */
Datum
range_gist_union(PG_FUNCTION_ARGS)
{
    GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
    GISTENTRY  *ent = entryvec->vector;
    RangeType  *result_range;
    TypeCacheEntry *typcache;
    int         i;

    result_range = DatumGetRangeTypeP(ent[0].key);

    typcache = range_get_typcache(fcinfo, RangeTypeGetOid(result_range));

    for (i = 1; i < entryvec->n; i++)
    {
        result_range = range_super_union(typcache, result_range,
                                         DatumGetRangeTypeP(ent[i].key));
    }

    PG_RETURN_RANGE_P(result_range);
}

/*
 * We store ranges as ranges in GiST indexes, so we do not need
 * compress, decompress, or fetch functions.  Note this implies a limit
 * on the size of range values that can be indexed.
 */

/*
 * GiST page split penalty function.
 *
 * The penalty function has the following goals (in order from most to least
 * important):
 * - Keep normal ranges separate
 * - Avoid broadening the class of the original predicate
 * - Avoid broadening (as determined by subtype_diff) the original predicate
 * - Favor adding ranges to narrower original predicates
 */
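/*
 * For example, if the original key is the int4 range [1,10) and the new
 * entry is [1,20), only the upper bound is extended, so the penalty is
 * subtype_diff(20, 10) = 10.  If instead the new entry were the empty
 * range, the new_empty branch below would charge 4 * CONTAIN_EMPTY_PENALTY
 * for broadening a normal original range.
 */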
Datum
range_gist_penalty(PG_FUNCTION_ARGS)
{
    GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
    GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
    float      *penalty = (float *) PG_GETARG_POINTER(2);
    RangeType  *orig = DatumGetRangeTypeP(origentry->key);
    RangeType  *new = DatumGetRangeTypeP(newentry->key);
    TypeCacheEntry *typcache;
    bool        has_subtype_diff;
    RangeBound  orig_lower,
                new_lower,
                orig_upper,
                new_upper;
    bool        orig_empty,
                new_empty;

    if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
        elog(ERROR, "range types do not match");

    typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));

    has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);

    range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty);
    range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty);

    /*
     * Distinct branches for handling distinct classes of ranges.  Note that
     * penalty values only need to be commensurate within the same class of
     * new range.
     */
    if (new_empty)
    {
        /* Handle insertion of empty range */
        if (orig_empty)
        {
            /*
             * The best case is to insert it into an empty original range:
             * there is no broadening of the original range, and the original
             * range is as narrow as possible.
             */
            *penalty = 0.0;
        }
        else if (RangeIsOrContainsEmpty(orig))
        {
            /*
             * The next-best case is to insert the empty range into a range
             * that contains at least one underlying empty range.  There is
             * still no broadening of the original range, but the original
             * range is not as narrow as possible.
             */
            *penalty = CONTAIN_EMPTY_PENALTY;
        }
        else if (orig_lower.infinite && orig_upper.infinite)
        {
            /*
             * The original range requires broadening.  (-inf, +inf) is the
             * farthest from a normal range in this case.
             */
            *penalty = 2 * CONTAIN_EMPTY_PENALTY;
        }
        else if (orig_lower.infinite || orig_upper.infinite)
        {
            /*
             * (-inf, x) or (x, +inf) original ranges are closer to normal
             * ranges, so it's worse to mix them with empty ranges.
             */
            *penalty = 3 * CONTAIN_EMPTY_PENALTY;
        }
        else
        {
            /*
             * The least preferred case is broadening a normal range.
             */
            *penalty = 4 * CONTAIN_EMPTY_PENALTY;
        }
    }
    else if (new_lower.infinite && new_upper.infinite)
    {
        /* Handle insertion of (-inf, +inf) range */
        if (orig_lower.infinite && orig_upper.infinite)
        {
            /*
             * The best case is inserting into a (-inf, +inf) original range.
             */
            *penalty = 0.0;
        }
        else if (orig_lower.infinite || orig_upper.infinite)
        {
            /*
             * When the original range is (-inf, x) or (x, +inf), it requires
             * broadening (extension of one bound to infinity).
             */
            *penalty = INFINITE_BOUND_PENALTY;
        }
        else
        {
            /*
             * Insertion into a normal original range is least preferred.
             */
            *penalty = 2 * INFINITE_BOUND_PENALTY;
        }

        if (RangeIsOrContainsEmpty(orig))
        {
            /*
             * The original range is narrower when it doesn't contain empty
             * ranges.  Add an additional penalty if it does.
             */
            *penalty += CONTAIN_EMPTY_PENALTY;
        }
    }
    else if (new_lower.infinite)
    {
        /* Handle insertion of (-inf, x) range */
        if (!orig_empty && orig_lower.infinite)
        {
            if (orig_upper.infinite)
            {
                /*
                 * A (-inf, +inf) range won't be extended by insertion of a
                 * (-inf, x) range.  This is a less desirable case than
                 * insertion into a (-inf, y) original range without
                 * extension, because there the original range is narrower,
                 * but we can't express that in a single float value.
                 */
                *penalty = 0.0;
            }
            else
            {
                if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
                {
                    /*
                     * Get extension of original range using subtype_diff.
                     * Use constant if subtype_diff unavailable.
                     */
                    if (has_subtype_diff)
                        *penalty = call_subtype_diff(typcache,
                                                     new_upper.val,
                                                     orig_upper.val);
                    else
                        *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
                }
                else
                {
                    /* No extension of original range */
                    *penalty = 0.0;
                }
            }
        }
        else
        {
            /*
             * If the lower bound of the original range is not -inf, then
             * extending it to -inf is an infinite extension.
             */
            *penalty = get_float4_infinity();
        }
    }
    else if (new_upper.infinite)
    {
        /* Handle insertion of (x, +inf) range */
        if (!orig_empty && orig_upper.infinite)
        {
            if (orig_lower.infinite)
            {
                /*
                 * A (-inf, +inf) range won't be extended by insertion of an
                 * (x, +inf) range.  This is a less desirable case than
                 * insertion into a (y, +inf) original range without
                 * extension, because there the original range is narrower,
                 * but we can't express that in a single float value.
                 */
                *penalty = 0.0;
            }
            else
            {
                if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
                {
                    /*
                     * Get extension of original range using subtype_diff.
                     * Use constant if subtype_diff unavailable.
                     */
                    if (has_subtype_diff)
                        *penalty = call_subtype_diff(typcache,
                                                     orig_lower.val,
                                                     new_lower.val);
                    else
                        *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
                }
                else
                {
                    /* No extension of original range */
                    *penalty = 0.0;
                }
            }
        }
        else
        {
            /*
             * If the upper bound of the original range is not +inf, then
             * extending it to +inf is an infinite extension.
             */
            *penalty = get_float4_infinity();
        }
    }
    else
    {
        /* Handle insertion of normal (non-empty, non-infinite) range */
        if (orig_empty || orig_lower.infinite || orig_upper.infinite)
        {
            /*
             * Avoid mixing normal ranges with infinite and empty ranges.
             */
            *penalty = get_float4_infinity();
        }
        else
        {
            /*
             * Calculate extension of original range by calling subtype_diff.
             * Use constant if subtype_diff unavailable.
             */
            float8      diff = 0.0;

            if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
            {
                if (has_subtype_diff)
                    diff += call_subtype_diff(typcache,
                                              orig_lower.val,
                                              new_lower.val);
                else
                    diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
            }
            if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
            {
                if (has_subtype_diff)
                    diff += call_subtype_diff(typcache,
                                              new_upper.val,
                                              orig_upper.val);
                else
                    diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
            }
            *penalty = diff;
        }
    }

    PG_RETURN_POINTER(penalty);
}

/*
 * The GiST PickSplit method for ranges
 *
 * Primarily, we try to segregate ranges of different classes.  If splitting
 * ranges of the same class, use the appropriate split method for that class.
 */
Datum
range_gist_picksplit(PG_FUNCTION_ARGS)
{
    GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
    GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
    TypeCacheEntry *typcache;
    OffsetNumber i;
    RangeType  *pred_left;
    int         nbytes;
    OffsetNumber maxoff;
    int         count_in_classes[CLS_COUNT];
    int         j;
    int         non_empty_classes_count = 0;
    int         biggest_class = -1;
    int         biggest_class_count = 0;
    int         total_count;

    /* use first item to look up range type's info */
    pred_left = DatumGetRangeTypeP(entryvec->vector[FirstOffsetNumber].key);
    typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));

    maxoff = entryvec->n - 1;
    nbytes = (maxoff + 1) * sizeof(OffsetNumber);
    v->spl_left = (OffsetNumber *) palloc(nbytes);
    v->spl_right = (OffsetNumber *) palloc(nbytes);

    /*
     * Get count distribution of range classes.
     */
    memset(count_in_classes, 0, sizeof(count_in_classes));
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);

        count_in_classes[get_gist_range_class(range)]++;
    }

    /*
     * Count non-empty classes and find biggest class.
     */
    total_count = maxoff;
    for (j = 0; j < CLS_COUNT; j++)
    {
        if (count_in_classes[j] > 0)
        {
            if (count_in_classes[j] > biggest_class_count)
            {
                biggest_class_count = count_in_classes[j];
                biggest_class = j;
            }
            non_empty_classes_count++;
        }
    }

    Assert(non_empty_classes_count > 0);

    if (non_empty_classes_count == 1)
    {
        /* One non-empty class, so split inside class */
        if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL)
        {
            /* double sorting split for normal ranges */
            range_gist_double_sorting_split(typcache, entryvec, v);
        }
        else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF)
        {
            /* upper bound sorting split for (-inf, x) ranges */
            range_gist_single_sorting_split(typcache, entryvec, v, true);
        }
        else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF)
        {
            /* lower bound sorting split for (x, +inf) ranges */
            range_gist_single_sorting_split(typcache, entryvec, v, false);
        }
        else
        {
            /* trivial split for all (-inf, +inf) or all empty ranges */
            range_gist_fallback_split(typcache, entryvec, v);
        }
    }
    else
    {
        /*
         * Class based split.
         *
         * To which side of the split should each class go?  Initialize them
         * all to go to the left side.
         */
        SplitLR     classes_groups[CLS_COUNT];

        memset(classes_groups, 0, sizeof(classes_groups));

        if (count_in_classes[CLS_NORMAL] > 0)
        {
            /* separate normal ranges if any */
            classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
        }
        else
        {
            /*----------
             * Try to split classes in one of two ways:
             *	1) containing infinities - not containing infinities
             *	2) containing empty - not containing empty
             *
             * Select the way which balances the ranges between left and right
             * the best.  If split in these ways is not possible, there are at
             * most 3 classes, so just separate biggest class.
             *----------
             */
            int         infCount,
                        nonInfCount;
            int         emptyCount,
                        nonEmptyCount;

            nonInfCount =
                count_in_classes[CLS_NORMAL] +
                count_in_classes[CLS_CONTAIN_EMPTY] +
                count_in_classes[CLS_EMPTY];
            infCount = total_count - nonInfCount;

            nonEmptyCount =
                count_in_classes[CLS_NORMAL] +
                count_in_classes[CLS_LOWER_INF] +
                count_in_classes[CLS_UPPER_INF] +
                count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF];
            emptyCount = total_count - nonEmptyCount;

            if (infCount > 0 && nonInfCount > 0 &&
                (Abs(infCount - nonInfCount) <=
                 Abs(emptyCount - nonEmptyCount)))
            {
                classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
                classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT;
                classes_groups[CLS_EMPTY] = SPLIT_RIGHT;
            }
            else if (emptyCount > 0 && nonEmptyCount > 0)
            {
                classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
                classes_groups[CLS_LOWER_INF] = SPLIT_RIGHT;
                classes_groups[CLS_UPPER_INF] = SPLIT_RIGHT;
                classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT;
            }
            else
            {
                /*
                 * Either total_count == emptyCount or total_count ==
                 * infCount.
                 */
                classes_groups[biggest_class] = SPLIT_RIGHT;
            }
        }

        range_gist_class_split(typcache, entryvec, v, classes_groups);
    }

    PG_RETURN_POINTER(v);
}

/* equality comparator for GiST */
Datum
range_gist_same(PG_FUNCTION_ARGS)
{
    RangeType  *r1 = PG_GETARG_RANGE_P(0);
    RangeType  *r2 = PG_GETARG_RANGE_P(1);
    bool       *result = (bool *) PG_GETARG_POINTER(2);

    /*
     * range_eq will ignore the RANGE_CONTAIN_EMPTY flag, so we have to check
     * that for ourselves.  More generally, if the entries have been properly
     * normalized, then unequal flags bytes must mean unequal ranges ... so
     * let's just test all the flag bits at once.
     */
    if (range_get_flags(r1) != range_get_flags(r2))
        *result = false;
    else
    {
        TypeCacheEntry *typcache;

        typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r1));

        *result = range_eq_internal(typcache, r1, r2);
    }

    PG_RETURN_POINTER(result);
}

/*
 *----------------------------------------------------------
 * STATIC FUNCTIONS
 *----------------------------------------------------------
 */

/*
 * Return the smallest range that contains r1 and r2
 *
 * This differs from regular range_union in two critical ways:
 * 1. It won't throw an error for non-adjacent r1 and r2, but just absorb
 * the intervening values into the result range.
 * 2. We track whether any empty range has been union'd into the result,
 * so that contained_by searches can be indexed.  Note that this means
 * that *all* unions formed within the GiST index must go through here.
 */
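/*
 * For example, the super-union of [1,3) and [5,8) is [1,8), absorbing the
 * gap between them that plain range_union would reject; and the super-union
 * of [1,3) with the empty range is [1,3) with the RANGE_CONTAIN_EMPTY flag
 * set.
 */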
static RangeType *
range_super_union(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
{
    RangeType  *result;
    RangeBound  lower1,
                lower2;
    RangeBound  upper1,
                upper2;
    bool        empty1,
                empty2;
    char        flags1,
                flags2;
    RangeBound *result_lower;
    RangeBound *result_upper;

    range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
    range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
    flags1 = range_get_flags(r1);
    flags2 = range_get_flags(r2);

    if (empty1)
    {
        /* We can return r2 as-is if it already is or contains empty */
        if (flags2 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
            return r2;
        /* Else we'd better copy it (modify-in-place isn't safe) */
        r2 = rangeCopy(r2);
        range_set_contain_empty(r2);
        return r2;
    }
    if (empty2)
    {
        /* We can return r1 as-is if it already is or contains empty */
        if (flags1 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
            return r1;
        /* Else we'd better copy it (modify-in-place isn't safe) */
        r1 = rangeCopy(r1);
        range_set_contain_empty(r1);
        return r1;
    }

    if (range_cmp_bounds(typcache, &lower1, &lower2) <= 0)
        result_lower = &lower1;
    else
        result_lower = &lower2;

    if (range_cmp_bounds(typcache, &upper1, &upper2) >= 0)
        result_upper = &upper1;
    else
        result_upper = &upper2;

    /* optimization to avoid constructing a new range */
    if (result_lower == &lower1 && result_upper == &upper1 &&
        ((flags1 & RANGE_CONTAIN_EMPTY) || !(flags2 & RANGE_CONTAIN_EMPTY)))
        return r1;
    if (result_lower == &lower2 && result_upper == &upper2 &&
        ((flags2 & RANGE_CONTAIN_EMPTY) || !(flags1 & RANGE_CONTAIN_EMPTY)))
        return r2;

    result = make_range(typcache, result_lower, result_upper, false);

    if ((flags1 & RANGE_CONTAIN_EMPTY) || (flags2 & RANGE_CONTAIN_EMPTY))
        range_set_contain_empty(result);

    return result;
}

/*
 * GiST consistent test on an index internal page
 */
static bool
range_gist_consistent_int(TypeCacheEntry *typcache, StrategyNumber strategy,
                          RangeType *key, Datum query)
{
    switch (strategy)
    {
        case RANGESTRAT_BEFORE:
            if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
                return false;
            return (!range_overright_internal(typcache, key,
                                              DatumGetRangeTypeP(query)));
        case RANGESTRAT_OVERLEFT:
            if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
                return false;
            return (!range_after_internal(typcache, key,
                                          DatumGetRangeTypeP(query)));
        case RANGESTRAT_OVERLAPS:
            return range_overlaps_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_OVERRIGHT:
            if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
                return false;
            return (!range_before_internal(typcache, key,
                                           DatumGetRangeTypeP(query)));
        case RANGESTRAT_AFTER:
            if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
                return false;
            return (!range_overleft_internal(typcache, key,
                                             DatumGetRangeTypeP(query)));
        case RANGESTRAT_ADJACENT:
            if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
                return false;
            if (range_adjacent_internal(typcache, key,
                                        DatumGetRangeTypeP(query)))
                return true;
            return range_overlaps_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINS:
            return range_contains_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINED_BY:

            /*
             * Empty ranges are contained by anything, so if key is or
             * contains any empty ranges, we must descend into it.  Otherwise,
             * descend only if key overlaps the query.
             */
            if (RangeIsOrContainsEmpty(key))
                return true;
            return range_overlaps_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINS_ELEM:
            return range_contains_elem_internal(typcache, key, query);
        case RANGESTRAT_EQ:

            /*
             * If query is empty, descend only if the key is or contains any
             * empty ranges.  Otherwise, descend if key contains query.
             */
            if (RangeIsEmpty(DatumGetRangeTypeP(query)))
                return RangeIsOrContainsEmpty(key);
            return range_contains_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        default:
            elog(ERROR, "unrecognized range strategy: %d", strategy);
            return false;       /* keep compiler quiet */
    }
}

/*
 * GiST consistent test on an index leaf page
 */
static bool
range_gist_consistent_leaf(TypeCacheEntry *typcache, StrategyNumber strategy,
                           RangeType *key, Datum query)
{
    switch (strategy)
    {
        case RANGESTRAT_BEFORE:
            return range_before_internal(typcache, key,
                                         DatumGetRangeTypeP(query));
        case RANGESTRAT_OVERLEFT:
            return range_overleft_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_OVERLAPS:
            return range_overlaps_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_OVERRIGHT:
            return range_overright_internal(typcache, key,
                                            DatumGetRangeTypeP(query));
        case RANGESTRAT_AFTER:
            return range_after_internal(typcache, key,
                                        DatumGetRangeTypeP(query));
        case RANGESTRAT_ADJACENT:
            return range_adjacent_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINS:
            return range_contains_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINED_BY:
            return range_contained_by_internal(typcache, key,
                                               DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINS_ELEM:
            return range_contains_elem_internal(typcache, key, query);
        case RANGESTRAT_EQ:
            return range_eq_internal(typcache, key, DatumGetRangeTypeP(query));
        default:
            elog(ERROR, "unrecognized range strategy: %d", strategy);
            return false;       /* keep compiler quiet */
    }
}

/*
 * Trivial split: half of the entries are placed on one page
 * and the other half on the other page.
 */
static void
range_gist_fallback_split(TypeCacheEntry *typcache,
                          GistEntryVector *entryvec,
                          GIST_SPLITVEC *v)
{
    RangeType  *left_range = NULL;
    RangeType  *right_range = NULL;
    OffsetNumber i,
                maxoff,
                split_idx;

    maxoff = entryvec->n - 1;
    /* Split entries before this to left page, after to right: */
    split_idx = (maxoff - FirstOffsetNumber) / 2 + FirstOffsetNumber;

    v->spl_nleft = 0;
    v->spl_nright = 0;
    for (i = FirstOffsetNumber; i <= maxoff; i++)
    {
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);

        if (i < split_idx)
            PLACE_LEFT(range, i);
        else
            PLACE_RIGHT(range, i);
    }

    v->spl_ldatum = RangeTypePGetDatum(left_range);
    v->spl_rdatum = RangeTypePGetDatum(right_range);
}

/*
 * Split based on classes of ranges.
 *
 * See get_gist_range_class for class definitions.
 * classes_groups is an array of length CLS_COUNT indicating the side of the
 * split to which each class should go.
 */
static void
range_gist_class_split(TypeCacheEntry *typcache,
                       GistEntryVector *entryvec,
                       GIST_SPLITVEC *v,
                       SplitLR *classes_groups)
{
    RangeType  *left_range = NULL;
    RangeType  *right_range = NULL;
    OffsetNumber i,
                maxoff;

    maxoff = entryvec->n - 1;

    v->spl_nleft = 0;
    v->spl_nright = 0;
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
        int         class;

        /* Get class of range */
        class = get_gist_range_class(range);

        /* Place range on the appropriate page */
        if (classes_groups[class] == SPLIT_LEFT)
            PLACE_LEFT(range, i);
        else
        {
            Assert(classes_groups[class] == SPLIT_RIGHT);
            PLACE_RIGHT(range, i);
        }
    }

    v->spl_ldatum = RangeTypePGetDatum(left_range);
    v->spl_rdatum = RangeTypePGetDatum(right_range);
}

/*
 * Sorting-based split.  The first half of the entries in sort order are
 * placed on one page, and the second half on the other page.  The
 * use_upper_bound parameter indicates whether to sort by the upper or the
 * lower bound.
 */
static void
range_gist_single_sorting_split(TypeCacheEntry *typcache,
                                GistEntryVector *entryvec,
                                GIST_SPLITVEC *v,
                                bool use_upper_bound)
{
    SingleBoundSortItem *sortItems;
    RangeType  *left_range = NULL;
    RangeType  *right_range = NULL;
    OffsetNumber i,
                maxoff,
                split_idx;

    maxoff = entryvec->n - 1;

    sortItems = (SingleBoundSortItem *)
        palloc(maxoff * sizeof(SingleBoundSortItem));

    /*
     * Prepare auxiliary array and sort the values.
     */
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
        RangeBound  bound2;
        bool        empty;

        sortItems[i - 1].index = i;
        /* Put appropriate bound into array */
        if (use_upper_bound)
            range_deserialize(typcache, range, &bound2,
                              &sortItems[i - 1].bound, &empty);
        else
            range_deserialize(typcache, range, &sortItems[i - 1].bound,
                              &bound2, &empty);
        Assert(!empty);
    }

    qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem),
              single_bound_cmp, typcache);

    split_idx = maxoff / 2;

    v->spl_nleft = 0;
    v->spl_nright = 0;

    for (i = 0; i < maxoff; i++)
    {
        int         idx = sortItems[i].index;
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[idx].key);

        if (i < split_idx)
            PLACE_LEFT(range, idx);
        else
            PLACE_RIGHT(range, idx);
    }

    v->spl_ldatum = RangeTypePGetDatum(left_range);
    v->spl_rdatum = RangeTypePGetDatum(right_range);
}

/*
 * Double sorting split algorithm.
 *
 * The algorithm considers dividing the ranges into two groups.  The first
 * (left) group contains the overall minimum lower bound, and the second
 * (right) group contains the overall maximum upper bound.  The challenge is
 * to find an upper bound for the left group and a lower bound for the right
 * group so that the overlap between the groups is minimal and the
 * distribution ratio is acceptable.  The algorithm finds, for each lower
 * bound of the right group, the minimal upper bound of the left group, and,
 * for each upper bound of the left group, the maximal lower bound of the
 * right group.  For each such pair, range_gist_consider_split considers
 * replacing the currently selected split with the new one.
 *
 * After that, all the entries are divided into three groups:
 * 1) Entries which should be placed in the left group
 * 2) Entries which should be placed in the right group
 * 3) "Common entries" which can be placed in either group without affecting
 *    the amount of overlap.
 *
 * The common entries are distributed by the difference between the distance
 * from the common range's lower bound to the right group's lower bound and
 * the distance from the common range's upper bound to the left group's upper
 * bound.
 *
 * For details see:
 * "A new double sorting-based node splitting algorithm for R-tree",
 * A. Korotkov
 * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
 */
static void
range_gist_double_sorting_split(TypeCacheEntry *typcache,
                                GistEntryVector *entryvec,
                                GIST_SPLITVEC *v)
{
    ConsiderSplitContext context;
    OffsetNumber i,
                maxoff;
    RangeType  *range,
               *left_range = NULL,
               *right_range = NULL;
    int         common_entries_count;
    NonEmptyRange *by_lower,
               *by_upper;
    CommonEntry *common_entries;
    int         nentries,
                i1,
                i2;
    RangeBound *right_lower,
               *left_upper;

    memset(&context, 0, sizeof(ConsiderSplitContext));
    context.typcache = typcache;
    context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);

    maxoff = entryvec->n - 1;
    nentries = context.entries_count = maxoff - FirstOffsetNumber + 1;
    context.first = true;

    /* Allocate arrays for sorted range bounds */
    by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
    by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));

    /* Fill arrays of bounds */
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
        bool        empty;

        range_deserialize(typcache, range,
                          &by_lower[i - FirstOffsetNumber].lower,
                          &by_lower[i - FirstOffsetNumber].upper,
                          &empty);
        Assert(!empty);
    }

    /*
     * Make two arrays of range bounds: one sorted by lower bound and another
     * sorted by upper bound.
     */
    memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange));
    qsort_arg(by_lower, nentries, sizeof(NonEmptyRange),
              interval_cmp_lower, typcache);
    qsort_arg(by_upper, nentries, sizeof(NonEmptyRange),
              interval_cmp_upper, typcache);

    /*----------
     * The goal is to form a left and right range, so that every entry
     * range is contained by either left or right interval (or both).
     *
     * For example, with the ranges (0,1), (1,3), (2,3), (2,4):
     *
     * 0 1 2 3 4
     * +-+
     *   +---+
     *     +-+
     *     +---+
     *
     * The left and right ranges are of the form (0,a) and (b,4).
     * We first consider splits where b is the lower bound of an entry.
     * We iterate through all entries, and for each b, calculate the
     * smallest possible a.  Then we consider splits where a is the
     * upper bound of an entry, and for each a, calculate the greatest
     * possible b.
     *
     * In the above example, the first loop would consider splits:
     * b=0: (0,1)-(0,4)
     * b=1: (0,1)-(1,4)
     * b=2: (0,3)-(2,4)
     *
     * And the second loop:
     * a=1: (0,1)-(1,4)
     * a=3: (0,3)-(2,4)
     * a=4: (0,4)-(2,4)
     *----------
     */

    /*
     * Iterate over lower bounds of the right group, finding the smallest
     * possible upper bound of the left group.
     */
    i1 = 0;
    i2 = 0;
    right_lower = &by_lower[i1].lower;
    left_upper = &by_upper[i2].lower;
    while (true)
    {
        /*
         * Find next lower bound of right group.
         */
        while (i1 < nentries &&
               range_cmp_bounds(typcache, right_lower,
                                &by_lower[i1].lower) == 0)
        {
            if (range_cmp_bounds(typcache, &by_lower[i1].upper,
                                 left_upper) > 0)
                left_upper = &by_lower[i1].upper;
            i1++;
        }
        if (i1 >= nentries)
            break;
        right_lower = &by_lower[i1].lower;

        /*
         * Find the count of ranges that must be placed in the left group in
         * any case.
         */
        while (i2 < nentries &&
               range_cmp_bounds(typcache, &by_upper[i2].upper,
                                left_upper) <= 0)
            i2++;

        /*
         * Consider found split to see if it's better than what we had.
         */
        range_gist_consider_split(&context, right_lower, i1, left_upper, i2);
    }

    /*
     * Iterate over upper bounds of the left group, finding the greatest
     * possible lower bound of the right group.
     */
    i1 = nentries - 1;
    i2 = nentries - 1;
    right_lower = &by_lower[i1].upper;
    left_upper = &by_upper[i2].upper;
    while (true)
    {
        /*
         * Find next upper bound of left group.
         */
        while (i2 >= 0 &&
               range_cmp_bounds(typcache, left_upper,
                                &by_upper[i2].upper) == 0)
        {
            if (range_cmp_bounds(typcache, &by_upper[i2].lower,
                                 right_lower) < 0)
                right_lower = &by_upper[i2].lower;
            i2--;
        }
        if (i2 < 0)
            break;
        left_upper = &by_upper[i2].upper;

        /*
         * Find the count of intervals that must be placed in the right group
         * in any case.
         */
        while (i1 >= 0 &&
               range_cmp_bounds(typcache, &by_lower[i1].lower,
                                right_lower) >= 0)
            i1--;

        /*
         * Consider found split to see if it's better than what we had.
         */
        range_gist_consider_split(&context, right_lower, i1 + 1,
                                  left_upper, i2 + 1);
    }

    /*
     * If we failed to find any acceptable splits, use trivial split.
     */
    if (context.first)
    {
        range_gist_fallback_split(typcache, entryvec, v);
        return;
    }

    /*
     * Ok, we have now selected the bounds of the groups.  Now we have to
     * distribute the entries themselves.  First we distribute the entries
     * which can be placed unambiguously and collect the "common entries"
     * into an array.
     */

    /* Allocate vectors for results */
    v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
    v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
    v->spl_nleft = 0;
    v->spl_nright = 0;

    /*
     * Allocate an array for "common entries" - entries which can be placed
     * in either group without affecting overlap along the selected axis.
     */
    common_entries_count = 0;
    common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));

    /*
     * Distribute entries which can be distributed unambiguously, and collect
     * common entries.
     */
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
        RangeBound  lower,
                    upper;
        bool        empty;

        /*
         * Get upper and lower bounds along selected axis.
         */
        range = DatumGetRangeTypeP(entryvec->vector[i].key);

        range_deserialize(typcache, range, &lower, &upper, &empty);

        if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0)
        {
            /* Fits in the left group */
            if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0)
            {
                /* Fits also in the right group, so "common entry" */
                common_entries[common_entries_count].index = i;
                if (context.has_subtype_diff)
                {
                    /*
                     * delta = (lower - context.right_lower) -
                     * (context.left_upper - upper)
                     */
                    common_entries[common_entries_count].delta =
                        call_subtype_diff(typcache,
                                          lower.val,
                                          context.right_lower->val) -
                        call_subtype_diff(typcache,
                                          context.left_upper->val,
                                          upper.val);
                }
                else
                {
                    /* Without subtype_diff, take all deltas as zero */
                    common_entries[common_entries_count].delta = 0;
                }
                common_entries_count++;
            }
            else
            {
                /* Doesn't fit in the right group, so join the left group */
                PLACE_LEFT(range, i);
            }
        }
        else
        {
            /*
             * Each entry should fit in either the left or the right group.
             * Since this entry didn't fit in the left group, it had better
             * fit in the right group.
             */
            Assert(range_cmp_bounds(typcache, &lower,
                                    context.right_lower) >= 0);
            PLACE_RIGHT(range, i);
        }
    }

    /*
     * Distribute "common entries", if any.
     */
    if (common_entries_count > 0)
    {
        /*
         * Sort "common entries" by calculated deltas in order to distribute
         * the most ambiguous entries first.
         */
        qsort(common_entries, common_entries_count, sizeof(CommonEntry),
              common_entry_cmp);

        /*
         * Distribute "common entries" between groups according to sorting.
         */
        for (i = 0; i < common_entries_count; i++)
        {
            int         idx = common_entries[i].index;

            range = DatumGetRangeTypeP(entryvec->vector[idx].key);

            /*
             * Check if we have to place this entry in either group to
             * achieve LIMIT_RATIO.
             */
            if (i < context.common_left)
                PLACE_LEFT(range, idx);
            else
                PLACE_RIGHT(range, idx);
        }
    }

    v->spl_ldatum = PointerGetDatum(left_range);
    v->spl_rdatum = PointerGetDatum(right_range);
}

/*
 * Consider replacement of currently selected split with a better one
 * during range_gist_double_sorting_split.
 */
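/*
 * For example, with entries_count = 10, min_left_count = 2 and
 * max_left_count = 7, the most even achievable distribution is 5/5, giving
 * ratio 0.5; with min_left_count = 8 the best we can do is 8/2, giving
 * ratio 0.2, which fails LIMIT_RATIO and is never selected.
 */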
static void
range_gist_consider_split(ConsiderSplitContext *context,
                          RangeBound *right_lower, int min_left_count,
                          RangeBound *left_upper, int max_left_count)
{
    int         left_count,
                right_count;
    float4      ratio,
                overlap;

    /*
     * Calculate the entry distribution ratio, assuming the most uniform
     * distribution of common entries.
     */
    if (min_left_count >= (context->entries_count + 1) / 2)
        left_count = min_left_count;
    else if (max_left_count <= context->entries_count / 2)
        left_count = max_left_count;
    else
        left_count = context->entries_count / 2;
    right_count = context->entries_count - left_count;

    /*
     * Ratio of split: quotient between size of smaller group and total
     * entries count.  This is necessarily 0.5 or less; if it's less than
     * LIMIT_RATIO then we will never accept the new split.
     */
    ratio = ((float4) Min(left_count, right_count)) /
        ((float4) context->entries_count);

    if (ratio > LIMIT_RATIO)
    {
        bool        selectthis = false;

        /*
         * The ratio is acceptable, so compare the current split with the
         * previously selected one.  We search for minimal overlap (allowing
         * negative values) and, secondarily, for the largest ratio.  If
         * subtype_diff is available, it's used as the overlap measure.
         * Without subtype_diff we use the number of "common entries" as an
         * overlap measure.
         */
        if (context->has_subtype_diff)
            overlap = call_subtype_diff(context->typcache,
                                        left_upper->val,
                                        right_lower->val);
        else
            overlap = max_left_count - min_left_count;

        /* If there is no previous selection, select this split */
        if (context->first)
            selectthis = true;
        else
        {
            /*
             * Choose the new split if it has a smaller overlap, or the same
             * overlap but a better ratio.
             */
            if (overlap < context->overlap ||
                (overlap == context->overlap && ratio > context->ratio))
                selectthis = true;
        }

        if (selectthis)
        {
            /* save information about selected split */
            context->first = false;
            context->ratio = ratio;
            context->overlap = overlap;
            context->right_lower = right_lower;
            context->left_upper = left_upper;
            context->common_left = max_left_count - left_count;
            context->common_right = left_count - min_left_count;
        }
    }
}

/*
 * Find the class number for a range.
 *
 * The class number is a valid combination of the properties of the
 * range.  Note: the highest possible number is 8, because CLS_EMPTY
 * can't be combined with anything else.
 */
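/*
 * For example, an ordinary finite range maps to CLS_NORMAL (0), a range
 * like (-inf, 10) maps to CLS_LOWER_INF (1), (-inf, +inf) maps to
 * CLS_LOWER_INF | CLS_UPPER_INF (3), and any empty range maps to
 * CLS_EMPTY (8).
 */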
static int
get_gist_range_class(RangeType *range)
{
    int         classNumber;
    char        flags;

    flags = range_get_flags(range);
    if (flags & RANGE_EMPTY)
    {
        classNumber = CLS_EMPTY;
    }
    else
    {
        classNumber = 0;
        if (flags & RANGE_LB_INF)
            classNumber |= CLS_LOWER_INF;
        if (flags & RANGE_UB_INF)
            classNumber |= CLS_UPPER_INF;
        if (flags & RANGE_CONTAIN_EMPTY)
            classNumber |= CLS_CONTAIN_EMPTY;
    }
    return classNumber;
}

/*
 * Comparison function for range_gist_single_sorting_split.
 */
static int
single_bound_cmp(const void *a, const void *b, void *arg)
{
    SingleBoundSortItem *i1 = (SingleBoundSortItem *) a;
    SingleBoundSortItem *i2 = (SingleBoundSortItem *) b;
    TypeCacheEntry *typcache = (TypeCacheEntry *) arg;

    return range_cmp_bounds(typcache, &i1->bound, &i2->bound);
}

/*
 * Compare NonEmptyRanges by lower bound.
 */
static int
interval_cmp_lower(const void *a, const void *b, void *arg)
{
    NonEmptyRange *i1 = (NonEmptyRange *) a;
    NonEmptyRange *i2 = (NonEmptyRange *) b;
    TypeCacheEntry *typcache = (TypeCacheEntry *) arg;

    return range_cmp_bounds(typcache, &i1->lower, &i2->lower);
}

/*
 * Compare NonEmptyRanges by upper bound.
 */
static int
interval_cmp_upper(const void *a, const void *b, void *arg)
{
    NonEmptyRange *i1 = (NonEmptyRange *) a;
    NonEmptyRange *i2 = (NonEmptyRange *) b;
    TypeCacheEntry *typcache = (TypeCacheEntry *) arg;

    return range_cmp_bounds(typcache, &i1->upper, &i2->upper);
}

/*
 * Compare CommonEntrys by their deltas.
 */
static int
common_entry_cmp(const void *i1, const void *i2)
{
    double      delta1 = ((CommonEntry *) i1)->delta;
    double      delta2 = ((CommonEntry *) i2)->delta;

    if (delta1 < delta2)
        return -1;
    else if (delta1 > delta2)
        return 1;
    else
        return 0;
}

/*
 * Convenience function to invoke type-specific subtype_diff function.
 * Caller must have already checked that there is one for the range type.
 */
static float8
call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2)
{
    float8      value;

    value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo,
                                             typcache->rng_collation,
                                             val1, val2));
    /* Cope with buggy subtype_diff function by returning zero */
    if (value >= 0.0)
        return value;
    return 0.0;
}