1/*-------------------------------------------------------------------------
2 *
3 * orderedsetaggs.c
4 * Ordered-set aggregate functions.
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/utils/adt/orderedsetaggs.c
12 *
13 *-------------------------------------------------------------------------
14 */
15#include "postgres.h"
16
17#include <math.h>
18
19#include "catalog/pg_aggregate.h"
20#include "catalog/pg_operator.h"
21#include "catalog/pg_type.h"
22#include "executor/executor.h"
23#include "miscadmin.h"
24#include "nodes/nodeFuncs.h"
25#include "optimizer/optimizer.h"
26#include "utils/array.h"
27#include "utils/builtins.h"
28#include "utils/lsyscache.h"
29#include "utils/memutils.h"
30#include "utils/timestamp.h"
31#include "utils/tuplesort.h"
32
33
34/*
35 * Generic support for ordered-set aggregates
36 *
37 * The state for an ordered-set aggregate is divided into a per-group struct
38 * (which is the internal-type transition state datum returned to nodeAgg.c)
39 * and a per-query struct, which contains data and sub-objects that we can
40 * create just once per query because they will not change across groups.
41 * The per-query struct and subsidiary data live in the executor's per-query
42 * memory context, and go away implicitly at ExecutorEnd().
43 *
44 * These structs are set up during the first call of the transition function.
45 * Because we allow nodeAgg.c to merge ordered-set aggregates (but not
46 * hypothetical aggregates) with identical inputs and transition functions,
47 * this info must not depend on the particular aggregate (ie, particular
48 * final-function), nor on the direct argument(s) of the aggregate.
49 */
50
51typedef struct OSAPerQueryState
52{
53 /* Representative Aggref for this aggregate: */
54 Aggref *aggref;
55 /* Memory context containing this struct and other per-query data: */
56 MemoryContext qcontext;
57 /* Context for expression evaluation */
58 ExprContext *econtext;
59 /* Do we expect multiple final-function calls within one group? */
60 bool rescan_needed;
61
62 /* These fields are used only when accumulating tuples: */
63
64 /* Tuple descriptor for tuples inserted into sortstate: */
65 TupleDesc tupdesc;
66 /* Tuple slot we can use for inserting/extracting tuples: */
67 TupleTableSlot *tupslot;
68 /* Per-sort-column sorting information */
69 int numSortCols;
70 AttrNumber *sortColIdx;
71 Oid *sortOperators;
72 Oid *eqOperators;
73 Oid *sortCollations;
74 bool *sortNullsFirsts;
75 /* Equality operator call info, created only if needed: */
76 ExprState *compareTuple;
77
78 /* These fields are used only when accumulating datums: */
79
80 /* Info about datatype of datums being sorted: */
81 Oid sortColType;
82 int16 typLen;
83 bool typByVal;
84 char typAlign;
85 /* Info about sort ordering: */
86 Oid sortOperator;
87 Oid eqOperator;
88 Oid sortCollation;
89 bool sortNullsFirst;
90 /* Equality operator call info, created only if needed: */
91 FmgrInfo equalfn;
92} OSAPerQueryState;
93
94typedef struct OSAPerGroupState
95{
96 /* Link to the per-query state for this aggregate: */
97 OSAPerQueryState *qstate;
98 /* Memory context containing per-group data: */
99 MemoryContext gcontext;
100 /* Sort object we're accumulating data in: */
101 Tuplesortstate *sortstate;
102 /* Number of normal rows inserted into sortstate: */
103 int64 number_of_rows;
104 /* Have we already done tuplesort_performsort? */
105 bool sort_done;
106} OSAPerGroupState;
107
108static void ordered_set_shutdown(Datum arg);
109
110
111/*
112 * Set up working state for an ordered-set aggregate
113 */
114static OSAPerGroupState *
115ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
116{
117 OSAPerGroupState *osastate;
118 OSAPerQueryState *qstate;
119 MemoryContext gcontext;
120 MemoryContext oldcontext;
121
122 /*
123 * Check we're called as aggregate (and not a window function), and get
124 * the Agg node's group-lifespan context (which might change from group to
125 * group, so we shouldn't cache it in the per-query state).
126 */
127 if (AggCheckCallContext(fcinfo, &gcontext) != AGG_CONTEXT_AGGREGATE)
128 elog(ERROR, "ordered-set aggregate called in non-aggregate context");
129
130 /*
131 * We keep a link to the per-query state in fn_extra; if it's not there,
132 * create it, and do the per-query setup we need.
133 */
134 qstate = (OSAPerQueryState *) fcinfo->flinfo->fn_extra;
135 if (qstate == NULL)
136 {
137 Aggref *aggref;
138 MemoryContext qcontext;
139 List *sortlist;
140 int numSortCols;
141
142 /* Get the Aggref so we can examine aggregate's arguments */
143 aggref = AggGetAggref(fcinfo);
144 if (!aggref)
145 elog(ERROR, "ordered-set aggregate called in non-aggregate context");
146 if (!AGGKIND_IS_ORDERED_SET(aggref->aggkind))
147 elog(ERROR, "ordered-set aggregate support function called for non-ordered-set aggregate");
148
149 /*
150 * Prepare per-query structures in the fn_mcxt, which we assume is the
151 * executor's per-query context; in any case it's the right place to
152 * keep anything found via fn_extra.
153 */
154 qcontext = fcinfo->flinfo->fn_mcxt;
155 oldcontext = MemoryContextSwitchTo(qcontext);
156
157 qstate = (OSAPerQueryState *) palloc0(sizeof(OSAPerQueryState));
158 qstate->aggref = aggref;
159 qstate->qcontext = qcontext;
160
161 /* We need to support rescans if the trans state is shared */
162 qstate->rescan_needed = AggStateIsShared(fcinfo);
163
164 /* Extract the sort information */
165 sortlist = aggref->aggorder;
166 numSortCols = list_length(sortlist);
167
168 if (use_tuples)
169 {
170 bool ishypothetical = (aggref->aggkind == AGGKIND_HYPOTHETICAL);
171 ListCell *lc;
172 int i;
173
174 if (ishypothetical)
175 numSortCols++; /* make space for flag column */
176 qstate->numSortCols = numSortCols;
177 qstate->sortColIdx = (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
178 qstate->sortOperators = (Oid *) palloc(numSortCols * sizeof(Oid));
179 qstate->eqOperators = (Oid *) palloc(numSortCols * sizeof(Oid));
180 qstate->sortCollations = (Oid *) palloc(numSortCols * sizeof(Oid));
181 qstate->sortNullsFirsts = (bool *) palloc(numSortCols * sizeof(bool));
182
183 i = 0;
184 foreach(lc, sortlist)
185 {
186 SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
187 TargetEntry *tle = get_sortgroupclause_tle(sortcl,
188 aggref->args);
189
190 /* the parser should have made sure of this */
191 Assert(OidIsValid(sortcl->sortop));
192
193 qstate->sortColIdx[i] = tle->resno;
194 qstate->sortOperators[i] = sortcl->sortop;
195 qstate->eqOperators[i] = sortcl->eqop;
196 qstate->sortCollations[i] = exprCollation((Node *) tle->expr);
197 qstate->sortNullsFirsts[i] = sortcl->nulls_first;
198 i++;
199 }
200
201 if (ishypothetical)
202 {
203 /* Add an integer flag column as the last sort column */
204 qstate->sortColIdx[i] = list_length(aggref->args) + 1;
205 qstate->sortOperators[i] = Int4LessOperator;
206 qstate->eqOperators[i] = Int4EqualOperator;
207 qstate->sortCollations[i] = InvalidOid;
208 qstate->sortNullsFirsts[i] = false;
209 i++;
210 }
211
212 Assert(i == numSortCols);
213
214 /*
215 * Get a tupledesc corresponding to the aggregated inputs
216 * (including sort expressions) of the agg.
217 */
218 qstate->tupdesc = ExecTypeFromTL(aggref->args);
219
220 /* If we need a flag column, hack the tupledesc to include that */
221 if (ishypothetical)
222 {
223 TupleDesc newdesc;
224 int natts = qstate->tupdesc->natts;
225
226 newdesc = CreateTemplateTupleDesc(natts + 1);
227 for (i = 1; i <= natts; i++)
228 TupleDescCopyEntry(newdesc, i, qstate->tupdesc, i);
229
230 TupleDescInitEntry(newdesc,
231 (AttrNumber) ++natts,
232 "flag",
233 INT4OID,
234 -1,
235 0);
236
237 FreeTupleDesc(qstate->tupdesc);
238 qstate->tupdesc = newdesc;
239 }
240
241 /* Create slot we'll use to store/retrieve rows */
242 qstate->tupslot = MakeSingleTupleTableSlot(qstate->tupdesc,
243 &TTSOpsMinimalTuple);
244 }
245 else
246 {
247 /* Sort single datums */
248 SortGroupClause *sortcl;
249 TargetEntry *tle;
250
251 if (numSortCols != 1 || aggref->aggkind == AGGKIND_HYPOTHETICAL)
252 elog(ERROR, "ordered-set aggregate support function does not support multiple aggregated columns");
253
254 sortcl = (SortGroupClause *) linitial(sortlist);
255 tle = get_sortgroupclause_tle(sortcl, aggref->args);
256
257 /* the parser should have made sure of this */
258 Assert(OidIsValid(sortcl->sortop));
259
260 /* Save sort ordering info */
261 qstate->sortColType = exprType((Node *) tle->expr);
262 qstate->sortOperator = sortcl->sortop;
263 qstate->eqOperator = sortcl->eqop;
264 qstate->sortCollation = exprCollation((Node *) tle->expr);
265 qstate->sortNullsFirst = sortcl->nulls_first;
266
267 /* Save datatype info */
268 get_typlenbyvalalign(qstate->sortColType,
269 &qstate->typLen,
270 &qstate->typByVal,
271 &qstate->typAlign);
272 }
273
274 fcinfo->flinfo->fn_extra = (void *) qstate;
275
276 MemoryContextSwitchTo(oldcontext);
277 }
278
279 /* Now build the stuff we need in group-lifespan context */
280 oldcontext = MemoryContextSwitchTo(gcontext);
281
282 osastate = (OSAPerGroupState *) palloc(sizeof(OSAPerGroupState));
283 osastate->qstate = qstate;
284 osastate->gcontext = gcontext;
285
286 /*
287 * Initialize tuplesort object.
288 */
289 if (use_tuples)
290 osastate->sortstate = tuplesort_begin_heap(qstate->tupdesc,
291 qstate->numSortCols,
292 qstate->sortColIdx,
293 qstate->sortOperators,
294 qstate->sortCollations,
295 qstate->sortNullsFirsts,
296 work_mem,
297 NULL,
298 qstate->rescan_needed);
299 else
300 osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
301 qstate->sortOperator,
302 qstate->sortCollation,
303 qstate->sortNullsFirst,
304 work_mem,
305 NULL,
306 qstate->rescan_needed);
307
308 osastate->number_of_rows = 0;
309 osastate->sort_done = false;
310
311 /* Now register a shutdown callback to clean things up at end of group */
312 AggRegisterCallback(fcinfo,
313 ordered_set_shutdown,
314 PointerGetDatum(osastate));
315
316 MemoryContextSwitchTo(oldcontext);
317
318 return osastate;
319}
320
321/*
322 * Clean up when evaluation of an ordered-set aggregate is complete.
323 *
324 * We don't need to bother freeing objects in the per-group memory context,
325 * since that will get reset anyway by nodeAgg.c; nor should we free anything
326 * in the per-query context, which will get cleared (if this was the last
327 * group) by ExecutorEnd. But we must take care to release any potential
328 * non-memory resources.
329 *
330 * In the case where we're not expecting multiple finalfn calls, we could
331 * arguably rely on the finalfn to clean up; but it's easier and more testable
332 * if we just do it the same way in either case.
333 */
334static void
335ordered_set_shutdown(Datum arg)
336{
337 OSAPerGroupState *osastate = (OSAPerGroupState *) DatumGetPointer(arg);
338
339 /* Tuplesort object might have temp files. */
340 if (osastate->sortstate)
341 tuplesort_end(osastate->sortstate);
342 osastate->sortstate = NULL;
343 /* The tupleslot probably can't be holding a pin, but let's be safe. */
344 if (osastate->qstate->tupslot)
345 ExecClearTuple(osastate->qstate->tupslot);
346}
347
348
349/*
350 * Generic transition function for ordered-set aggregates
351 * with a single input column in which we want to suppress nulls
352 */
353Datum
354ordered_set_transition(PG_FUNCTION_ARGS)
355{
356 OSAPerGroupState *osastate;
357
358 /* If first call, create the transition state workspace */
359 if (PG_ARGISNULL(0))
360 osastate = ordered_set_startup(fcinfo, false);
361 else
362 osastate = (OSAPerGroupState *) PG_GETARG_POINTER(0);
363
364 /* Load the datum into the tuplesort object, but only if it's not null */
365 if (!PG_ARGISNULL(1))
366 {
367 tuplesort_putdatum(osastate->sortstate, PG_GETARG_DATUM(1), false);
368 osastate->number_of_rows++;
369 }
370
371 PG_RETURN_POINTER(osastate);
372}
373
374/*
375 * Generic transition function for ordered-set aggregates
376 * with (potentially) multiple aggregated input columns
377 */
378Datum
379ordered_set_transition_multi(PG_FUNCTION_ARGS)
380{
381 OSAPerGroupState *osastate;
382 TupleTableSlot *slot;
383 int nargs;
384 int i;
385
386 /* If first call, create the transition state workspace */
387 if (PG_ARGISNULL(0))
388 osastate = ordered_set_startup(fcinfo, true);
389 else
390 osastate = (OSAPerGroupState *) PG_GETARG_POINTER(0);
391
392 /* Form a tuple from all the other inputs besides the transition value */
393 slot = osastate->qstate->tupslot;
394 ExecClearTuple(slot);
395 nargs = PG_NARGS() - 1;
396 for (i = 0; i < nargs; i++)
397 {
398 slot->tts_values[i] = PG_GETARG_DATUM(i + 1);
399 slot->tts_isnull[i] = PG_ARGISNULL(i + 1);
400 }
401 if (osastate->qstate->aggref->aggkind == AGGKIND_HYPOTHETICAL)
402 {
403 /* Add a zero flag value to mark this row as a normal input row */
404 slot->tts_values[i] = Int32GetDatum(0);
405 slot->tts_isnull[i] = false;
406 i++;
407 }
408 Assert(i == slot->tts_tupleDescriptor->natts);
409 ExecStoreVirtualTuple(slot);
410
411 /* Load the row into the tuplesort object */
412 tuplesort_puttupleslot(osastate->sortstate, slot);
413 osastate->number_of_rows++;
414
415 PG_RETURN_POINTER(osastate);
416}
417
418
419/*
420 * percentile_disc(float8) within group(anyelement) - discrete percentile
421 */
422Datum
423percentile_disc_final(PG_FUNCTION_ARGS)
424{
425 OSAPerGroupState *osastate;
426 double percentile;
427 Datum val;
428 bool isnull;
429 int64 rownum;
430
431 Assert(AggCheckCallContext(fcinfo, NULL) == AGG_CONTEXT_AGGREGATE);
432
433 /* Get and check the percentile argument */
434 if (PG_ARGISNULL(1))
435 PG_RETURN_NULL();
436
437 percentile = PG_GETARG_FLOAT8(1);
438
439 if (percentile < 0 || percentile > 1 || isnan(percentile))
440 ereport(ERROR,
441 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
442 errmsg("percentile value %g is not between 0 and 1",
443 percentile)));
444
445 /* If there were no regular rows, the result is NULL */
446 if (PG_ARGISNULL(0))
447 PG_RETURN_NULL();
448
449 osastate = (OSAPerGroupState *) PG_GETARG_POINTER(0);
450
451 /* number_of_rows could be zero if we only saw NULL input values */
452 if (osastate->number_of_rows == 0)
453 PG_RETURN_NULL();
454
455 /* Finish the sort, or rescan if we already did */
456 if (!osastate->sort_done)
457 {
458 tuplesort_performsort(osastate->sortstate);
459 osastate->sort_done = true;
460 }
461 else
462 tuplesort_rescan(osastate->sortstate);
463
464 /*----------
465 * We need the smallest K such that (K/N) >= percentile.
466 * N>0, therefore K >= N*percentile, therefore K = ceil(N*percentile).
467 * So we skip K-1 rows (if K>0) and return the next row fetched.
468 *----------
469 */
470 rownum = (int64) ceil(percentile * osastate->number_of_rows);
471 Assert(rownum <= osastate->number_of_rows);
472
473 if (rownum > 1)
474 {
475 if (!tuplesort_skiptuples(osastate->sortstate, rownum - 1, true))
476 elog(ERROR, "missing row in percentile_disc");
477 }
478
479 if (!tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, NULL))
480 elog(ERROR, "missing row in percentile_disc");
481
482 /* We shouldn't have stored any nulls, but do the right thing anyway */
483 if (isnull)
484 PG_RETURN_NULL();
485 else
486 PG_RETURN_DATUM(val);
487}
488
489
490/*
491 * For percentile_cont, we need a way to interpolate between consecutive
492 * values. Use a helper function for that, so that we can share the rest
493 * of the code between types.
494 */
495typedef Datum (*LerpFunc) (Datum lo, Datum hi, double pct);
496
497static Datum
498float8_lerp(Datum lo, Datum hi, double pct)
499{
500 double loval = DatumGetFloat8(lo);
501 double hival = DatumGetFloat8(hi);
502
503 return Float8GetDatum(loval + (pct * (hival - loval)));
504}
505
506static Datum
507interval_lerp(Datum lo, Datum hi, double pct)
508{
509 Datum diff_result = DirectFunctionCall2(interval_mi, hi, lo);
510 Datum mul_result = DirectFunctionCall2(interval_mul,
511 diff_result,
512 Float8GetDatumFast(pct));
513
514 return DirectFunctionCall2(interval_pl, mul_result, lo);
515}
516
517/*
518 * Continuous percentile
519 */
520static Datum
521percentile_cont_final_common(FunctionCallInfo fcinfo,
522 Oid expect_type,
523 LerpFunc lerpfunc)
524{
525 OSAPerGroupState *osastate;
526 double percentile;
527 int64 first_row = 0;
528 int64 second_row = 0;
529 Datum val;
530 Datum first_val;
531 Datum second_val;
532 double proportion;
533 bool isnull;
534
535 Assert(AggCheckCallContext(fcinfo, NULL) == AGG_CONTEXT_AGGREGATE);
536
537 /* Get and check the percentile argument */
538 if (PG_ARGISNULL(1))
539 PG_RETURN_NULL();
540
541 percentile = PG_GETARG_FLOAT8(1);
542
543 if (percentile < 0 || percentile > 1 || isnan(percentile))
544 ereport(ERROR,
545 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
546 errmsg("percentile value %g is not between 0 and 1",
547 percentile)));
548
549 /* If there were no regular rows, the result is NULL */
550 if (PG_ARGISNULL(0))
551 PG_RETURN_NULL();
552
553 osastate = (OSAPerGroupState *) PG_GETARG_POINTER(0);
554
555 /* number_of_rows could be zero if we only saw NULL input values */
556 if (osastate->number_of_rows == 0)
557 PG_RETURN_NULL();
558
559 Assert(expect_type == osastate->qstate->sortColType);
560
561 /* Finish the sort, or rescan if we already did */
562 if (!osastate->sort_done)
563 {
564 tuplesort_performsort(osastate->sortstate);
565 osastate->sort_done = true;
566 }
567 else
568 tuplesort_rescan(osastate->sortstate);
569
570 first_row = floor(percentile * (osastate->number_of_rows - 1));
571 second_row = ceil(percentile * (osastate->number_of_rows - 1));
572
573 Assert(first_row < osastate->number_of_rows);
574
575 if (!tuplesort_skiptuples(osastate->sortstate, first_row, true))
576 elog(ERROR, "missing row in percentile_cont");
577
578 if (!tuplesort_getdatum(osastate->sortstate, true, &first_val, &isnull, NULL))
579 elog(ERROR, "missing row in percentile_cont");
580 if (isnull)
581 PG_RETURN_NULL();
582
583 if (first_row == second_row)
584 {
585 val = first_val;
586 }
587 else
588 {
589 if (!tuplesort_getdatum(osastate->sortstate, true, &second_val, &isnull, NULL))
590 elog(ERROR, "missing row in percentile_cont");
591
592 if (isnull)
593 PG_RETURN_NULL();
594
595 proportion = (percentile * (osastate->number_of_rows - 1)) - first_row;
596 val = lerpfunc(first_val, second_val, proportion);
597 }
598
599 PG_RETURN_DATUM(val);
600}
601
602/*
603 * percentile_cont(float8) within group (float8) - continuous percentile
604 */
605Datum
606percentile_cont_float8_final(PG_FUNCTION_ARGS)
607{
608 return percentile_cont_final_common(fcinfo, FLOAT8OID, float8_lerp);
609}
610
611/*
612 * percentile_cont(float8) within group (interval) - continuous percentile
613 */
614Datum
615percentile_cont_interval_final(PG_FUNCTION_ARGS)
616{
617 return percentile_cont_final_common(fcinfo, INTERVALOID, interval_lerp);
618}
619
620
621/*
622 * Support code for handling arrays of percentiles
623 *
624 * Note: in each pct_info entry, second_row should be equal to or
625 * exactly one more than first_row.
626 */
627struct pct_info
628{
629 int64 first_row; /* first row to sample */
630 int64 second_row; /* possible second row to sample */
631 double proportion; /* interpolation fraction */
632 int idx; /* index of this item in original array */
633};
634
635/*
636 * Sort comparator to sort pct_infos by first_row then second_row
637 */
638static int
639pct_info_cmp(const void *pa, const void *pb)
640{
641 const struct pct_info *a = (const struct pct_info *) pa;
642 const struct pct_info *b = (const struct pct_info *) pb;
643
644 if (a->first_row != b->first_row)
645 return (a->first_row < b->first_row) ? -1 : 1;
646 if (a->second_row != b->second_row)
647 return (a->second_row < b->second_row) ? -1 : 1;
648 return 0;
649}
650
651/*
652 * Construct array showing which rows to sample for percentiles.
653 */
654static struct pct_info *
655setup_pct_info(int num_percentiles,
656 Datum *percentiles_datum,
657 bool *percentiles_null,
658 int64 rowcount,
659 bool continuous)
660{
661 struct pct_info *pct_info;
662 int i;
663
664 pct_info = (struct pct_info *) palloc(num_percentiles * sizeof(struct pct_info));
665
666 for (i = 0; i < num_percentiles; i++)
667 {
668 pct_info[i].idx = i;
669
670 if (percentiles_null[i])
671 {
672 /* dummy entry for any NULL in array */
673 pct_info[i].first_row = 0;
674 pct_info[i].second_row = 0;
675 pct_info[i].proportion = 0;
676 }
677 else
678 {
679 double p = DatumGetFloat8(percentiles_datum[i]);
680
681 if (p < 0 || p > 1 || isnan(p))
682 ereport(ERROR,
683 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
684 errmsg("percentile value %g is not between 0 and 1",
685 p)));
686
687 if (continuous)
688 {
689 pct_info[i].first_row = 1 + floor(p * (rowcount - 1));
690 pct_info[i].second_row = 1 + ceil(p * (rowcount - 1));
691 pct_info[i].proportion = (p * (rowcount - 1)) - floor(p * (rowcount - 1));
692 }
693 else
694 {
695 /*----------
696 * We need the smallest K such that (K/N) >= percentile.
697 * N>0, therefore K >= N*percentile, therefore
698 * K = ceil(N*percentile); but not less than 1.
699 *----------
700 */
701 int64 row = (int64) ceil(p * rowcount);
702
703 row = Max(1, row);
704 pct_info[i].first_row = row;
705 pct_info[i].second_row = row;
706 pct_info[i].proportion = 0;
707 }
708 }
709 }
710
711 /*
712 * The parameter array wasn't necessarily in sorted order, but we need to
713 * visit the rows in order, so sort by first_row/second_row.
714 */
715 qsort(pct_info, num_percentiles, sizeof(struct pct_info), pct_info_cmp);
716
717 return pct_info;
718}
719
720/*
721 * percentile_disc(float8[]) within group (anyelement) - discrete percentiles
722 */
723Datum
724percentile_disc_multi_final(PG_FUNCTION_ARGS)
725{
726 OSAPerGroupState *osastate;
727 ArrayType *param;
728 Datum *percentiles_datum;
729 bool *percentiles_null;
730 int num_percentiles;
731 struct pct_info *pct_info;
732 Datum *result_datum;
733 bool *result_isnull;
734 int64 rownum = 0;
735 Datum val = (Datum) 0;
736 bool isnull = true;
737 int i;
738
739 Assert(AggCheckCallContext(fcinfo, NULL) == AGG_CONTEXT_AGGREGATE);
740
741 /* If there were no regular rows, the result is NULL */
742 if (PG_ARGISNULL(0))
743 PG_RETURN_NULL();
744
745 osastate = (OSAPerGroupState *) PG_GETARG_POINTER(0);
746
747 /* number_of_rows could be zero if we only saw NULL input values */
748 if (osastate->number_of_rows == 0)
749 PG_RETURN_NULL();
750
751 /* Deconstruct the percentile-array input */
752 if (PG_ARGISNULL(1))
753 PG_RETURN_NULL();
754 param = PG_GETARG_ARRAYTYPE_P(1);
755
756 deconstruct_array(param, FLOAT8OID,
757 /* hard-wired info on type float8 */
758 8, FLOAT8PASSBYVAL, 'd',
759 &percentiles_datum,
760 &percentiles_null,
761 &num_percentiles);
762
763 if (num_percentiles == 0)
764 PG_RETURN_POINTER(construct_empty_array(osastate->qstate->sortColType));
765
766 pct_info = setup_pct_info(num_percentiles,
767 percentiles_datum,
768 percentiles_null,
769 osastate->number_of_rows,
770 false);
771
772 result_datum = (Datum *) palloc(num_percentiles * sizeof(Datum));
773 result_isnull = (bool *) palloc(num_percentiles * sizeof(bool));
774
775 /*
776 * Start by dealing with any nulls in the param array - those are sorted
777 * to the front on row=0, so set the corresponding result indexes to null
778 */
779 for (i = 0; i < num_percentiles; i++)
780 {
781 int idx = pct_info[i].idx;
782
783 if (pct_info[i].first_row > 0)
784 break;
785
786 result_datum[idx] = (Datum) 0;
787 result_isnull[idx] = true;
788 }
789
790 /*
791 * If there's anything left after doing the nulls, then grind the input
792 * and extract the needed values
793 */
794 if (i < num_percentiles)
795 {
796 /* Finish the sort, or rescan if we already did */
797 if (!osastate->sort_done)
798 {
799 tuplesort_performsort(osastate->sortstate);
800 osastate->sort_done = true;
801 }
802 else
803 tuplesort_rescan(osastate->sortstate);
804
805 for (; i < num_percentiles; i++)
806 {
807 int64 target_row = pct_info[i].first_row;
808 int idx = pct_info[i].idx;
809
810 /* Advance to target row, if not already there */
811 if (target_row > rownum)
812 {
813 if (!tuplesort_skiptuples(osastate->sortstate, target_row - rownum - 1, true))
814 elog(ERROR, "missing row in percentile_disc");
815
816 if (!tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, NULL))
817 elog(ERROR, "missing row in percentile_disc");
818
819 rownum = target_row;
820 }
821
822 result_datum[idx] = val;
823 result_isnull[idx] = isnull;
824 }
825 }
826
827 /* We make the output array the same shape as the input */
828 PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull,
829 ARR_NDIM(param),
830 ARR_DIMS(param),
831 ARR_LBOUND(param),
832 osastate->qstate->sortColType,
833 osastate->qstate->typLen,
834 osastate->qstate->typByVal,
835 osastate->qstate->typAlign));
836}
837
838/*
839 * percentile_cont(float8[]) within group () - continuous percentiles
840 */
841static Datum
842percentile_cont_multi_final_common(FunctionCallInfo fcinfo,
843 Oid expect_type,
844 int16 typLen, bool typByVal, char typAlign,
845 LerpFunc lerpfunc)
846{
847 OSAPerGroupState *osastate;
848 ArrayType *param;
849 Datum *percentiles_datum;
850 bool *percentiles_null;
851 int num_percentiles;
852 struct pct_info *pct_info;
853 Datum *result_datum;
854 bool *result_isnull;
855 int64 rownum = 0;
856 Datum first_val = (Datum) 0;
857 Datum second_val = (Datum) 0;
858 bool isnull;
859 int i;
860
861 Assert(AggCheckCallContext(fcinfo, NULL) == AGG_CONTEXT_AGGREGATE);
862
863 /* If there were no regular rows, the result is NULL */
864 if (PG_ARGISNULL(0))
865 PG_RETURN_NULL();
866
867 osastate = (OSAPerGroupState *) PG_GETARG_POINTER(0);
868
869 /* number_of_rows could be zero if we only saw NULL input values */
870 if (osastate->number_of_rows == 0)
871 PG_RETURN_NULL();
872
873 Assert(expect_type == osastate->qstate->sortColType);
874
875 /* Deconstruct the percentile-array input */
876 if (PG_ARGISNULL(1))
877 PG_RETURN_NULL();
878 param = PG_GETARG_ARRAYTYPE_P(1);
879
880 deconstruct_array(param, FLOAT8OID,
881 /* hard-wired info on type float8 */
882 8, FLOAT8PASSBYVAL, 'd',
883 &percentiles_datum,
884 &percentiles_null,
885 &num_percentiles);
886
887 if (num_percentiles == 0)
888 PG_RETURN_POINTER(construct_empty_array(osastate->qstate->sortColType));
889
890 pct_info = setup_pct_info(num_percentiles,
891 percentiles_datum,
892 percentiles_null,
893 osastate->number_of_rows,
894 true);
895
896 result_datum = (Datum *) palloc(num_percentiles * sizeof(Datum));
897 result_isnull = (bool *) palloc(num_percentiles * sizeof(bool));
898
899 /*
900 * Start by dealing with any nulls in the param array - those are sorted
901 * to the front on row=0, so set the corresponding result indexes to null
902 */
903 for (i = 0; i < num_percentiles; i++)
904 {
905 int idx = pct_info[i].idx;
906
907 if (pct_info[i].first_row > 0)
908 break;
909
910 result_datum[idx] = (Datum) 0;
911 result_isnull[idx] = true;
912 }
913
914 /*
915 * If there's anything left after doing the nulls, then grind the input
916 * and extract the needed values
917 */
918 if (i < num_percentiles)
919 {
920 /* Finish the sort, or rescan if we already did */
921 if (!osastate->sort_done)
922 {
923 tuplesort_performsort(osastate->sortstate);
924 osastate->sort_done = true;
925 }
926 else
927 tuplesort_rescan(osastate->sortstate);
928
929 for (; i < num_percentiles; i++)
930 {
931 int64 first_row = pct_info[i].first_row;
932 int64 second_row = pct_info[i].second_row;
933 int idx = pct_info[i].idx;
934
935 /*
936 * Advance to first_row, if not already there. Note that we might
937 * already have rownum beyond first_row, in which case first_val
938 * is already correct. (This occurs when interpolating between
939 * the same two input rows as for the previous percentile.)
940 */
941 if (first_row > rownum)
942 {
943 if (!tuplesort_skiptuples(osastate->sortstate, first_row - rownum - 1, true))
944 elog(ERROR, "missing row in percentile_cont");
945
946 if (!tuplesort_getdatum(osastate->sortstate, true, &first_val,
947 &isnull, NULL) || isnull)
948 elog(ERROR, "missing row in percentile_cont");
949
950 rownum = first_row;
951 /* Always advance second_val to be latest input value */
952 second_val = first_val;
953 }
954 else if (first_row == rownum)
955 {
956 /*
957 * We are already at the desired row, so we must previously
958 * have read its value into second_val (and perhaps first_val
959 * as well, but this assignment is harmless in that case).
960 */
961 first_val = second_val;
962 }
963
964 /* Fetch second_row if needed */
965 if (second_row > rownum)
966 {
967 if (!tuplesort_getdatum(osastate->sortstate, true, &second_val,
968 &isnull, NULL) || isnull)
969 elog(ERROR, "missing row in percentile_cont");
970 rownum++;
971 }
972 /* We should now certainly be on second_row exactly */
973 Assert(second_row == rownum);
974
975 /* Compute appropriate result */
976 if (second_row > first_row)
977 result_datum[idx] = lerpfunc(first_val, second_val,
978 pct_info[i].proportion);
979 else
980 result_datum[idx] = first_val;
981
982 result_isnull[idx] = false;
983 }
984 }
985
986 /* We make the output array the same shape as the input */
987 PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull,
988 ARR_NDIM(param),
989 ARR_DIMS(param), ARR_LBOUND(param),
990 expect_type,
991 typLen,
992 typByVal,
993 typAlign));
994}
995
996/*
997 * percentile_cont(float8[]) within group (float8) - continuous percentiles
998 */
999Datum
1000percentile_cont_float8_multi_final(PG_FUNCTION_ARGS)
1001{
1002 return percentile_cont_multi_final_common(fcinfo,
1003 FLOAT8OID,
1004 /* hard-wired info on type float8 */
1005 8, FLOAT8PASSBYVAL, 'd',
1006 float8_lerp);
1007}
1008
1009/*
1010 * percentile_cont(float8[]) within group (interval) - continuous percentiles
1011 */
1012Datum
1013percentile_cont_interval_multi_final(PG_FUNCTION_ARGS)
1014{
1015 return percentile_cont_multi_final_common(fcinfo,
1016 INTERVALOID,
1017 /* hard-wired info on type interval */
1018 16, false, 'd',
1019 interval_lerp);
1020}
1021
1022
1023/*
1024 * mode() within group (anyelement) - most common value
1025 */
1026Datum
1027mode_final(PG_FUNCTION_ARGS)
1028{
1029 OSAPerGroupState *osastate;
1030 Datum val;
1031 bool isnull;
1032 Datum mode_val = (Datum) 0;
1033 int64 mode_freq = 0;
1034 Datum last_val = (Datum) 0;
1035 int64 last_val_freq = 0;
1036 bool last_val_is_mode = false;
1037 FmgrInfo *equalfn;
1038 Datum abbrev_val = (Datum) 0;
1039 Datum last_abbrev_val = (Datum) 0;
1040 bool shouldfree;
1041
1042 Assert(AggCheckCallContext(fcinfo, NULL) == AGG_CONTEXT_AGGREGATE);
1043
1044 /* If there were no regular rows, the result is NULL */
1045 if (PG_ARGISNULL(0))
1046 PG_RETURN_NULL();
1047
1048 osastate = (OSAPerGroupState *) PG_GETARG_POINTER(0);
1049
1050 /* number_of_rows could be zero if we only saw NULL input values */
1051 if (osastate->number_of_rows == 0)
1052 PG_RETURN_NULL();
1053
1054 /* Look up the equality function for the datatype, if we didn't already */
1055 equalfn = &(osastate->qstate->equalfn);
1056 if (!OidIsValid(equalfn->fn_oid))
1057 fmgr_info_cxt(get_opcode(osastate->qstate->eqOperator), equalfn,
1058 osastate->qstate->qcontext);
1059
1060 shouldfree = !(osastate->qstate->typByVal);
1061
1062 /* Finish the sort, or rescan if we already did */
1063 if (!osastate->sort_done)
1064 {
1065 tuplesort_performsort(osastate->sortstate);
1066 osastate->sort_done = true;
1067 }
1068 else
1069 tuplesort_rescan(osastate->sortstate);
1070
1071 /* Scan tuples and count frequencies */
1072 while (tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, &abbrev_val))
1073 {
1074 /* we don't expect any nulls, but ignore them if found */
1075 if (isnull)
1076 continue;
1077
1078 if (last_val_freq == 0)
1079 {
1080 /* first nonnull value - it's the mode for now */
1081 mode_val = last_val = val;
1082 mode_freq = last_val_freq = 1;
1083 last_val_is_mode = true;
1084 last_abbrev_val = abbrev_val;
1085 }
1086 else if (abbrev_val == last_abbrev_val &&
1087 DatumGetBool(FunctionCall2Coll(equalfn, PG_GET_COLLATION(), val, last_val)))
1088 {
1089 /* value equal to previous value, count it */
1090 if (last_val_is_mode)
1091 mode_freq++; /* needn't maintain last_val_freq */
1092 else if (++last_val_freq > mode_freq)
1093 {
1094 /* last_val becomes new mode */
1095 if (shouldfree)
1096 pfree(DatumGetPointer(mode_val));
1097 mode_val = last_val;
1098 mode_freq = last_val_freq;
1099 last_val_is_mode = true;
1100 }
1101 if (shouldfree)
1102 pfree(DatumGetPointer(val));
1103 }
1104 else
1105 {
1106 /* val should replace last_val */
1107 if (shouldfree && !last_val_is_mode)
1108 pfree(DatumGetPointer(last_val));
1109 last_val = val;
1110 /* avoid equality function calls by reusing abbreviated keys */
1111 last_abbrev_val = abbrev_val;
1112 last_val_freq = 1;
1113 last_val_is_mode = false;
1114 }
1115
1116 CHECK_FOR_INTERRUPTS();
1117 }
1118
1119 if (shouldfree && !last_val_is_mode)
1120 pfree(DatumGetPointer(last_val));
1121
1122 if (mode_freq)
1123 PG_RETURN_DATUM(mode_val);
1124 else
1125 PG_RETURN_NULL();
1126}
1127
1128
1129/*
1130 * Common code to sanity-check args for hypothetical-set functions. No need
1131 * for friendly errors, these can only happen if someone's messing up the
1132 * aggregate definitions. The checks are needed for security, however.
1133 */
1134static void
1135hypothetical_check_argtypes(FunctionCallInfo fcinfo, int nargs,
1136 TupleDesc tupdesc)
1137{
1138 int i;
1139
1140 /* check that we have an int4 flag column */
1141 if (!tupdesc ||
1142 (nargs + 1) != tupdesc->natts ||
1143 TupleDescAttr(tupdesc, nargs)->atttypid != INT4OID)
1144 elog(ERROR, "type mismatch in hypothetical-set function");
1145
1146 /* check that direct args match in type with aggregated args */
1147 for (i = 0; i < nargs; i++)
1148 {
1149 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
1150
1151 if (get_fn_expr_argtype(fcinfo->flinfo, i + 1) != attr->atttypid)
1152 elog(ERROR, "type mismatch in hypothetical-set function");
1153 }
1154}
1155
1156/*
1157 * compute rank of hypothetical row
1158 *
1159 * flag should be -1 to sort hypothetical row ahead of its peers, or +1
1160 * to sort behind.
1161 * total number of regular rows is returned into *number_of_rows.
1162 */
1163static int64
1164hypothetical_rank_common(FunctionCallInfo fcinfo, int flag,
1165 int64 *number_of_rows)
1166{
1167 int nargs = PG_NARGS() - 1;
1168 int64 rank = 1;
1169 OSAPerGroupState *osastate;
1170 TupleTableSlot *slot;
1171 int i;
1172
1173 Assert(AggCheckCallContext(fcinfo, NULL) == AGG_CONTEXT_AGGREGATE);
1174
1175 /* If there were no regular rows, the rank is always 1 */
1176 if (PG_ARGISNULL(0))
1177 {
1178 *number_of_rows = 0;
1179 return 1;
1180 }
1181
1182 osastate = (OSAPerGroupState *) PG_GETARG_POINTER(0);
1183 *number_of_rows = osastate->number_of_rows;
1184
1185 /* Adjust nargs to be the number of direct (or aggregated) args */
1186 if (nargs % 2 != 0)
1187 elog(ERROR, "wrong number of arguments in hypothetical-set function");
1188 nargs /= 2;
1189
1190 hypothetical_check_argtypes(fcinfo, nargs, osastate->qstate->tupdesc);
1191
1192 /* because we need a hypothetical row, we can't share transition state */
1193 Assert(!osastate->sort_done);
1194
1195 /* insert the hypothetical row into the sort */
1196 slot = osastate->qstate->tupslot;
1197 ExecClearTuple(slot);
1198 for (i = 0; i < nargs; i++)
1199 {
1200 slot->tts_values[i] = PG_GETARG_DATUM(i + 1);
1201 slot->tts_isnull[i] = PG_ARGISNULL(i + 1);
1202 }
1203 slot->tts_values[i] = Int32GetDatum(flag);
1204 slot->tts_isnull[i] = false;
1205 ExecStoreVirtualTuple(slot);
1206
1207 tuplesort_puttupleslot(osastate->sortstate, slot);
1208
1209 /* finish the sort */
1210 tuplesort_performsort(osastate->sortstate);
1211 osastate->sort_done = true;
1212
1213 /* iterate till we find the hypothetical row */
1214 while (tuplesort_gettupleslot(osastate->sortstate, true, true, slot, NULL))
1215 {
1216 bool isnull;
1217 Datum d = slot_getattr(slot, nargs + 1, &isnull);
1218
1219 if (!isnull && DatumGetInt32(d) != 0)
1220 break;
1221
1222 rank++;
1223
1224 CHECK_FOR_INTERRUPTS();
1225 }
1226
1227 ExecClearTuple(slot);
1228
1229 return rank;
1230}
1231
1232
1233/*
1234 * rank() - rank of hypothetical row
1235 */
1236Datum
1237hypothetical_rank_final(PG_FUNCTION_ARGS)
1238{
1239 int64 rank;
1240 int64 rowcount;
1241
1242 rank = hypothetical_rank_common(fcinfo, -1, &rowcount);
1243
1244 PG_RETURN_INT64(rank);
1245}
1246
1247/*
1248 * percent_rank() - percentile rank of hypothetical row
1249 */
1250Datum
1251hypothetical_percent_rank_final(PG_FUNCTION_ARGS)
1252{
1253 int64 rank;
1254 int64 rowcount;
1255 double result_val;
1256
1257 rank = hypothetical_rank_common(fcinfo, -1, &rowcount);
1258
1259 if (rowcount == 0)
1260 PG_RETURN_FLOAT8(0);
1261
1262 result_val = (double) (rank - 1) / (double) (rowcount);
1263
1264 PG_RETURN_FLOAT8(result_val);
1265}
1266
1267/*
1268 * cume_dist() - cumulative distribution of hypothetical row
1269 */
1270Datum
1271hypothetical_cume_dist_final(PG_FUNCTION_ARGS)
1272{
1273 int64 rank;
1274 int64 rowcount;
1275 double result_val;
1276
1277 rank = hypothetical_rank_common(fcinfo, 1, &rowcount);
1278
1279 result_val = (double) (rank) / (double) (rowcount + 1);
1280
1281 PG_RETURN_FLOAT8(result_val);
1282}
1283
1284/*
1285 * dense_rank() - rank of hypothetical row without gaps in ranking
1286 */
1287Datum
1288hypothetical_dense_rank_final(PG_FUNCTION_ARGS)
1289{
1290 ExprContext *econtext;
1291 ExprState *compareTuple;
1292 int nargs = PG_NARGS() - 1;
1293 int64 rank = 1;
1294 int64 duplicate_count = 0;
1295 OSAPerGroupState *osastate;
1296 int numDistinctCols;
1297 Datum abbrevVal = (Datum) 0;
1298 Datum abbrevOld = (Datum) 0;
1299 TupleTableSlot *slot;
1300 TupleTableSlot *extraslot;
1301 TupleTableSlot *slot2;
1302 int i;
1303
1304 Assert(AggCheckCallContext(fcinfo, NULL) == AGG_CONTEXT_AGGREGATE);
1305
1306 /* If there were no regular rows, the rank is always 1 */
1307 if (PG_ARGISNULL(0))
1308 PG_RETURN_INT64(rank);
1309
1310 osastate = (OSAPerGroupState *) PG_GETARG_POINTER(0);
1311 econtext = osastate->qstate->econtext;
1312 if (!econtext)
1313 {
1314 MemoryContext oldcontext;
1315
1316 /* Make sure to we create econtext under correct parent context. */
1317 oldcontext = MemoryContextSwitchTo(osastate->qstate->qcontext);
1318 osastate->qstate->econtext = CreateStandaloneExprContext();
1319 econtext = osastate->qstate->econtext;
1320 MemoryContextSwitchTo(oldcontext);
1321 }
1322
1323 /* Adjust nargs to be the number of direct (or aggregated) args */
1324 if (nargs % 2 != 0)
1325 elog(ERROR, "wrong number of arguments in hypothetical-set function");
1326 nargs /= 2;
1327
1328 hypothetical_check_argtypes(fcinfo, nargs, osastate->qstate->tupdesc);
1329
1330 /*
1331 * When comparing tuples, we can omit the flag column since we will only
1332 * compare rows with flag == 0.
1333 */
1334 numDistinctCols = osastate->qstate->numSortCols - 1;
1335
1336 /* Build tuple comparator, if we didn't already */
1337 compareTuple = osastate->qstate->compareTuple;
1338 if (compareTuple == NULL)
1339 {
1340 AttrNumber *sortColIdx = osastate->qstate->sortColIdx;
1341 MemoryContext oldContext;
1342
1343 oldContext = MemoryContextSwitchTo(osastate->qstate->qcontext);
1344 compareTuple = execTuplesMatchPrepare(osastate->qstate->tupdesc,
1345 numDistinctCols,
1346 sortColIdx,
1347 osastate->qstate->eqOperators,
1348 osastate->qstate->sortCollations,
1349 NULL);
1350 MemoryContextSwitchTo(oldContext);
1351 osastate->qstate->compareTuple = compareTuple;
1352 }
1353
1354 /* because we need a hypothetical row, we can't share transition state */
1355 Assert(!osastate->sort_done);
1356
1357 /* insert the hypothetical row into the sort */
1358 slot = osastate->qstate->tupslot;
1359 ExecClearTuple(slot);
1360 for (i = 0; i < nargs; i++)
1361 {
1362 slot->tts_values[i] = PG_GETARG_DATUM(i + 1);
1363 slot->tts_isnull[i] = PG_ARGISNULL(i + 1);
1364 }
1365 slot->tts_values[i] = Int32GetDatum(-1);
1366 slot->tts_isnull[i] = false;
1367 ExecStoreVirtualTuple(slot);
1368
1369 tuplesort_puttupleslot(osastate->sortstate, slot);
1370
1371 /* finish the sort */
1372 tuplesort_performsort(osastate->sortstate);
1373 osastate->sort_done = true;
1374
1375 /*
1376 * We alternate fetching into tupslot and extraslot so that we have the
1377 * previous row available for comparisons. This is accomplished by
1378 * swapping the slot pointer variables after each row.
1379 */
1380 extraslot = MakeSingleTupleTableSlot(osastate->qstate->tupdesc,
1381 &TTSOpsMinimalTuple);
1382 slot2 = extraslot;
1383
1384 /* iterate till we find the hypothetical row */
1385 while (tuplesort_gettupleslot(osastate->sortstate, true, true, slot,
1386 &abbrevVal))
1387 {
1388 bool isnull;
1389 Datum d = slot_getattr(slot, nargs + 1, &isnull);
1390 TupleTableSlot *tmpslot;
1391
1392 if (!isnull && DatumGetInt32(d) != 0)
1393 break;
1394
1395 /* count non-distinct tuples */
1396 econtext->ecxt_outertuple = slot;
1397 econtext->ecxt_innertuple = slot2;
1398
1399 if (!TupIsNull(slot2) &&
1400 abbrevVal == abbrevOld &&
1401 ExecQualAndReset(compareTuple, econtext))
1402 duplicate_count++;
1403
1404 tmpslot = slot2;
1405 slot2 = slot;
1406 slot = tmpslot;
1407 /* avoid ExecQual() calls by reusing abbreviated keys */
1408 abbrevOld = abbrevVal;
1409
1410 rank++;
1411
1412 CHECK_FOR_INTERRUPTS();
1413 }
1414
1415 ExecClearTuple(slot);
1416 ExecClearTuple(slot2);
1417
1418 ExecDropSingleTupleTableSlot(extraslot);
1419
1420 rank = rank - duplicate_count;
1421
1422 PG_RETURN_INT64(rank);
1423}
1424