1/*-------------------------------------------------------------------------
2 *
3 * nodeAgg.c
4 * Routines to handle aggregate nodes.
5 *
6 * ExecAgg normally evaluates each aggregate in the following steps:
7 *
8 * transvalue = initcond
9 * foreach input_tuple do
10 * transvalue = transfunc(transvalue, input_value(s))
11 * result = finalfunc(transvalue, direct_argument(s))
12 *
13 * If a finalfunc is not supplied then the result is just the ending
14 * value of transvalue.
15 *
16 * Other behaviors can be selected by the "aggsplit" mode, which exists
17 * to support partial aggregation. It is possible to:
18 * * Skip running the finalfunc, so that the output is always the
19 * final transvalue state.
20 * * Substitute the combinefunc for the transfunc, so that transvalue
21 * states (propagated up from a child partial-aggregation step) are merged
22 * rather than processing raw input rows. (The statements below about
23 * the transfunc apply equally to the combinefunc, when it's selected.)
24 * * Apply the serializefunc to the output values (this only makes sense
25 * when skipping the finalfunc, since the serializefunc works on the
26 * transvalue data type).
27 * * Apply the deserializefunc to the input values (this only makes sense
28 * when using the combinefunc, for similar reasons).
29 * It is the planner's responsibility to connect up Agg nodes using these
30 * alternate behaviors in a way that makes sense, with partial aggregation
31 * results being fed to nodes that expect them.
32 *
33 * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34 * input tuples and eliminate duplicates (if required) before performing
35 * the above-depicted process. (However, we don't do that for ordered-set
36 * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37 * so far as this module is concerned.) Note that partial aggregation
38 * is not supported in these cases, since we couldn't ensure global
39 * ordering or distinctness of the inputs.
40 *
41 * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42 * then the first non-NULL input_value is assigned directly to transvalue,
43 * and transfunc isn't applied until the second non-NULL input_value.
44 * The agg's first input type and transtype must be the same in this case!
45 *
46 * If transfunc is marked "strict" then NULL input_values are skipped,
47 * keeping the previous transvalue. If transfunc is not strict then it
48 * is called for every input tuple and must deal with NULL initcond
49 * or NULL input_values for itself.
50 *
51 * If finalfunc is marked "strict" then it is not called when the
52 * ending transvalue is NULL, instead a NULL result is created
53 * automatically (this is just the usual handling of strict functions,
54 * of course). A non-strict finalfunc can make its own choice of
55 * what to return for a NULL ending transvalue.
56 *
57 * Ordered-set aggregates are treated specially in one other way: we
58 * evaluate any "direct" arguments and pass them to the finalfunc along
59 * with the transition value.
60 *
61 * A finalfunc can have additional arguments beyond the transvalue and
62 * any "direct" arguments, corresponding to the input arguments of the
63 * aggregate. These are always just passed as NULL. Such arguments may be
64 * needed to allow resolution of a polymorphic aggregate's result type.
65 *
66 * We compute aggregate input expressions and run the transition functions
67 * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68 * once per input tuple, so when the transvalue datatype is
69 * pass-by-reference, we have to be careful to copy it into a longer-lived
70 * memory context, and free the prior value to avoid memory leakage. We
71 * store transvalues in another set of econtexts, aggstate->aggcontexts
72 * (one per grouping set, see below), which are also used for the hashtable
73 * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74 * reset, at group boundaries so that aggregate transition functions can
75 * register shutdown callbacks via AggRegisterCallback.
76 *
77 * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78 * run finalize functions and compute the output tuple; this context can be
79 * reset once per output tuple.
80 *
81 * The executor's AggState node is passed as the fmgr "context" value in
82 * all transfunc and finalfunc calls. It is not recommended that the
83 * transition functions look at the AggState node directly, but they can
84 * use AggCheckCallContext() to verify that they are being called by
85 * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86 * transition function might want to know this is so that it can avoid
87 * palloc'ing a fixed-size pass-by-ref transition value on every call:
88 * it can instead just scribble on and return its left input. Ordinarily
89 * it is completely forbidden for functions to modify pass-by-ref inputs,
90 * but in the aggregate case we know the left input is either the initial
91 * transition value or a previous function result, and in either case its
92 * value need not be preserved. See int8inc() for an example. Notice that
93 * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94 * the previous transition value pointer is returned. It is also possible
95 * to avoid repeated data copying when the transition value is an expanded
96 * object: to do that, the transition function must take care to return
97 * an expanded object that is in a child context of the memory context
98 * returned by AggCheckCallContext(). Also, some transition functions want
99 * to store working state in addition to the nominal transition value; they
100 * can use the memory context returned by AggCheckCallContext() to do that.
101 *
102 * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103 * AggState is available as context in earlier releases (back to 8.1),
104 * but direct examination of the node is needed to use it before 9.0.
105 *
106 * As of 9.4, aggregate transition functions can also use AggGetAggref()
107 * to get hold of the Aggref expression node for their aggregate call.
108 * This is mainly intended for ordered-set aggregates, which are not
109 * supported as window functions. (A regular aggregate function would
110 * need some fallback logic to use this, since there's no Aggref node
111 * for a window function.)
112 *
113 * Grouping sets:
114 *
115 * A list of grouping sets which is structurally equivalent to a ROLLUP
116 * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117 * ordered data. We do this by keeping a separate set of transition values
118 * for each grouping set being concurrently processed; for each input tuple
119 * we update them all, and on group boundaries we reset those states
120 * (starting at the front of the list) whose grouping values have changed
121 * (the list of grouping sets is ordered from most specific to least
122 * specific).
123 *
124 * Where more complex grouping sets are used, we break them down into
125 * "phases", where each phase has a different sort order (except phase 0
126 * which is reserved for hashing). During each phase but the last, the
127 * input tuples are additionally stored in a tuplesort which is keyed to the
128 * next phase's sort order; during each phase but the first, the input
129 * tuples are drawn from the previously sorted data. (The sorting of the
130 * data for the first phase is handled by the planner, as it might be
131 * satisfied by underlying nodes.)
132 *
133 * Hashing can be mixed with sorted grouping. To do this, we have an
134 * AGG_MIXED strategy that populates the hashtables during the first sorted
135 * phase, and switches to reading them out after completing all sort phases.
136 * We can also support AGG_HASHED with multiple hash tables and no sorting
137 * at all.
138 *
139 * From the perspective of aggregate transition and final functions, the
140 * only issue regarding grouping sets is this: a single call site (flinfo)
141 * of an aggregate function may be used for updating several different
142 * transition values in turn. So the function must not cache in the flinfo
143 * anything which logically belongs as part of the transition value (most
144 * importantly, the memory context in which the transition value exists).
145 * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146 * sensitive to the grouping set for which the aggregate function is
147 * currently being called.
148 *
149 * Plan structure:
150 *
151 * What we get from the planner is actually one "real" Agg node which is
152 * part of the plan tree proper, but which optionally has an additional list
153 * of Agg nodes hung off the side via the "chain" field. This is because an
154 * Agg node happens to be a convenient representation of all the data we
155 * need for grouping sets.
156 *
157 * For many purposes, we treat the "real" node as if it were just the first
158 * node in the chain. The chain must be ordered such that hashed entries
159 * come before sorted/plain entries; the real node is marked AGG_MIXED if
160 * there are both types present (in which case the real node describes one
161 * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162 * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163 * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164 * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165 * chained nodes.
166 *
167 * We collect all hashed nodes into a single "phase", numbered 0, and create
168 * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169 * Phase 0 is allocated even if there are no hashes, but remains unused in
170 * that case.
171 *
172 * AGG_HASHED nodes actually refer to only a single grouping set each,
173 * because for each hashed grouping we need a separate grpColIdx and
174 * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175 * grouping sets that share a sort order. Each AGG_SORTED node other than
176 * the first one has an associated Sort node which describes the sort order
177 * to be used; the first sorted node takes its input from the outer subtree,
178 * which the planner has already arranged to provide ordered data.
179 *
180 * Memory and ExprContext usage:
181 *
182 * Because we're accumulating aggregate values across input rows, we need to
183 * use more memory contexts than just simple input/output tuple contexts.
184 * In fact, for a rollup, we need a separate context for each grouping set
185 * so that we can reset the inner (finer-grained) aggregates on their group
186 * boundaries while continuing to accumulate values for outer
187 * (coarser-grained) groupings. On top of this, we might be simultaneously
188 * populating hashtables; however, we only need one context for all the
189 * hashtables.
190 *
191 * So we create an array, aggcontexts, with an ExprContext for each grouping
192 * set in the largest rollup that we're going to process, and use the
193 * per-tuple memory context of those ExprContexts to store the aggregate
194 * transition values. hashcontext is the single context created to support
195 * all hash tables.
196 *
197 * Transition / Combine function invocation:
198 *
199 * For performance reasons transition functions, including combine
200 * functions, aren't invoked one-by-one from nodeAgg.c after computing
201 * arguments using the expression evaluation engine. Instead
202 * ExecBuildAggTrans() builds one large expression that does both argument
203 * evaluation and transition function invocation. That avoids performance
204 * issues due to repeated uses of expression evaluation, complications due
 * to filter expressions having to be evaluated early, and allows the
 * entire expression to be JIT-compiled into one native function.
207 *
208 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
209 * Portions Copyright (c) 1994, Regents of the University of California
210 *
211 * IDENTIFICATION
212 * src/backend/executor/nodeAgg.c
213 *
214 *-------------------------------------------------------------------------
215 */
216
217#include "postgres.h"
218
219#include "access/htup_details.h"
220#include "catalog/objectaccess.h"
221#include "catalog/pg_aggregate.h"
222#include "catalog/pg_proc.h"
223#include "catalog/pg_type.h"
224#include "executor/executor.h"
225#include "executor/nodeAgg.h"
226#include "miscadmin.h"
227#include "nodes/makefuncs.h"
228#include "nodes/nodeFuncs.h"
229#include "optimizer/optimizer.h"
230#include "parser/parse_agg.h"
231#include "parser/parse_coerce.h"
232#include "utils/acl.h"
233#include "utils/builtins.h"
234#include "utils/lsyscache.h"
235#include "utils/memutils.h"
236#include "utils/syscache.h"
237#include "utils/tuplesort.h"
238#include "utils/datum.h"
239
240
/*
 * Prototypes for static (file-local) routines, declared up front so that
 * they can be called ahead of their definitions.
 */
static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
static void initialize_aggregates(AggState *aggstate,
								  AggStatePerGroup *pergroups,
								  int numReset);
static void advance_transition_function(AggState *aggstate,
										AggStatePerTrans pertrans,
										AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate);
static void process_ordered_aggregate_single(AggState *aggstate,
											 AggStatePerTrans pertrans,
											 AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
											AggStatePerTrans pertrans,
											AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
							   AggStatePerAgg peragg,
							   AggStatePerGroup pergroupstate,
							   Datum *resultVal, bool *resultIsNull);
static void finalize_partialaggregate(AggState *aggstate,
									  AggStatePerAgg peragg,
									  AggStatePerGroup pergroupstate,
									  Datum *resultVal, bool *resultIsNull);
static void prepare_projection_slot(AggState *aggstate,
									TupleTableSlot *slot,
									int currentSet);
static void finalize_aggregates(AggState *aggstate,
								AggStatePerAgg peragg,
								AggStatePerGroup pergroup);
static TupleTableSlot *project_aggregates(AggState *aggstate);
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
static void build_hash_table(AggState *aggstate);
static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
static void lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
									  AggState *aggstate, EState *estate,
									  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
									  Oid aggserialfn, Oid aggdeserialfn,
									  Datum initValue, bool initValueIsNull,
									  Oid *inputTypes, int numArguments);
static int	find_compatible_peragg(Aggref *newagg, AggState *aggstate,
								   int lastaggno, List **same_input_transnos);
static int	find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
									 bool shareable,
									 Oid aggtransfn, Oid aggtranstype,
									 Oid aggserialfn, Oid aggdeserialfn,
									 Datum initValue, bool initValueIsNull,
									 List *transnos);
295
296
297/*
298 * Select the current grouping set; affects current_set and
299 * curaggcontext.
300 */
301static void
302select_current_set(AggState *aggstate, int setno, bool is_hash)
303{
304 /* when changing this, also adapt ExecInterpExpr() and friends */
305 if (is_hash)
306 aggstate->curaggcontext = aggstate->hashcontext;
307 else
308 aggstate->curaggcontext = aggstate->aggcontexts[setno];
309
310 aggstate->current_set = setno;
311}
312
313/*
314 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
315 * current_phase + 1. Juggle the tuplesorts accordingly.
316 *
317 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
318 * case, so when entering phase 0, all we need to do is drop open sorts.
319 */
320static void
321initialize_phase(AggState *aggstate, int newphase)
322{
323 Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
324
325 /*
326 * Whatever the previous state, we're now done with whatever input
327 * tuplesort was in use.
328 */
329 if (aggstate->sort_in)
330 {
331 tuplesort_end(aggstate->sort_in);
332 aggstate->sort_in = NULL;
333 }
334
335 if (newphase <= 1)
336 {
337 /*
338 * Discard any existing output tuplesort.
339 */
340 if (aggstate->sort_out)
341 {
342 tuplesort_end(aggstate->sort_out);
343 aggstate->sort_out = NULL;
344 }
345 }
346 else
347 {
348 /*
349 * The old output tuplesort becomes the new input one, and this is the
350 * right time to actually sort it.
351 */
352 aggstate->sort_in = aggstate->sort_out;
353 aggstate->sort_out = NULL;
354 Assert(aggstate->sort_in);
355 tuplesort_performsort(aggstate->sort_in);
356 }
357
358 /*
359 * If this isn't the last phase, we need to sort appropriately for the
360 * next phase in sequence.
361 */
362 if (newphase > 0 && newphase < aggstate->numphases - 1)
363 {
364 Sort *sortnode = aggstate->phases[newphase + 1].sortnode;
365 PlanState *outerNode = outerPlanState(aggstate);
366 TupleDesc tupDesc = ExecGetResultType(outerNode);
367
368 aggstate->sort_out = tuplesort_begin_heap(tupDesc,
369 sortnode->numCols,
370 sortnode->sortColIdx,
371 sortnode->sortOperators,
372 sortnode->collations,
373 sortnode->nullsFirst,
374 work_mem,
375 NULL, false);
376 }
377
378 aggstate->current_phase = newphase;
379 aggstate->phase = &aggstate->phases[newphase];
380}
381
382/*
383 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
384 * populated by the previous phase. Copy it to the sorter for the next phase
385 * if any.
386 *
387 * Callers cannot rely on memory for tuple in returned slot remaining valid
388 * past any subsequently fetched tuple.
389 */
390static TupleTableSlot *
391fetch_input_tuple(AggState *aggstate)
392{
393 TupleTableSlot *slot;
394
395 if (aggstate->sort_in)
396 {
397 /* make sure we check for interrupts in either path through here */
398 CHECK_FOR_INTERRUPTS();
399 if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
400 aggstate->sort_slot, NULL))
401 return NULL;
402 slot = aggstate->sort_slot;
403 }
404 else
405 slot = ExecProcNode(outerPlanState(aggstate));
406
407 if (!TupIsNull(slot) && aggstate->sort_out)
408 tuplesort_puttupleslot(aggstate->sort_out, slot);
409
410 return slot;
411}
412
413/*
414 * (Re)Initialize an individual aggregate.
415 *
416 * This function handles only one grouping set, already set in
417 * aggstate->current_set.
418 *
419 * When called, CurrentMemoryContext should be the per-query context.
420 */
421static void
422initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
423 AggStatePerGroup pergroupstate)
424{
425 /*
426 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
427 */
428 if (pertrans->numSortCols > 0)
429 {
430 /*
431 * In case of rescan, maybe there could be an uncompleted sort
432 * operation? Clean it up if so.
433 */
434 if (pertrans->sortstates[aggstate->current_set])
435 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
436
437
438 /*
439 * We use a plain Datum sorter when there's a single input column;
440 * otherwise sort the full tuple. (See comments for
441 * process_ordered_aggregate_single.)
442 */
443 if (pertrans->numInputs == 1)
444 {
445 Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
446
447 pertrans->sortstates[aggstate->current_set] =
448 tuplesort_begin_datum(attr->atttypid,
449 pertrans->sortOperators[0],
450 pertrans->sortCollations[0],
451 pertrans->sortNullsFirst[0],
452 work_mem, NULL, false);
453 }
454 else
455 pertrans->sortstates[aggstate->current_set] =
456 tuplesort_begin_heap(pertrans->sortdesc,
457 pertrans->numSortCols,
458 pertrans->sortColIdx,
459 pertrans->sortOperators,
460 pertrans->sortCollations,
461 pertrans->sortNullsFirst,
462 work_mem, NULL, false);
463 }
464
465 /*
466 * (Re)set transValue to the initial value.
467 *
468 * Note that when the initial value is pass-by-ref, we must copy it (into
469 * the aggcontext) since we will pfree the transValue later.
470 */
471 if (pertrans->initValueIsNull)
472 pergroupstate->transValue = pertrans->initValue;
473 else
474 {
475 MemoryContext oldContext;
476
477 oldContext = MemoryContextSwitchTo(
478 aggstate->curaggcontext->ecxt_per_tuple_memory);
479 pergroupstate->transValue = datumCopy(pertrans->initValue,
480 pertrans->transtypeByVal,
481 pertrans->transtypeLen);
482 MemoryContextSwitchTo(oldContext);
483 }
484 pergroupstate->transValueIsNull = pertrans->initValueIsNull;
485
486 /*
487 * If the initial value for the transition state doesn't exist in the
488 * pg_aggregate table then we will let the first non-NULL value returned
489 * from the outer procNode become the initial value. (This is useful for
490 * aggregates like max() and min().) The noTransValue flag signals that we
491 * still need to do this.
492 */
493 pergroupstate->noTransValue = pertrans->initValueIsNull;
494}
495
496/*
497 * Initialize all aggregate transition states for a new group of input values.
498 *
499 * If there are multiple grouping sets, we initialize only the first numReset
500 * of them (the grouping sets are ordered so that the most specific one, which
501 * is reset most often, is first). As a convenience, if numReset is 0, we
502 * reinitialize all sets.
503 *
504 * NB: This cannot be used for hash aggregates, as for those the grouping set
505 * number has to be specified from further up.
506 *
507 * When called, CurrentMemoryContext should be the per-query context.
508 */
509static void
510initialize_aggregates(AggState *aggstate,
511 AggStatePerGroup *pergroups,
512 int numReset)
513{
514 int transno;
515 int numGroupingSets = Max(aggstate->phase->numsets, 1);
516 int setno = 0;
517 int numTrans = aggstate->numtrans;
518 AggStatePerTrans transstates = aggstate->pertrans;
519
520 if (numReset == 0)
521 numReset = numGroupingSets;
522
523 for (setno = 0; setno < numReset; setno++)
524 {
525 AggStatePerGroup pergroup = pergroups[setno];
526
527 select_current_set(aggstate, setno, false);
528
529 for (transno = 0; transno < numTrans; transno++)
530 {
531 AggStatePerTrans pertrans = &transstates[transno];
532 AggStatePerGroup pergroupstate = &pergroup[transno];
533
534 initialize_aggregate(aggstate, pertrans, pergroupstate);
535 }
536 }
537}
538
539/*
540 * Given new input value(s), advance the transition function of one aggregate
541 * state within one grouping set only (already set in aggstate->current_set)
542 *
543 * The new values (and null flags) have been preloaded into argument positions
544 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
545 * pass to the transition function. We also expect that the static fields of
546 * the fcinfo are already initialized; that was done by ExecInitAgg().
547 *
548 * It doesn't matter which memory context this is called in.
549 */
550static void
551advance_transition_function(AggState *aggstate,
552 AggStatePerTrans pertrans,
553 AggStatePerGroup pergroupstate)
554{
555 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
556 MemoryContext oldContext;
557 Datum newVal;
558
559 if (pertrans->transfn.fn_strict)
560 {
561 /*
562 * For a strict transfn, nothing happens when there's a NULL input; we
563 * just keep the prior transValue.
564 */
565 int numTransInputs = pertrans->numTransInputs;
566 int i;
567
568 for (i = 1; i <= numTransInputs; i++)
569 {
570 if (fcinfo->args[i].isnull)
571 return;
572 }
573 if (pergroupstate->noTransValue)
574 {
575 /*
576 * transValue has not been initialized. This is the first non-NULL
577 * input value. We use it as the initial value for transValue. (We
578 * already checked that the agg's input type is binary-compatible
579 * with its transtype, so straight copy here is OK.)
580 *
581 * We must copy the datum into aggcontext if it is pass-by-ref. We
582 * do not need to pfree the old transValue, since it's NULL.
583 */
584 oldContext = MemoryContextSwitchTo(
585 aggstate->curaggcontext->ecxt_per_tuple_memory);
586 pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
587 pertrans->transtypeByVal,
588 pertrans->transtypeLen);
589 pergroupstate->transValueIsNull = false;
590 pergroupstate->noTransValue = false;
591 MemoryContextSwitchTo(oldContext);
592 return;
593 }
594 if (pergroupstate->transValueIsNull)
595 {
596 /*
597 * Don't call a strict function with NULL inputs. Note it is
598 * possible to get here despite the above tests, if the transfn is
599 * strict *and* returned a NULL on a prior cycle. If that happens
600 * we will propagate the NULL all the way to the end.
601 */
602 return;
603 }
604 }
605
606 /* We run the transition functions in per-input-tuple memory context */
607 oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
608
609 /* set up aggstate->curpertrans for AggGetAggref() */
610 aggstate->curpertrans = pertrans;
611
612 /*
613 * OK to call the transition function
614 */
615 fcinfo->args[0].value = pergroupstate->transValue;
616 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
617 fcinfo->isnull = false; /* just in case transfn doesn't set it */
618
619 newVal = FunctionCallInvoke(fcinfo);
620
621 aggstate->curpertrans = NULL;
622
623 /*
624 * If pass-by-ref datatype, must copy the new value into aggcontext and
625 * free the prior transValue. But if transfn returned a pointer to its
626 * first input, we don't need to do anything. Also, if transfn returned a
627 * pointer to a R/W expanded object that is already a child of the
628 * aggcontext, assume we can adopt that value without copying it.
629 */
630 if (!pertrans->transtypeByVal &&
631 DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
632 {
633 if (!fcinfo->isnull)
634 {
635 MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
636 if (DatumIsReadWriteExpandedObject(newVal,
637 false,
638 pertrans->transtypeLen) &&
639 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
640 /* do nothing */ ;
641 else
642 newVal = datumCopy(newVal,
643 pertrans->transtypeByVal,
644 pertrans->transtypeLen);
645 }
646 if (!pergroupstate->transValueIsNull)
647 {
648 if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
649 false,
650 pertrans->transtypeLen))
651 DeleteExpandedObject(pergroupstate->transValue);
652 else
653 pfree(DatumGetPointer(pergroupstate->transValue));
654 }
655 }
656
657 pergroupstate->transValue = newVal;
658 pergroupstate->transValueIsNull = fcinfo->isnull;
659
660 MemoryContextSwitchTo(oldContext);
661}
662
663/*
664 * Advance each aggregate transition state for one input tuple. The input
665 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
666 * accessible to ExecEvalExpr.
667 *
668 * We have two sets of transition states to handle: one for sorted aggregation
669 * and one for hashed; we do them both here, to avoid multiple evaluation of
670 * the inputs.
671 *
672 * When called, CurrentMemoryContext should be the per-query context.
673 */
674static void
675advance_aggregates(AggState *aggstate)
676{
677 bool dummynull;
678
679 ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
680 aggstate->tmpcontext,
681 &dummynull);
682}
683
684/*
685 * Run the transition function for a DISTINCT or ORDER BY aggregate
686 * with only one input. This is called after we have completed
687 * entering all the input values into the sort object. We complete the
688 * sort, read out the values in sorted order, and run the transition
689 * function on each value (applying DISTINCT if appropriate).
690 *
691 * Note that the strictness of the transition function was checked when
692 * entering the values into the sort, so we don't check it again here;
693 * we just apply standard SQL DISTINCT logic.
694 *
695 * The one-input case is handled separately from the multi-input case
696 * for performance reasons: for single by-value inputs, such as the
697 * common case of count(distinct id), the tuplesort_getdatum code path
698 * is around 300% faster. (The speedup for by-reference types is less
699 * but still noticeable.)
700 *
701 * This function handles only one grouping set (already set in
702 * aggstate->current_set).
703 *
704 * When called, CurrentMemoryContext should be the per-query context.
705 */
706static void
707process_ordered_aggregate_single(AggState *aggstate,
708 AggStatePerTrans pertrans,
709 AggStatePerGroup pergroupstate)
710{
711 Datum oldVal = (Datum) 0;
712 bool oldIsNull = true;
713 bool haveOldVal = false;
714 MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
715 MemoryContext oldContext;
716 bool isDistinct = (pertrans->numDistinctCols > 0);
717 Datum newAbbrevVal = (Datum) 0;
718 Datum oldAbbrevVal = (Datum) 0;
719 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
720 Datum *newVal;
721 bool *isNull;
722
723 Assert(pertrans->numDistinctCols < 2);
724
725 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
726
727 /* Load the column into argument 1 (arg 0 will be transition value) */
728 newVal = &fcinfo->args[1].value;
729 isNull = &fcinfo->args[1].isnull;
730
731 /*
732 * Note: if input type is pass-by-ref, the datums returned by the sort are
733 * freshly palloc'd in the per-query context, so we must be careful to
734 * pfree them when they are no longer needed.
735 */
736
737 while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
738 true, newVal, isNull, &newAbbrevVal))
739 {
740 /*
741 * Clear and select the working context for evaluation of the equality
742 * function and transition function.
743 */
744 MemoryContextReset(workcontext);
745 oldContext = MemoryContextSwitchTo(workcontext);
746
747 /*
748 * If DISTINCT mode, and not distinct from prior, skip it.
749 */
750 if (isDistinct &&
751 haveOldVal &&
752 ((oldIsNull && *isNull) ||
753 (!oldIsNull && !*isNull &&
754 oldAbbrevVal == newAbbrevVal &&
755 DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
756 pertrans->aggCollation,
757 oldVal, *newVal)))))
758 {
759 /* equal to prior, so forget this one */
760 if (!pertrans->inputtypeByVal && !*isNull)
761 pfree(DatumGetPointer(*newVal));
762 }
763 else
764 {
765 advance_transition_function(aggstate, pertrans, pergroupstate);
766 /* forget the old value, if any */
767 if (!oldIsNull && !pertrans->inputtypeByVal)
768 pfree(DatumGetPointer(oldVal));
769 /* and remember the new one for subsequent equality checks */
770 oldVal = *newVal;
771 oldAbbrevVal = newAbbrevVal;
772 oldIsNull = *isNull;
773 haveOldVal = true;
774 }
775
776 MemoryContextSwitchTo(oldContext);
777 }
778
779 if (!oldIsNull && !pertrans->inputtypeByVal)
780 pfree(DatumGetPointer(oldVal));
781
782 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
783 pertrans->sortstates[aggstate->current_set] = NULL;
784}
785
786/*
787 * Run the transition function for a DISTINCT or ORDER BY aggregate
788 * with more than one input. This is called after we have completed
789 * entering all the input values into the sort object. We complete the
790 * sort, read out the values in sorted order, and run the transition
791 * function on each value (applying DISTINCT if appropriate).
792 *
793 * This function handles only one grouping set (already set in
794 * aggstate->current_set).
795 *
796 * When called, CurrentMemoryContext should be the per-query context.
797 */
798static void
799process_ordered_aggregate_multi(AggState *aggstate,
800 AggStatePerTrans pertrans,
801 AggStatePerGroup pergroupstate)
802{
803 ExprContext *tmpcontext = aggstate->tmpcontext;
804 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
805 TupleTableSlot *slot1 = pertrans->sortslot;
806 TupleTableSlot *slot2 = pertrans->uniqslot;
807 int numTransInputs = pertrans->numTransInputs;
808 int numDistinctCols = pertrans->numDistinctCols;
809 Datum newAbbrevVal = (Datum) 0;
810 Datum oldAbbrevVal = (Datum) 0;
811 bool haveOldValue = false;
812 TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
813 int i;
814
815 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
816
817 ExecClearTuple(slot1);
818 if (slot2)
819 ExecClearTuple(slot2);
820
821 while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
822 true, true, slot1, &newAbbrevVal))
823 {
824 CHECK_FOR_INTERRUPTS();
825
826 tmpcontext->ecxt_outertuple = slot1;
827 tmpcontext->ecxt_innertuple = slot2;
828
829 if (numDistinctCols == 0 ||
830 !haveOldValue ||
831 newAbbrevVal != oldAbbrevVal ||
832 !ExecQual(pertrans->equalfnMulti, tmpcontext))
833 {
834 /*
835 * Extract the first numTransInputs columns as datums to pass to
836 * the transfn.
837 */
838 slot_getsomeattrs(slot1, numTransInputs);
839
840 /* Load values into fcinfo */
841 /* Start from 1, since the 0th arg will be the transition value */
842 for (i = 0; i < numTransInputs; i++)
843 {
844 fcinfo->args[i + 1].value = slot1->tts_values[i];
845 fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
846 }
847
848 advance_transition_function(aggstate, pertrans, pergroupstate);
849
850 if (numDistinctCols > 0)
851 {
852 /* swap the slot pointers to retain the current tuple */
853 TupleTableSlot *tmpslot = slot2;
854
855 slot2 = slot1;
856 slot1 = tmpslot;
857 /* avoid ExecQual() calls by reusing abbreviated keys */
858 oldAbbrevVal = newAbbrevVal;
859 haveOldValue = true;
860 }
861 }
862
863 /* Reset context each time */
864 ResetExprContext(tmpcontext);
865
866 ExecClearTuple(slot1);
867 }
868
869 if (slot2)
870 ExecClearTuple(slot2);
871
872 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
873 pertrans->sortstates[aggstate->current_set] = NULL;
874
875 /* restore previous slot, potentially in use for grouping sets */
876 tmpcontext->ecxt_outertuple = save;
877}
878
879/*
880 * Compute the final value of one aggregate.
881 *
882 * This function handles only one grouping set (already set in
883 * aggstate->current_set).
884 *
885 * The finalfunction will be run, and the result delivered, in the
886 * output-tuple context; caller's CurrentMemoryContext does not matter.
887 *
888 * The finalfn uses the state as set in the transno. This also might be
889 * being used by another aggregate function, so it's important that we do
890 * nothing destructive here.
891 */
892static void
893finalize_aggregate(AggState *aggstate,
894 AggStatePerAgg peragg,
895 AggStatePerGroup pergroupstate,
896 Datum *resultVal, bool *resultIsNull)
897{
898 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
899 bool anynull = false;
900 MemoryContext oldContext;
901 int i;
902 ListCell *lc;
903 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
904
905 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
906
907 /*
908 * Evaluate any direct arguments. We do this even if there's no finalfn
909 * (which is unlikely anyway), so that side-effects happen as expected.
910 * The direct arguments go into arg positions 1 and up, leaving position 0
911 * for the transition state value.
912 */
913 i = 1;
914 foreach(lc, peragg->aggdirectargs)
915 {
916 ExprState *expr = (ExprState *) lfirst(lc);
917
918 fcinfo->args[i].value = ExecEvalExpr(expr,
919 aggstate->ss.ps.ps_ExprContext,
920 &fcinfo->args[i].isnull);
921 anynull |= fcinfo->args[i].isnull;
922 i++;
923 }
924
925 /*
926 * Apply the agg's finalfn if one is provided, else return transValue.
927 */
928 if (OidIsValid(peragg->finalfn_oid))
929 {
930 int numFinalArgs = peragg->numFinalArgs;
931
932 /* set up aggstate->curperagg for AggGetAggref() */
933 aggstate->curperagg = peragg;
934
935 InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
936 numFinalArgs,
937 pertrans->aggCollation,
938 (void *) aggstate, NULL);
939
940 /* Fill in the transition state value */
941 fcinfo->args[0].value =
942 MakeExpandedObjectReadOnly(pergroupstate->transValue,
943 pergroupstate->transValueIsNull,
944 pertrans->transtypeLen);
945 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
946 anynull |= pergroupstate->transValueIsNull;
947
948 /* Fill any remaining argument positions with nulls */
949 for (; i < numFinalArgs; i++)
950 {
951 fcinfo->args[i].value = (Datum) 0;
952 fcinfo->args[i].isnull = true;
953 anynull = true;
954 }
955
956 if (fcinfo->flinfo->fn_strict && anynull)
957 {
958 /* don't call a strict function with NULL inputs */
959 *resultVal = (Datum) 0;
960 *resultIsNull = true;
961 }
962 else
963 {
964 *resultVal = FunctionCallInvoke(fcinfo);
965 *resultIsNull = fcinfo->isnull;
966 }
967 aggstate->curperagg = NULL;
968 }
969 else
970 {
971 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
972 *resultVal = pergroupstate->transValue;
973 *resultIsNull = pergroupstate->transValueIsNull;
974 }
975
976 /*
977 * If result is pass-by-ref, make sure it is in the right context.
978 */
979 if (!peragg->resulttypeByVal && !*resultIsNull &&
980 !MemoryContextContains(CurrentMemoryContext,
981 DatumGetPointer(*resultVal)))
982 *resultVal = datumCopy(*resultVal,
983 peragg->resulttypeByVal,
984 peragg->resulttypeLen);
985
986 MemoryContextSwitchTo(oldContext);
987}
988
989/*
990 * Compute the output value of one partial aggregate.
991 *
992 * The serialization function will be run, and the result delivered, in the
993 * output-tuple context; caller's CurrentMemoryContext does not matter.
994 */
995static void
996finalize_partialaggregate(AggState *aggstate,
997 AggStatePerAgg peragg,
998 AggStatePerGroup pergroupstate,
999 Datum *resultVal, bool *resultIsNull)
1000{
1001 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1002 MemoryContext oldContext;
1003
1004 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1005
1006 /*
1007 * serialfn_oid will be set if we must serialize the transvalue before
1008 * returning it
1009 */
1010 if (OidIsValid(pertrans->serialfn_oid))
1011 {
1012 /* Don't call a strict serialization function with NULL input. */
1013 if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1014 {
1015 *resultVal = (Datum) 0;
1016 *resultIsNull = true;
1017 }
1018 else
1019 {
1020 FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
1021
1022 fcinfo->args[0].value =
1023 MakeExpandedObjectReadOnly(pergroupstate->transValue,
1024 pergroupstate->transValueIsNull,
1025 pertrans->transtypeLen);
1026 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1027
1028 *resultVal = FunctionCallInvoke(fcinfo);
1029 *resultIsNull = fcinfo->isnull;
1030 }
1031 }
1032 else
1033 {
1034 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1035 *resultVal = pergroupstate->transValue;
1036 *resultIsNull = pergroupstate->transValueIsNull;
1037 }
1038
1039 /* If result is pass-by-ref, make sure it is in the right context. */
1040 if (!peragg->resulttypeByVal && !*resultIsNull &&
1041 !MemoryContextContains(CurrentMemoryContext,
1042 DatumGetPointer(*resultVal)))
1043 *resultVal = datumCopy(*resultVal,
1044 peragg->resulttypeByVal,
1045 peragg->resulttypeLen);
1046
1047 MemoryContextSwitchTo(oldContext);
1048}
1049
1050/*
1051 * Prepare to finalize and project based on the specified representative tuple
1052 * slot and grouping set.
1053 *
1054 * In the specified tuple slot, force to null all attributes that should be
1055 * read as null in the context of the current grouping set. Also stash the
1056 * current group bitmap where GroupingExpr can get at it.
1057 *
1058 * This relies on three conditions:
1059 *
1060 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1061 * only reference it in evaluations, which will only access individual
1062 * attributes.
1063 *
1064 * 2) No system columns are going to need to be nulled. (If a system column is
1065 * referenced in a group clause, it is actually projected in the outer plan
1066 * tlist.)
1067 *
1068 * 3) Within a given phase, we never need to recover the value of an attribute
1069 * once it has been set to null.
1070 *
1071 * Poking into the slot this way is a bit ugly, but the consensus is that the
1072 * alternative was worse.
1073 */
1074static void
1075prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1076{
1077 if (aggstate->phase->grouped_cols)
1078 {
1079 Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1080
1081 aggstate->grouped_cols = grouped_cols;
1082
1083 if (TTS_EMPTY(slot))
1084 {
1085 /*
1086 * Force all values to be NULL if working on an empty input tuple
1087 * (i.e. an empty grouping set for which no input rows were
1088 * supplied).
1089 */
1090 ExecStoreAllNullTuple(slot);
1091 }
1092 else if (aggstate->all_grouped_cols)
1093 {
1094 ListCell *lc;
1095
1096 /* all_grouped_cols is arranged in desc order */
1097 slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1098
1099 foreach(lc, aggstate->all_grouped_cols)
1100 {
1101 int attnum = lfirst_int(lc);
1102
1103 if (!bms_is_member(attnum, grouped_cols))
1104 slot->tts_isnull[attnum - 1] = true;
1105 }
1106 }
1107 }
1108}
1109
1110/*
1111 * Compute the final value of all aggregates for one group.
1112 *
1113 * This function handles only one grouping set at a time, which the caller must
1114 * have selected. It's also the caller's responsibility to adjust the supplied
1115 * pergroup parameter to point to the current set's transvalues.
1116 *
1117 * Results are stored in the output econtext aggvalues/aggnulls.
1118 */
1119static void
1120finalize_aggregates(AggState *aggstate,
1121 AggStatePerAgg peraggs,
1122 AggStatePerGroup pergroup)
1123{
1124 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1125 Datum *aggvalues = econtext->ecxt_aggvalues;
1126 bool *aggnulls = econtext->ecxt_aggnulls;
1127 int aggno;
1128 int transno;
1129
1130 /*
1131 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1132 * inputs and run the transition functions.
1133 */
1134 for (transno = 0; transno < aggstate->numtrans; transno++)
1135 {
1136 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1137 AggStatePerGroup pergroupstate;
1138
1139 pergroupstate = &pergroup[transno];
1140
1141 if (pertrans->numSortCols > 0)
1142 {
1143 Assert(aggstate->aggstrategy != AGG_HASHED &&
1144 aggstate->aggstrategy != AGG_MIXED);
1145
1146 if (pertrans->numInputs == 1)
1147 process_ordered_aggregate_single(aggstate,
1148 pertrans,
1149 pergroupstate);
1150 else
1151 process_ordered_aggregate_multi(aggstate,
1152 pertrans,
1153 pergroupstate);
1154 }
1155 }
1156
1157 /*
1158 * Run the final functions.
1159 */
1160 for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1161 {
1162 AggStatePerAgg peragg = &peraggs[aggno];
1163 int transno = peragg->transno;
1164 AggStatePerGroup pergroupstate;
1165
1166 pergroupstate = &pergroup[transno];
1167
1168 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1169 finalize_partialaggregate(aggstate, peragg, pergroupstate,
1170 &aggvalues[aggno], &aggnulls[aggno]);
1171 else
1172 finalize_aggregate(aggstate, peragg, pergroupstate,
1173 &aggvalues[aggno], &aggnulls[aggno]);
1174 }
1175}
1176
1177/*
1178 * Project the result of a group (whose aggs have already been calculated by
1179 * finalize_aggregates). Returns the result slot, or NULL if no row is
1180 * projected (suppressed by qual).
1181 */
1182static TupleTableSlot *
1183project_aggregates(AggState *aggstate)
1184{
1185 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1186
1187 /*
1188 * Check the qual (HAVING clause); if the group does not match, ignore it.
1189 */
1190 if (ExecQual(aggstate->ss.ps.qual, econtext))
1191 {
1192 /*
1193 * Form and return projection tuple using the aggregate results and
1194 * the representative input tuple.
1195 */
1196 return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1197 }
1198 else
1199 InstrCountFiltered1(aggstate, 1);
1200
1201 return NULL;
1202}
1203
1204/*
1205 * find_unaggregated_cols
1206 * Construct a bitmapset of the column numbers of un-aggregated Vars
1207 * appearing in our targetlist and qual (HAVING clause)
1208 */
1209static Bitmapset *
1210find_unaggregated_cols(AggState *aggstate)
1211{
1212 Agg *node = (Agg *) aggstate->ss.ps.plan;
1213 Bitmapset *colnos;
1214
1215 colnos = NULL;
1216 (void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
1217 &colnos);
1218 (void) find_unaggregated_cols_walker((Node *) node->plan.qual,
1219 &colnos);
1220 return colnos;
1221}
1222
1223static bool
1224find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
1225{
1226 if (node == NULL)
1227 return false;
1228 if (IsA(node, Var))
1229 {
1230 Var *var = (Var *) node;
1231
1232 /* setrefs.c should have set the varno to OUTER_VAR */
1233 Assert(var->varno == OUTER_VAR);
1234 Assert(var->varlevelsup == 0);
1235 *colnos = bms_add_member(*colnos, var->varattno);
1236 return false;
1237 }
1238 if (IsA(node, Aggref) ||IsA(node, GroupingFunc))
1239 {
1240 /* do not descend into aggregate exprs */
1241 return false;
1242 }
1243 return expression_tree_walker(node, find_unaggregated_cols_walker,
1244 (void *) colnos);
1245}
1246
1247/*
1248 * (Re-)initialize the hash table(s) to empty.
1249 *
1250 * To implement hashed aggregation, we need a hashtable that stores a
1251 * representative tuple and an array of AggStatePerGroup structs for each
1252 * distinct set of GROUP BY column values. We compute the hash key from the
1253 * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1254 * for each entry.
1255 *
1256 * We have a separate hashtable and associated perhash data structure for each
1257 * grouping set for which we're doing hashing.
1258 *
1259 * The contents of the hash tables always live in the hashcontext's per-tuple
1260 * memory context (there is only one of these for all tables together, since
1261 * they are all reset at the same time).
1262 */
1263static void
1264build_hash_table(AggState *aggstate)
1265{
1266 MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
1267 Size additionalsize;
1268 int i;
1269
1270 Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED);
1271
1272 additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1273
1274 for (i = 0; i < aggstate->num_hashes; ++i)
1275 {
1276 AggStatePerHash perhash = &aggstate->perhash[i];
1277
1278 Assert(perhash->aggnode->numGroups > 0);
1279
1280 if (perhash->hashtable)
1281 ResetTupleHashTable(perhash->hashtable);
1282 else
1283 perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1284 perhash->hashslot->tts_tupleDescriptor,
1285 perhash->numCols,
1286 perhash->hashGrpColIdxHash,
1287 perhash->eqfuncoids,
1288 perhash->hashfunctions,
1289 perhash->aggnode->grpCollations,
1290 perhash->aggnode->numGroups,
1291 additionalsize,
1292 aggstate->ss.ps.state->es_query_cxt,
1293 aggstate->hashcontext->ecxt_per_tuple_memory,
1294 tmpmem,
1295 DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1296 }
1297}
1298
1299/*
1300 * Compute columns that actually need to be stored in hashtable entries. The
1301 * incoming tuples from the child plan node will contain grouping columns,
1302 * other columns referenced in our targetlist and qual, columns used to
1303 * compute the aggregate functions, and perhaps just junk columns we don't use
1304 * at all. Only columns of the first two types need to be stored in the
1305 * hashtable, and getting rid of the others can make the table entries
1306 * significantly smaller. The hashtable only contains the relevant columns,
1307 * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1308 * into the format of the normal input descriptor.
1309 *
1310 * Additional columns, in addition to the columns grouped by, come from two
1311 * sources: Firstly functionally dependent columns that we don't need to group
1312 * by themselves, and secondly ctids for row-marks.
1313 *
1314 * To eliminate duplicates, we build a bitmapset of the needed columns, and
1315 * then build an array of the columns included in the hashtable. We might
1316 * still have duplicates if the passed-in grpColIdx has them, which can happen
1317 * in edge cases from semijoins/distinct; these can't always be removed,
1318 * because it's not certain that the duplicate cols will be using the same
1319 * hash function.
1320 *
1321 * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1322 * the per-query context (unlike the hash table itself).
1323 */
1324static void
1325find_hash_columns(AggState *aggstate)
1326{
1327 Bitmapset *base_colnos;
1328 List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1329 int numHashes = aggstate->num_hashes;
1330 EState *estate = aggstate->ss.ps.state;
1331 int j;
1332
1333 /* Find Vars that will be needed in tlist and qual */
1334 base_colnos = find_unaggregated_cols(aggstate);
1335
1336 for (j = 0; j < numHashes; ++j)
1337 {
1338 AggStatePerHash perhash = &aggstate->perhash[j];
1339 Bitmapset *colnos = bms_copy(base_colnos);
1340 AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1341 List *hashTlist = NIL;
1342 TupleDesc hashDesc;
1343 int maxCols;
1344 int i;
1345
1346 perhash->largestGrpColIdx = 0;
1347
1348 /*
1349 * If we're doing grouping sets, then some Vars might be referenced in
1350 * tlist/qual for the benefit of other grouping sets, but not needed
1351 * when hashing; i.e. prepare_projection_slot will null them out, so
1352 * there'd be no point storing them. Use prepare_projection_slot's
1353 * logic to determine which.
1354 */
1355 if (aggstate->phases[0].grouped_cols)
1356 {
1357 Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1358 ListCell *lc;
1359
1360 foreach(lc, aggstate->all_grouped_cols)
1361 {
1362 int attnum = lfirst_int(lc);
1363
1364 if (!bms_is_member(attnum, grouped_cols))
1365 colnos = bms_del_member(colnos, attnum);
1366 }
1367 }
1368
1369 /*
1370 * Compute maximum number of input columns accounting for possible
1371 * duplications in the grpColIdx array, which can happen in some edge
1372 * cases where HashAggregate was generated as part of a semijoin or a
1373 * DISTINCT.
1374 */
1375 maxCols = bms_num_members(colnos) + perhash->numCols;
1376
1377 perhash->hashGrpColIdxInput =
1378 palloc(maxCols * sizeof(AttrNumber));
1379 perhash->hashGrpColIdxHash =
1380 palloc(perhash->numCols * sizeof(AttrNumber));
1381
1382 /* Add all the grouping columns to colnos */
1383 for (i = 0; i < perhash->numCols; i++)
1384 colnos = bms_add_member(colnos, grpColIdx[i]);
1385
1386 /*
1387 * First build mapping for columns directly hashed. These are the
1388 * first, because they'll be accessed when computing hash values and
1389 * comparing tuples for exact matches. We also build simple mapping
1390 * for execGrouping, so it knows where to find the to-be-hashed /
1391 * compared columns in the input.
1392 */
1393 for (i = 0; i < perhash->numCols; i++)
1394 {
1395 perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1396 perhash->hashGrpColIdxHash[i] = i + 1;
1397 perhash->numhashGrpCols++;
1398 /* delete already mapped columns */
1399 bms_del_member(colnos, grpColIdx[i]);
1400 }
1401
1402 /* and add the remaining columns */
1403 while ((i = bms_first_member(colnos)) >= 0)
1404 {
1405 perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1406 perhash->numhashGrpCols++;
1407 }
1408
1409 /* and build a tuple descriptor for the hashtable */
1410 for (i = 0; i < perhash->numhashGrpCols; i++)
1411 {
1412 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1413
1414 hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1415 perhash->largestGrpColIdx =
1416 Max(varNumber + 1, perhash->largestGrpColIdx);
1417 }
1418
1419 hashDesc = ExecTypeFromTL(hashTlist);
1420
1421 execTuplesHashPrepare(perhash->numCols,
1422 perhash->aggnode->grpOperators,
1423 &perhash->eqfuncoids,
1424 &perhash->hashfunctions);
1425 perhash->hashslot =
1426 ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1427 &TTSOpsMinimalTuple);
1428
1429 list_free(hashTlist);
1430 bms_free(colnos);
1431 }
1432
1433 bms_free(base_colnos);
1434}
1435
1436/*
1437 * Estimate per-hash-table-entry overhead for the planner.
1438 *
1439 * Note that the estimate does not include space for pass-by-reference
1440 * transition data values, nor for the representative tuple of each group.
1441 * Nor does this account of the target fill-factor and growth policy of the
1442 * hash table.
1443 */
1444Size
1445hash_agg_entry_size(int numAggs)
1446{
1447 Size entrysize;
1448
1449 /* This must match build_hash_table */
1450 entrysize = sizeof(TupleHashEntryData) +
1451 numAggs * sizeof(AggStatePerGroupData);
1452 entrysize = MAXALIGN(entrysize);
1453
1454 return entrysize;
1455}
1456
1457/*
1458 * Find or create a hashtable entry for the tuple group containing the current
1459 * tuple (already set in tmpcontext's outertuple slot), in the current grouping
1460 * set (which the caller must have selected - note that initialize_aggregate
1461 * depends on this).
1462 *
1463 * When called, CurrentMemoryContext should be the per-query context.
1464 */
1465static TupleHashEntryData *
1466lookup_hash_entry(AggState *aggstate)
1467{
1468 TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
1469 AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
1470 TupleTableSlot *hashslot = perhash->hashslot;
1471 TupleHashEntryData *entry;
1472 bool isnew;
1473 int i;
1474
1475 /* transfer just the needed columns into hashslot */
1476 slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1477 ExecClearTuple(hashslot);
1478
1479 for (i = 0; i < perhash->numhashGrpCols; i++)
1480 {
1481 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1482
1483 hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1484 hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1485 }
1486 ExecStoreVirtualTuple(hashslot);
1487
1488 /* find or create the hashtable entry using the filtered tuple */
1489 entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
1490
1491 if (isnew)
1492 {
1493 AggStatePerGroup pergroup;
1494 int transno;
1495
1496 pergroup = (AggStatePerGroup)
1497 MemoryContextAlloc(perhash->hashtable->tablecxt,
1498 sizeof(AggStatePerGroupData) * aggstate->numtrans);
1499 entry->additional = pergroup;
1500
1501 /*
1502 * Initialize aggregates for new tuple group, lookup_hash_entries()
1503 * already has selected the relevant grouping set.
1504 */
1505 for (transno = 0; transno < aggstate->numtrans; transno++)
1506 {
1507 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1508 AggStatePerGroup pergroupstate = &pergroup[transno];
1509
1510 initialize_aggregate(aggstate, pertrans, pergroupstate);
1511 }
1512 }
1513
1514 return entry;
1515}
1516
1517/*
1518 * Look up hash entries for the current tuple in all hashed grouping sets,
1519 * returning an array of pergroup pointers suitable for advance_aggregates.
1520 *
1521 * Be aware that lookup_hash_entry can reset the tmpcontext.
1522 */
1523static void
1524lookup_hash_entries(AggState *aggstate)
1525{
1526 int numHashes = aggstate->num_hashes;
1527 AggStatePerGroup *pergroup = aggstate->hash_pergroup;
1528 int setno;
1529
1530 for (setno = 0; setno < numHashes; setno++)
1531 {
1532 select_current_set(aggstate, setno, true);
1533 pergroup[setno] = lookup_hash_entry(aggstate)->additional;
1534 }
1535}
1536
1537/*
1538 * ExecAgg -
1539 *
1540 * ExecAgg receives tuples from its outer subplan and aggregates over
1541 * the appropriate attribute for each aggregate function use (Aggref
1542 * node) appearing in the targetlist or qual of the node. The number
1543 * of tuples to aggregate over depends on whether grouped or plain
1544 * aggregation is selected. In grouped aggregation, we produce a result
1545 * row for each group; in plain aggregation there's a single result row
1546 * for the whole query. In either case, the value of each aggregate is
1547 * stored in the expression context to be used when ExecProject evaluates
1548 * the result tuple.
1549 */
1550static TupleTableSlot *
1551ExecAgg(PlanState *pstate)
1552{
1553 AggState *node = castNode(AggState, pstate);
1554 TupleTableSlot *result = NULL;
1555
1556 CHECK_FOR_INTERRUPTS();
1557
1558 if (!node->agg_done)
1559 {
1560 /* Dispatch based on strategy */
1561 switch (node->phase->aggstrategy)
1562 {
1563 case AGG_HASHED:
1564 if (!node->table_filled)
1565 agg_fill_hash_table(node);
1566 /* FALLTHROUGH */
1567 case AGG_MIXED:
1568 result = agg_retrieve_hash_table(node);
1569 break;
1570 case AGG_PLAIN:
1571 case AGG_SORTED:
1572 result = agg_retrieve_direct(node);
1573 break;
1574 }
1575
1576 if (!TupIsNull(result))
1577 return result;
1578 }
1579
1580 return NULL;
1581}
1582
1583/*
1584 * ExecAgg for non-hashed case
1585 */
1586static TupleTableSlot *
1587agg_retrieve_direct(AggState *aggstate)
1588{
1589 Agg *node = aggstate->phase->aggnode;
1590 ExprContext *econtext;
1591 ExprContext *tmpcontext;
1592 AggStatePerAgg peragg;
1593 AggStatePerGroup *pergroups;
1594 TupleTableSlot *outerslot;
1595 TupleTableSlot *firstSlot;
1596 TupleTableSlot *result;
1597 bool hasGroupingSets = aggstate->phase->numsets > 0;
1598 int numGroupingSets = Max(aggstate->phase->numsets, 1);
1599 int currentSet;
1600 int nextSetSize;
1601 int numReset;
1602 int i;
1603
1604 /*
1605 * get state info from node
1606 *
1607 * econtext is the per-output-tuple expression context
1608 *
1609 * tmpcontext is the per-input-tuple expression context
1610 */
1611 econtext = aggstate->ss.ps.ps_ExprContext;
1612 tmpcontext = aggstate->tmpcontext;
1613
1614 peragg = aggstate->peragg;
1615 pergroups = aggstate->pergroups;
1616 firstSlot = aggstate->ss.ss_ScanTupleSlot;
1617
1618 /*
1619 * We loop retrieving groups until we find one matching
1620 * aggstate->ss.ps.qual
1621 *
1622 * For grouping sets, we have the invariant that aggstate->projected_set
1623 * is either -1 (initial call) or the index (starting from 0) in
1624 * gset_lengths for the group we just completed (either by projecting a
1625 * row or by discarding it in the qual).
1626 */
1627 while (!aggstate->agg_done)
1628 {
1629 /*
1630 * Clear the per-output-tuple context for each group, as well as
1631 * aggcontext (which contains any pass-by-ref transvalues of the old
1632 * group). Some aggregate functions store working state in child
1633 * contexts; those now get reset automatically without us needing to
1634 * do anything special.
1635 *
1636 * We use ReScanExprContext not just ResetExprContext because we want
1637 * any registered shutdown callbacks to be called. That allows
1638 * aggregate functions to ensure they've cleaned up any non-memory
1639 * resources.
1640 */
1641 ReScanExprContext(econtext);
1642
1643 /*
1644 * Determine how many grouping sets need to be reset at this boundary.
1645 */
1646 if (aggstate->projected_set >= 0 &&
1647 aggstate->projected_set < numGroupingSets)
1648 numReset = aggstate->projected_set + 1;
1649 else
1650 numReset = numGroupingSets;
1651
1652 /*
1653 * numReset can change on a phase boundary, but that's OK; we want to
1654 * reset the contexts used in _this_ phase, and later, after possibly
1655 * changing phase, initialize the right number of aggregates for the
1656 * _new_ phase.
1657 */
1658
1659 for (i = 0; i < numReset; i++)
1660 {
1661 ReScanExprContext(aggstate->aggcontexts[i]);
1662 }
1663
1664 /*
1665 * Check if input is complete and there are no more groups to project
1666 * in this phase; move to next phase or mark as done.
1667 */
1668 if (aggstate->input_done == true &&
1669 aggstate->projected_set >= (numGroupingSets - 1))
1670 {
1671 if (aggstate->current_phase < aggstate->numphases - 1)
1672 {
1673 initialize_phase(aggstate, aggstate->current_phase + 1);
1674 aggstate->input_done = false;
1675 aggstate->projected_set = -1;
1676 numGroupingSets = Max(aggstate->phase->numsets, 1);
1677 node = aggstate->phase->aggnode;
1678 numReset = numGroupingSets;
1679 }
1680 else if (aggstate->aggstrategy == AGG_MIXED)
1681 {
1682 /*
1683 * Mixed mode; we've output all the grouped stuff and have
1684 * full hashtables, so switch to outputting those.
1685 */
1686 initialize_phase(aggstate, 0);
1687 aggstate->table_filled = true;
1688 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1689 &aggstate->perhash[0].hashiter);
1690 select_current_set(aggstate, 0, true);
1691 return agg_retrieve_hash_table(aggstate);
1692 }
1693 else
1694 {
1695 aggstate->agg_done = true;
1696 break;
1697 }
1698 }
1699
1700 /*
1701 * Get the number of columns in the next grouping set after the last
1702 * projected one (if any). This is the number of columns to compare to
1703 * see if we reached the boundary of that set too.
1704 */
1705 if (aggstate->projected_set >= 0 &&
1706 aggstate->projected_set < (numGroupingSets - 1))
1707 nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
1708 else
1709 nextSetSize = 0;
1710
1711 /*----------
1712 * If a subgroup for the current grouping set is present, project it.
1713 *
1714 * We have a new group if:
1715 * - we're out of input but haven't projected all grouping sets
1716 * (checked above)
1717 * OR
1718 * - we already projected a row that wasn't from the last grouping
1719 * set
1720 * AND
1721 * - the next grouping set has at least one grouping column (since
1722 * empty grouping sets project only once input is exhausted)
1723 * AND
1724 * - the previous and pending rows differ on the grouping columns
1725 * of the next grouping set
1726 *----------
1727 */
1728 tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
1729 if (aggstate->input_done ||
1730 (node->aggstrategy != AGG_PLAIN &&
1731 aggstate->projected_set != -1 &&
1732 aggstate->projected_set < (numGroupingSets - 1) &&
1733 nextSetSize > 0 &&
1734 !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
1735 tmpcontext)))
1736 {
1737 aggstate->projected_set += 1;
1738
1739 Assert(aggstate->projected_set < numGroupingSets);
1740 Assert(nextSetSize > 0 || aggstate->input_done);
1741 }
1742 else
1743 {
1744 /*
1745 * We no longer care what group we just projected, the next
1746 * projection will always be the first (or only) grouping set
1747 * (unless the input proves to be empty).
1748 */
1749 aggstate->projected_set = 0;
1750
1751 /*
1752 * If we don't already have the first tuple of the new group,
1753 * fetch it from the outer plan.
1754 */
1755 if (aggstate->grp_firstTuple == NULL)
1756 {
1757 outerslot = fetch_input_tuple(aggstate);
1758 if (!TupIsNull(outerslot))
1759 {
1760 /*
1761 * Make a copy of the first input tuple; we will use this
1762 * for comparisons (in group mode) and for projection.
1763 */
1764 aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
1765 }
1766 else
1767 {
1768 /* outer plan produced no tuples at all */
1769 if (hasGroupingSets)
1770 {
1771 /*
1772 * If there was no input at all, we need to project
1773 * rows only if there are grouping sets of size 0.
1774 * Note that this implies that there can't be any
1775 * references to ungrouped Vars, which would otherwise
1776 * cause issues with the empty output slot.
1777 *
1778 * XXX: This is no longer true, we currently deal with
1779 * this in finalize_aggregates().
1780 */
1781 aggstate->input_done = true;
1782
1783 while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
1784 {
1785 aggstate->projected_set += 1;
1786 if (aggstate->projected_set >= numGroupingSets)
1787 {
1788 /*
1789 * We can't set agg_done here because we might
1790 * have more phases to do, even though the
1791 * input is empty. So we need to restart the
1792 * whole outer loop.
1793 */
1794 break;
1795 }
1796 }
1797
1798 if (aggstate->projected_set >= numGroupingSets)
1799 continue;
1800 }
1801 else
1802 {
1803 aggstate->agg_done = true;
1804 /* If we are grouping, we should produce no tuples too */
1805 if (node->aggstrategy != AGG_PLAIN)
1806 return NULL;
1807 }
1808 }
1809 }
1810
1811 /*
1812 * Initialize working state for a new input tuple group.
1813 */
1814 initialize_aggregates(aggstate, pergroups, numReset);
1815
1816 if (aggstate->grp_firstTuple != NULL)
1817 {
1818 /*
1819 * Store the copied first input tuple in the tuple table slot
1820 * reserved for it. The tuple will be deleted when it is
1821 * cleared from the slot.
1822 */
1823 ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
1824 firstSlot, true);
1825 aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
1826
1827 /* set up for first advance_aggregates call */
1828 tmpcontext->ecxt_outertuple = firstSlot;
1829
1830 /*
1831 * Process each outer-plan tuple, and then fetch the next one,
1832 * until we exhaust the outer plan or cross a group boundary.
1833 */
1834 for (;;)
1835 {
1836 /*
1837 * During phase 1 only of a mixed agg, we need to update
1838 * hashtables as well in advance_aggregates.
1839 */
1840 if (aggstate->aggstrategy == AGG_MIXED &&
1841 aggstate->current_phase == 1)
1842 {
1843 lookup_hash_entries(aggstate);
1844 }
1845
1846 /* Advance the aggregates (or combine functions) */
1847 advance_aggregates(aggstate);
1848
1849 /* Reset per-input-tuple context after each tuple */
1850 ResetExprContext(tmpcontext);
1851
1852 outerslot = fetch_input_tuple(aggstate);
1853 if (TupIsNull(outerslot))
1854 {
1855 /* no more outer-plan tuples available */
1856 if (hasGroupingSets)
1857 {
1858 aggstate->input_done = true;
1859 break;
1860 }
1861 else
1862 {
1863 aggstate->agg_done = true;
1864 break;
1865 }
1866 }
1867 /* set up for next advance_aggregates call */
1868 tmpcontext->ecxt_outertuple = outerslot;
1869
1870 /*
1871 * If we are grouping, check whether we've crossed a group
1872 * boundary.
1873 */
1874 if (node->aggstrategy != AGG_PLAIN)
1875 {
1876 tmpcontext->ecxt_innertuple = firstSlot;
1877 if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
1878 tmpcontext))
1879 {
1880 aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
1881 break;
1882 }
1883 }
1884 }
1885 }
1886
1887 /*
1888 * Use the representative input tuple for any references to
1889 * non-aggregated input columns in aggregate direct args, the node
1890 * qual, and the tlist. (If we are not grouping, and there are no
1891 * input rows at all, we will come here with an empty firstSlot
1892 * ... but if not grouping, there can't be any references to
1893 * non-aggregated input columns, so no problem.)
1894 */
1895 econtext->ecxt_outertuple = firstSlot;
1896 }
1897
1898 Assert(aggstate->projected_set >= 0);
1899
1900 currentSet = aggstate->projected_set;
1901
1902 prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
1903
1904 select_current_set(aggstate, currentSet, false);
1905
1906 finalize_aggregates(aggstate,
1907 peragg,
1908 pergroups[currentSet]);
1909
1910 /*
1911 * If there's no row to project right now, we must continue rather
1912 * than returning a null since there might be more groups.
1913 */
1914 result = project_aggregates(aggstate);
1915 if (result)
1916 return result;
1917 }
1918
1919 /* No more groups */
1920 return NULL;
1921}
1922
1923/*
1924 * ExecAgg for hashed case: read input and build hash table
1925 */
1926static void
1927agg_fill_hash_table(AggState *aggstate)
1928{
1929 TupleTableSlot *outerslot;
1930 ExprContext *tmpcontext = aggstate->tmpcontext;
1931
1932 /*
1933 * Process each outer-plan tuple, and then fetch the next one, until we
1934 * exhaust the outer plan.
1935 */
1936 for (;;)
1937 {
1938 outerslot = fetch_input_tuple(aggstate);
1939 if (TupIsNull(outerslot))
1940 break;
1941
1942 /* set up for lookup_hash_entries and advance_aggregates */
1943 tmpcontext->ecxt_outertuple = outerslot;
1944
1945 /* Find or build hashtable entries */
1946 lookup_hash_entries(aggstate);
1947
1948 /* Advance the aggregates (or combine functions) */
1949 advance_aggregates(aggstate);
1950
1951 /*
1952 * Reset per-input-tuple context after each tuple, but note that the
1953 * hash lookups do this too
1954 */
1955 ResetExprContext(aggstate->tmpcontext);
1956 }
1957
1958 aggstate->table_filled = true;
1959 /* Initialize to walk the first hash table */
1960 select_current_set(aggstate, 0, true);
1961 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1962 &aggstate->perhash[0].hashiter);
1963}
1964
1965/*
1966 * ExecAgg for hashed case: retrieving groups from hash table
1967 */
1968static TupleTableSlot *
1969agg_retrieve_hash_table(AggState *aggstate)
1970{
1971 ExprContext *econtext;
1972 AggStatePerAgg peragg;
1973 AggStatePerGroup pergroup;
1974 TupleHashEntryData *entry;
1975 TupleTableSlot *firstSlot;
1976 TupleTableSlot *result;
1977 AggStatePerHash perhash;
1978
1979 /*
1980 * get state info from node.
1981 *
1982 * econtext is the per-output-tuple expression context.
1983 */
1984 econtext = aggstate->ss.ps.ps_ExprContext;
1985 peragg = aggstate->peragg;
1986 firstSlot = aggstate->ss.ss_ScanTupleSlot;
1987
1988 /*
1989 * Note that perhash (and therefore anything accessed through it) can
1990 * change inside the loop, as we change between grouping sets.
1991 */
1992 perhash = &aggstate->perhash[aggstate->current_set];
1993
1994 /*
1995 * We loop retrieving groups until we find one satisfying
1996 * aggstate->ss.ps.qual
1997 */
1998 while (!aggstate->agg_done)
1999 {
2000 TupleTableSlot *hashslot = perhash->hashslot;
2001 int i;
2002
2003 CHECK_FOR_INTERRUPTS();
2004
2005 /*
2006 * Find the next entry in the hash table
2007 */
2008 entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2009 if (entry == NULL)
2010 {
2011 int nextset = aggstate->current_set + 1;
2012
2013 if (nextset < aggstate->num_hashes)
2014 {
2015 /*
2016 * Switch to next grouping set, reinitialize, and restart the
2017 * loop.
2018 */
2019 select_current_set(aggstate, nextset, true);
2020
2021 perhash = &aggstate->perhash[aggstate->current_set];
2022
2023 ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2024
2025 continue;
2026 }
2027 else
2028 {
2029 /* No more hashtables, so done */
2030 aggstate->agg_done = true;
2031 return NULL;
2032 }
2033 }
2034
2035 /*
2036 * Clear the per-output-tuple context for each group
2037 *
2038 * We intentionally don't use ReScanExprContext here; if any aggs have
2039 * registered shutdown callbacks, they mustn't be called yet, since we
2040 * might not be done with that agg.
2041 */
2042 ResetExprContext(econtext);
2043
2044 /*
2045 * Transform representative tuple back into one with the right
2046 * columns.
2047 */
2048 ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2049 slot_getallattrs(hashslot);
2050
2051 ExecClearTuple(firstSlot);
2052 memset(firstSlot->tts_isnull, true,
2053 firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2054
2055 for (i = 0; i < perhash->numhashGrpCols; i++)
2056 {
2057 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2058
2059 firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2060 firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2061 }
2062 ExecStoreVirtualTuple(firstSlot);
2063
2064 pergroup = (AggStatePerGroup) entry->additional;
2065
2066 /*
2067 * Use the representative input tuple for any references to
2068 * non-aggregated input columns in the qual and tlist.
2069 */
2070 econtext->ecxt_outertuple = firstSlot;
2071
2072 prepare_projection_slot(aggstate,
2073 econtext->ecxt_outertuple,
2074 aggstate->current_set);
2075
2076 finalize_aggregates(aggstate, peragg, pergroup);
2077
2078 result = project_aggregates(aggstate);
2079 if (result)
2080 return result;
2081 }
2082
2083 /* No more groups */
2084 return NULL;
2085}
2086
2087/* -----------------
2088 * ExecInitAgg
2089 *
2090 * Creates the run-time information for the agg node produced by the
2091 * planner and initializes its outer subtree.
2092 *
2093 * -----------------
2094 */
2095AggState *
2096ExecInitAgg(Agg *node, EState *estate, int eflags)
2097{
2098 AggState *aggstate;
2099 AggStatePerAgg peraggs;
2100 AggStatePerTrans pertransstates;
2101 AggStatePerGroup *pergroups;
2102 Plan *outerPlan;
2103 ExprContext *econtext;
2104 TupleDesc scanDesc;
2105 int numaggs,
2106 transno,
2107 aggno;
2108 int phase;
2109 int phaseidx;
2110 ListCell *l;
2111 Bitmapset *all_grouped_cols = NULL;
2112 int numGroupingSets = 1;
2113 int numPhases;
2114 int numHashes;
2115 int i = 0;
2116 int j = 0;
2117 bool use_hashing = (node->aggstrategy == AGG_HASHED ||
2118 node->aggstrategy == AGG_MIXED);
2119
2120 /* check for unsupported flags */
2121 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2122
2123 /*
2124 * create state structure
2125 */
2126 aggstate = makeNode(AggState);
2127 aggstate->ss.ps.plan = (Plan *) node;
2128 aggstate->ss.ps.state = estate;
2129 aggstate->ss.ps.ExecProcNode = ExecAgg;
2130
2131 aggstate->aggs = NIL;
2132 aggstate->numaggs = 0;
2133 aggstate->numtrans = 0;
2134 aggstate->aggstrategy = node->aggstrategy;
2135 aggstate->aggsplit = node->aggsplit;
2136 aggstate->maxsets = 0;
2137 aggstate->projected_set = -1;
2138 aggstate->current_set = 0;
2139 aggstate->peragg = NULL;
2140 aggstate->pertrans = NULL;
2141 aggstate->curperagg = NULL;
2142 aggstate->curpertrans = NULL;
2143 aggstate->input_done = false;
2144 aggstate->agg_done = false;
2145 aggstate->pergroups = NULL;
2146 aggstate->grp_firstTuple = NULL;
2147 aggstate->sort_in = NULL;
2148 aggstate->sort_out = NULL;
2149
2150 /*
2151 * phases[0] always exists, but is dummy in sorted/plain mode
2152 */
2153 numPhases = (use_hashing ? 1 : 2);
2154 numHashes = (use_hashing ? 1 : 0);
2155
2156 /*
2157 * Calculate the maximum number of grouping sets in any phase; this
2158 * determines the size of some allocations. Also calculate the number of
2159 * phases, since all hashed/mixed nodes contribute to only a single phase.
2160 */
2161 if (node->groupingSets)
2162 {
2163 numGroupingSets = list_length(node->groupingSets);
2164
2165 foreach(l, node->chain)
2166 {
2167 Agg *agg = lfirst(l);
2168
2169 numGroupingSets = Max(numGroupingSets,
2170 list_length(agg->groupingSets));
2171
2172 /*
2173 * additional AGG_HASHED aggs become part of phase 0, but all
2174 * others add an extra phase.
2175 */
2176 if (agg->aggstrategy != AGG_HASHED)
2177 ++numPhases;
2178 else
2179 ++numHashes;
2180 }
2181 }
2182
2183 aggstate->maxsets = numGroupingSets;
2184 aggstate->numphases = numPhases;
2185
2186 aggstate->aggcontexts = (ExprContext **)
2187 palloc0(sizeof(ExprContext *) * numGroupingSets);
2188
2189 /*
2190 * Create expression contexts. We need three or more, one for
2191 * per-input-tuple processing, one for per-output-tuple processing, one
2192 * for all the hashtables, and one for each grouping set. The per-tuple
2193 * memory context of the per-grouping-set ExprContexts (aggcontexts)
2194 * replaces the standalone memory context formerly used to hold transition
2195 * values. We cheat a little by using ExecAssignExprContext() to build
2196 * all of them.
2197 *
2198 * NOTE: the details of what is stored in aggcontexts and what is stored
2199 * in the regular per-query memory context are driven by a simple
2200 * decision: we want to reset the aggcontext at group boundaries (if not
2201 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
2202 */
2203 ExecAssignExprContext(estate, &aggstate->ss.ps);
2204 aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
2205
2206 for (i = 0; i < numGroupingSets; ++i)
2207 {
2208 ExecAssignExprContext(estate, &aggstate->ss.ps);
2209 aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
2210 }
2211
2212 if (use_hashing)
2213 {
2214 ExecAssignExprContext(estate, &aggstate->ss.ps);
2215 aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
2216 }
2217
2218 ExecAssignExprContext(estate, &aggstate->ss.ps);
2219
2220 /*
2221 * Initialize child nodes.
2222 *
2223 * If we are doing a hashed aggregation then the child plan does not need
2224 * to handle REWIND efficiently; see ExecReScanAgg.
2225 */
2226 if (node->aggstrategy == AGG_HASHED)
2227 eflags &= ~EXEC_FLAG_REWIND;
2228 outerPlan = outerPlan(node);
2229 outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
2230
2231 /*
2232 * initialize source tuple type.
2233 */
2234 aggstate->ss.ps.outerops =
2235 ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
2236 &aggstate->ss.ps.outeropsfixed);
2237 aggstate->ss.ps.outeropsset = true;
2238
2239 ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
2240 aggstate->ss.ps.outerops);
2241 scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2242
2243 /*
2244 * If there are more than two phases (including a potential dummy phase
2245 * 0), input will be resorted using tuplesort. Need a slot for that.
2246 */
2247 if (numPhases > 2)
2248 {
2249 aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2250 &TTSOpsMinimalTuple);
2251
2252 /*
2253 * The output of the tuplesort, and the output from the outer child
2254 * might not use the same type of slot. In most cases the child will
2255 * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
 * input can also be presorted due to an index, in which case it could
2257 * be a different type of slot.
2258 *
2259 * XXX: For efficiency it would be good to instead/additionally
2260 * generate expressions with corresponding settings of outerops* for
2261 * the individual phases - deforming is often a bottleneck for
2262 * aggregations with lots of rows per group. If there's multiple
2263 * sorts, we know that all but the first use TTSOpsMinimalTuple (via
2264 * the nodeAgg.c internal tuplesort).
2265 */
2266 if (aggstate->ss.ps.outeropsfixed &&
2267 aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
2268 aggstate->ss.ps.outeropsfixed = false;
2269 }
2270
2271 /*
2272 * Initialize result type, slot and projection.
2273 */
2274 ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
2275 ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
2276
2277 /*
2278 * initialize child expressions
2279 *
2280 * We expect the parser to have checked that no aggs contain other agg
2281 * calls in their arguments (and just to be sure, we verify it again while
2282 * initializing the plan node). This would make no sense under SQL
2283 * semantics, and it's forbidden by the spec. Because it is true, we
2284 * don't need to worry about evaluating the aggs in any particular order.
2285 *
2286 * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
2287 * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs
2288 * in the targetlist are found during ExecAssignProjectionInfo, below.
2289 */
2290 aggstate->ss.ps.qual =
2291 ExecInitQual(node->plan.qual, (PlanState *) aggstate);
2292
2293 /*
2294 * We should now have found all Aggrefs in the targetlist and quals.
2295 */
2296 numaggs = aggstate->numaggs;
2297 Assert(numaggs == list_length(aggstate->aggs));
2298
2299 /*
2300 * For each phase, prepare grouping set data and fmgr lookup data for
2301 * compare functions. Accumulate all_grouped_cols in passing.
2302 */
2303 aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
2304
2305 aggstate->num_hashes = numHashes;
2306 if (numHashes)
2307 {
2308 aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
2309 aggstate->phases[0].numsets = 0;
2310 aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
2311 aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
2312 }
2313
2314 phase = 0;
2315 for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
2316 {
2317 Agg *aggnode;
2318 Sort *sortnode;
2319
2320 if (phaseidx > 0)
2321 {
2322 aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
2323 sortnode = castNode(Sort, aggnode->plan.lefttree);
2324 }
2325 else
2326 {
2327 aggnode = node;
2328 sortnode = NULL;
2329 }
2330
2331 Assert(phase <= 1 || sortnode);
2332
2333 if (aggnode->aggstrategy == AGG_HASHED
2334 || aggnode->aggstrategy == AGG_MIXED)
2335 {
2336 AggStatePerPhase phasedata = &aggstate->phases[0];
2337 AggStatePerHash perhash;
2338 Bitmapset *cols = NULL;
2339
2340 Assert(phase == 0);
2341 i = phasedata->numsets++;
2342 perhash = &aggstate->perhash[i];
2343
2344 /* phase 0 always points to the "real" Agg in the hash case */
2345 phasedata->aggnode = node;
2346 phasedata->aggstrategy = node->aggstrategy;
2347
2348 /* but the actual Agg node representing this hash is saved here */
2349 perhash->aggnode = aggnode;
2350
2351 phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
2352
2353 for (j = 0; j < aggnode->numCols; ++j)
2354 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2355
2356 phasedata->grouped_cols[i] = cols;
2357
2358 all_grouped_cols = bms_add_members(all_grouped_cols, cols);
2359 continue;
2360 }
2361 else
2362 {
2363 AggStatePerPhase phasedata = &aggstate->phases[++phase];
2364 int num_sets;
2365
2366 phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
2367
2368 if (num_sets)
2369 {
2370 phasedata->gset_lengths = palloc(num_sets * sizeof(int));
2371 phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
2372
2373 i = 0;
2374 foreach(l, aggnode->groupingSets)
2375 {
2376 int current_length = list_length(lfirst(l));
2377 Bitmapset *cols = NULL;
2378
2379 /* planner forces this to be correct */
2380 for (j = 0; j < current_length; ++j)
2381 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2382
2383 phasedata->grouped_cols[i] = cols;
2384 phasedata->gset_lengths[i] = current_length;
2385
2386 ++i;
2387 }
2388
2389 all_grouped_cols = bms_add_members(all_grouped_cols,
2390 phasedata->grouped_cols[0]);
2391 }
2392 else
2393 {
2394 Assert(phaseidx == 0);
2395
2396 phasedata->gset_lengths = NULL;
2397 phasedata->grouped_cols = NULL;
2398 }
2399
2400 /*
2401 * If we are grouping, precompute fmgr lookup data for inner loop.
2402 */
2403 if (aggnode->aggstrategy == AGG_SORTED)
2404 {
2405 int i = 0;
2406
2407 Assert(aggnode->numCols > 0);
2408
2409 /*
2410 * Build a separate function for each subset of columns that
2411 * need to be compared.
2412 */
2413 phasedata->eqfunctions =
2414 (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
2415
2416 /* for each grouping set */
2417 for (i = 0; i < phasedata->numsets; i++)
2418 {
2419 int length = phasedata->gset_lengths[i];
2420
2421 if (phasedata->eqfunctions[length - 1] != NULL)
2422 continue;
2423
2424 phasedata->eqfunctions[length - 1] =
2425 execTuplesMatchPrepare(scanDesc,
2426 length,
2427 aggnode->grpColIdx,
2428 aggnode->grpOperators,
2429 aggnode->grpCollations,
2430 (PlanState *) aggstate);
2431 }
2432
2433 /* and for all grouped columns, unless already computed */
2434 if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
2435 {
2436 phasedata->eqfunctions[aggnode->numCols - 1] =
2437 execTuplesMatchPrepare(scanDesc,
2438 aggnode->numCols,
2439 aggnode->grpColIdx,
2440 aggnode->grpOperators,
2441 aggnode->grpCollations,
2442 (PlanState *) aggstate);
2443 }
2444 }
2445
2446 phasedata->aggnode = aggnode;
2447 phasedata->aggstrategy = aggnode->aggstrategy;
2448 phasedata->sortnode = sortnode;
2449 }
2450 }
2451
2452 /*
2453 * Convert all_grouped_cols to a descending-order list.
2454 */
2455 i = -1;
2456 while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
2457 aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
2458
2459 /*
2460 * Set up aggregate-result storage in the output expr context, and also
2461 * allocate my private per-agg working storage
2462 */
2463 econtext = aggstate->ss.ps.ps_ExprContext;
2464 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
2465 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
2466
2467 peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
2468 pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
2469
2470 aggstate->peragg = peraggs;
2471 aggstate->pertrans = pertransstates;
2472
2473
2474 aggstate->all_pergroups =
2475 (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
2476 * (numGroupingSets + numHashes));
2477 pergroups = aggstate->all_pergroups;
2478
2479 if (node->aggstrategy != AGG_HASHED)
2480 {
2481 for (i = 0; i < numGroupingSets; i++)
2482 {
2483 pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
2484 * numaggs);
2485 }
2486
2487 aggstate->pergroups = pergroups;
2488 pergroups += numGroupingSets;
2489 }
2490
2491 /*
2492 * Hashing can only appear in the initial phase.
2493 */
2494 if (use_hashing)
2495 {
2496 /* this is an array of pointers, not structures */
2497 aggstate->hash_pergroup = pergroups;
2498
2499 find_hash_columns(aggstate);
2500 build_hash_table(aggstate);
2501 aggstate->table_filled = false;
2502 }
2503
2504 /*
2505 * Initialize current phase-dependent values to initial phase. The initial
2506 * phase is 1 (first sort pass) for all strategies that use sorting (if
2507 * hashing is being done too, then phase 0 is processed last); but if only
2508 * hashing is being done, then phase 0 is all there is.
2509 */
2510 if (node->aggstrategy == AGG_HASHED)
2511 {
2512 aggstate->current_phase = 0;
2513 initialize_phase(aggstate, 0);
2514 select_current_set(aggstate, 0, true);
2515 }
2516 else
2517 {
2518 aggstate->current_phase = 1;
2519 initialize_phase(aggstate, 1);
2520 select_current_set(aggstate, 0, false);
2521 }
2522
2523 /* -----------------
2524 * Perform lookups of aggregate function info, and initialize the
2525 * unchanging fields of the per-agg and per-trans data.
2526 *
2527 * We try to optimize by detecting duplicate aggregate functions so that
2528 * their state and final values are re-used, rather than needlessly being
2529 * re-calculated independently. We also detect aggregates that are not
2530 * the same, but which can share the same transition state.
2531 *
2532 * Scenarios:
2533 *
2534 * 1. Identical aggregate function calls appear in the query:
2535 *
2536 * SELECT SUM(x) FROM ... HAVING SUM(x) > 0
2537 *
2538 * Since these aggregates are identical, we only need to calculate
2539 * the value once. Both aggregates will share the same 'aggno' value.
2540 *
2541 * 2. Two different aggregate functions appear in the query, but the
2542 * aggregates have the same arguments, transition functions and
2543 * initial values (and, presumably, different final functions):
2544 *
2545 * SELECT AVG(x), STDDEV(x) FROM ...
2546 *
2547 * In this case we must create a new peragg for the varying aggregate,
2548 * and we need to call the final functions separately, but we need
2549 * only run the transition function once. (This requires that the
2550 * final functions be nondestructive of the transition state, but
2551 * that's required anyway for other reasons.)
2552 *
2553 * For either of these optimizations to be valid, all aggregate properties
2554 * used in the transition phase must be the same, including any modifiers
2555 * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
2556 * contain any volatile functions.
2557 * -----------------
2558 */
2559 aggno = -1;
2560 transno = -1;
2561 foreach(l, aggstate->aggs)
2562 {
2563 AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
2564 Aggref *aggref = aggrefstate->aggref;
2565 AggStatePerAgg peragg;
2566 AggStatePerTrans pertrans;
2567 int existing_aggno;
2568 int existing_transno;
2569 List *same_input_transnos;
2570 Oid inputTypes[FUNC_MAX_ARGS];
2571 int numArguments;
2572 int numDirectArgs;
2573 HeapTuple aggTuple;
2574 Form_pg_aggregate aggform;
2575 AclResult aclresult;
2576 Oid transfn_oid,
2577 finalfn_oid;
2578 bool shareable;
2579 Oid serialfn_oid,
2580 deserialfn_oid;
2581 Expr *finalfnexpr;
2582 Oid aggtranstype;
2583 Datum textInitVal;
2584 Datum initValue;
2585 bool initValueIsNull;
2586
2587 /* Planner should have assigned aggregate to correct level */
2588 Assert(aggref->agglevelsup == 0);
2589 /* ... and the split mode should match */
2590 Assert(aggref->aggsplit == aggstate->aggsplit);
2591
2592 /* 1. Check for already processed aggs which can be re-used */
2593 existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
2594 &same_input_transnos);
2595 if (existing_aggno != -1)
2596 {
2597 /*
2598 * Existing compatible agg found. so just point the Aggref to the
2599 * same per-agg struct.
2600 */
2601 aggrefstate->aggno = existing_aggno;
2602 continue;
2603 }
2604
2605 /* Mark Aggref state node with assigned index in the result array */
2606 peragg = &peraggs[++aggno];
2607 peragg->aggref = aggref;
2608 aggrefstate->aggno = aggno;
2609
2610 /* Fetch the pg_aggregate row */
2611 aggTuple = SearchSysCache1(AGGFNOID,
2612 ObjectIdGetDatum(aggref->aggfnoid));
2613 if (!HeapTupleIsValid(aggTuple))
2614 elog(ERROR, "cache lookup failed for aggregate %u",
2615 aggref->aggfnoid);
2616 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2617
2618 /* Check permission to call aggregate function */
2619 aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
2620 ACL_EXECUTE);
2621 if (aclresult != ACLCHECK_OK)
2622 aclcheck_error(aclresult, OBJECT_AGGREGATE,
2623 get_func_name(aggref->aggfnoid));
2624 InvokeFunctionExecuteHook(aggref->aggfnoid);
2625
2626 /* planner recorded transition state type in the Aggref itself */
2627 aggtranstype = aggref->aggtranstype;
2628 Assert(OidIsValid(aggtranstype));
2629
2630 /*
2631 * If this aggregation is performing state combines, then instead of
2632 * using the transition function, we'll use the combine function
2633 */
2634 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2635 {
2636 transfn_oid = aggform->aggcombinefn;
2637
2638 /* If not set then the planner messed up */
2639 if (!OidIsValid(transfn_oid))
2640 elog(ERROR, "combinefn not set for aggregate function");
2641 }
2642 else
2643 transfn_oid = aggform->aggtransfn;
2644
2645 /* Final function only required if we're finalizing the aggregates */
2646 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
2647 peragg->finalfn_oid = finalfn_oid = InvalidOid;
2648 else
2649 peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2650
2651 /*
2652 * If finalfn is marked read-write, we can't share transition states;
2653 * but it is okay to share states for AGGMODIFY_SHAREABLE aggs. Also,
2654 * if we're not executing the finalfn here, we can share regardless.
2655 */
2656 shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
2657 (finalfn_oid == InvalidOid);
2658 peragg->shareable = shareable;
2659
2660 serialfn_oid = InvalidOid;
2661 deserialfn_oid = InvalidOid;
2662
2663 /*
2664 * Check if serialization/deserialization is required. We only do it
2665 * for aggregates that have transtype INTERNAL.
2666 */
2667 if (aggtranstype == INTERNALOID)
2668 {
2669 /*
2670 * The planner should only have generated a serialize agg node if
2671 * every aggregate with an INTERNAL state has a serialization
2672 * function. Verify that.
2673 */
2674 if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
2675 {
2676 /* serialization only valid when not running finalfn */
2677 Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
2678
2679 if (!OidIsValid(aggform->aggserialfn))
2680 elog(ERROR, "serialfunc not provided for serialization aggregation");
2681 serialfn_oid = aggform->aggserialfn;
2682 }
2683
2684 /* Likewise for deserialization functions */
2685 if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
2686 {
2687 /* deserialization only valid when combining states */
2688 Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
2689
2690 if (!OidIsValid(aggform->aggdeserialfn))
2691 elog(ERROR, "deserialfunc not provided for deserialization aggregation");
2692 deserialfn_oid = aggform->aggdeserialfn;
2693 }
2694 }
2695
2696 /* Check that aggregate owner has permission to call component fns */
2697 {
2698 HeapTuple procTuple;
2699 Oid aggOwner;
2700
2701 procTuple = SearchSysCache1(PROCOID,
2702 ObjectIdGetDatum(aggref->aggfnoid));
2703 if (!HeapTupleIsValid(procTuple))
2704 elog(ERROR, "cache lookup failed for function %u",
2705 aggref->aggfnoid);
2706 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2707 ReleaseSysCache(procTuple);
2708
2709 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2710 ACL_EXECUTE);
2711 if (aclresult != ACLCHECK_OK)
2712 aclcheck_error(aclresult, OBJECT_FUNCTION,
2713 get_func_name(transfn_oid));
2714 InvokeFunctionExecuteHook(transfn_oid);
2715 if (OidIsValid(finalfn_oid))
2716 {
2717 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2718 ACL_EXECUTE);
2719 if (aclresult != ACLCHECK_OK)
2720 aclcheck_error(aclresult, OBJECT_FUNCTION,
2721 get_func_name(finalfn_oid));
2722 InvokeFunctionExecuteHook(finalfn_oid);
2723 }
2724 if (OidIsValid(serialfn_oid))
2725 {
2726 aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
2727 ACL_EXECUTE);
2728 if (aclresult != ACLCHECK_OK)
2729 aclcheck_error(aclresult, OBJECT_FUNCTION,
2730 get_func_name(serialfn_oid));
2731 InvokeFunctionExecuteHook(serialfn_oid);
2732 }
2733 if (OidIsValid(deserialfn_oid))
2734 {
2735 aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
2736 ACL_EXECUTE);
2737 if (aclresult != ACLCHECK_OK)
2738 aclcheck_error(aclresult, OBJECT_FUNCTION,
2739 get_func_name(deserialfn_oid));
2740 InvokeFunctionExecuteHook(deserialfn_oid);
2741 }
2742 }
2743
2744 /*
2745 * Get actual datatypes of the (nominal) aggregate inputs. These
2746 * could be different from the agg's declared input types, when the
2747 * agg accepts ANY or a polymorphic type.
2748 */
2749 numArguments = get_aggregate_argtypes(aggref, inputTypes);
2750
2751 /* Count the "direct" arguments, if any */
2752 numDirectArgs = list_length(aggref->aggdirectargs);
2753
2754 /* Detect how many arguments to pass to the finalfn */
2755 if (aggform->aggfinalextra)
2756 peragg->numFinalArgs = numArguments + 1;
2757 else
2758 peragg->numFinalArgs = numDirectArgs + 1;
2759
2760 /* Initialize any direct-argument expressions */
2761 peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
2762 (PlanState *) aggstate);
2763
2764 /*
2765 * build expression trees using actual argument & result types for the
2766 * finalfn, if it exists and is required.
2767 */
2768 if (OidIsValid(finalfn_oid))
2769 {
2770 build_aggregate_finalfn_expr(inputTypes,
2771 peragg->numFinalArgs,
2772 aggtranstype,
2773 aggref->aggtype,
2774 aggref->inputcollid,
2775 finalfn_oid,
2776 &finalfnexpr);
2777 fmgr_info(finalfn_oid, &peragg->finalfn);
2778 fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
2779 }
2780
2781 /* get info about the output value's datatype */
2782 get_typlenbyval(aggref->aggtype,
2783 &peragg->resulttypeLen,
2784 &peragg->resulttypeByVal);
2785
2786 /*
2787 * initval is potentially null, so don't try to access it as a struct
2788 * field. Must do it the hard way with SysCacheGetAttr.
2789 */
2790 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
2791 Anum_pg_aggregate_agginitval,
2792 &initValueIsNull);
2793 if (initValueIsNull)
2794 initValue = (Datum) 0;
2795 else
2796 initValue = GetAggInitVal(textInitVal, aggtranstype);
2797
2798 /*
2799 * 2. Build working state for invoking the transition function, or
2800 * look up previously initialized working state, if we can share it.
2801 *
2802 * find_compatible_peragg() already collected a list of shareable
2803 * per-Trans's with the same inputs. Check if any of them have the
2804 * same transition function and initial value.
2805 */
2806 existing_transno = find_compatible_pertrans(aggstate, aggref,
2807 shareable,
2808 transfn_oid, aggtranstype,
2809 serialfn_oid, deserialfn_oid,
2810 initValue, initValueIsNull,
2811 same_input_transnos);
2812 if (existing_transno != -1)
2813 {
2814 /*
2815 * Existing compatible trans found, so just point the 'peragg' to
2816 * the same per-trans struct, and mark the trans state as shared.
2817 */
2818 pertrans = &pertransstates[existing_transno];
2819 pertrans->aggshared = true;
2820 peragg->transno = existing_transno;
2821 }
2822 else
2823 {
2824 pertrans = &pertransstates[++transno];
2825 build_pertrans_for_aggref(pertrans, aggstate, estate,
2826 aggref, transfn_oid, aggtranstype,
2827 serialfn_oid, deserialfn_oid,
2828 initValue, initValueIsNull,
2829 inputTypes, numArguments);
2830 peragg->transno = transno;
2831 }
2832 ReleaseSysCache(aggTuple);
2833 }
2834
2835 /*
2836 * Update aggstate->numaggs to be the number of unique aggregates found.
	 * Also set numtrans to the number of unique transition states found.
2838 */
2839 aggstate->numaggs = aggno + 1;
2840 aggstate->numtrans = transno + 1;
2841
2842 /*
2843 * Last, check whether any more aggregates got added onto the node while
2844 * we processed the expressions for the aggregate arguments (including not
2845 * only the regular arguments and FILTER expressions handled immediately
2846 * above, but any direct arguments we might've handled earlier). If so,
2847 * we have nested aggregate functions, which is semantically nonsensical,
2848 * so complain. (This should have been caught by the parser, so we don't
2849 * need to work hard on a helpful error message; but we defend against it
2850 * here anyway, just to be sure.)
2851 */
2852 if (numaggs != list_length(aggstate->aggs))
2853 ereport(ERROR,
2854 (errcode(ERRCODE_GROUPING_ERROR),
2855 errmsg("aggregate function calls cannot be nested")));
2856
2857 /*
2858 * Build expressions doing all the transition work at once. We build a
2859 * different one for each phase, as the number of transition function
2860 * invocation can differ between phases. Note this'll work both for
2861 * transition and combination functions (although there'll only be one
2862 * phase in the latter case).
2863 */
2864 for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
2865 {
2866 AggStatePerPhase phase = &aggstate->phases[phaseidx];
2867 bool dohash = false;
2868 bool dosort = false;
2869
2870 /* phase 0 doesn't necessarily exist */
2871 if (!phase->aggnode)
2872 continue;
2873
2874 if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
2875 {
2876 /*
2877 * Phase one, and only phase one, in a mixed agg performs both
2878 * sorting and aggregation.
2879 */
2880 dohash = true;
2881 dosort = true;
2882 }
2883 else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
2884 {
2885 /*
2886 * No need to compute a transition function for an AGG_MIXED phase
2887 * 0 - the contents of the hashtables will have been computed
2888 * during phase 1.
2889 */
2890 continue;
2891 }
2892 else if (phase->aggstrategy == AGG_PLAIN ||
2893 phase->aggstrategy == AGG_SORTED)
2894 {
2895 dohash = false;
2896 dosort = true;
2897 }
2898 else if (phase->aggstrategy == AGG_HASHED)
2899 {
2900 dohash = true;
2901 dosort = false;
2902 }
2903 else
2904 Assert(false);
2905
2906 phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash);
2907
2908 }
2909
2910 return aggstate;
2911}
2912
2913/*
2914 * Build the state needed to calculate a state value for an aggregate.
2915 *
2916 * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
2917 * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
2918 * of the arguments could be calculated from 'aggref', but the caller has
2919 * calculated them already, so might as well pass them.
2920 */
2921static void
2922build_pertrans_for_aggref(AggStatePerTrans pertrans,
2923 AggState *aggstate, EState *estate,
2924 Aggref *aggref,
2925 Oid aggtransfn, Oid aggtranstype,
2926 Oid aggserialfn, Oid aggdeserialfn,
2927 Datum initValue, bool initValueIsNull,
2928 Oid *inputTypes, int numArguments)
2929{
2930 int numGroupingSets = Max(aggstate->maxsets, 1);
2931 Expr *serialfnexpr = NULL;
2932 Expr *deserialfnexpr = NULL;
2933 ListCell *lc;
2934 int numInputs;
2935 int numDirectArgs;
2936 List *sortlist;
2937 int numSortCols;
2938 int numDistinctCols;
2939 int i;
2940
2941 /* Begin filling in the pertrans data */
2942 pertrans->aggref = aggref;
2943 pertrans->aggshared = false;
2944 pertrans->aggCollation = aggref->inputcollid;
2945 pertrans->transfn_oid = aggtransfn;
2946 pertrans->serialfn_oid = aggserialfn;
2947 pertrans->deserialfn_oid = aggdeserialfn;
2948 pertrans->initValue = initValue;
2949 pertrans->initValueIsNull = initValueIsNull;
2950
2951 /* Count the "direct" arguments, if any */
2952 numDirectArgs = list_length(aggref->aggdirectargs);
2953
2954 /* Count the number of aggregated input columns */
2955 pertrans->numInputs = numInputs = list_length(aggref->args);
2956
2957 pertrans->aggtranstype = aggtranstype;
2958
2959 /*
2960 * When combining states, we have no use at all for the aggregate
2961 * function's transfn. Instead we use the combinefn. In this case, the
2962 * transfn and transfn_oid fields of pertrans refer to the combine
2963 * function rather than the transition function.
2964 */
2965 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2966 {
2967 Expr *combinefnexpr;
2968 size_t numTransArgs;
2969
2970 /*
2971 * When combining there's only one input, the to-be-combined added
2972 * transition value from below (this node's transition value is
2973 * counted separately).
2974 */
2975 pertrans->numTransInputs = 1;
2976
2977 /* account for the current transition state */
2978 numTransArgs = pertrans->numTransInputs + 1;
2979
2980 build_aggregate_combinefn_expr(aggtranstype,
2981 aggref->inputcollid,
2982 aggtransfn,
2983 &combinefnexpr);
2984 fmgr_info(aggtransfn, &pertrans->transfn);
2985 fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
2986
2987 pertrans->transfn_fcinfo =
2988 (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
2989 InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
2990 &pertrans->transfn,
2991 numTransArgs,
2992 pertrans->aggCollation,
2993 (void *) aggstate, NULL);
2994
2995 /*
2996 * Ensure that a combine function to combine INTERNAL states is not
2997 * strict. This should have been checked during CREATE AGGREGATE, but
2998 * the strict property could have been changed since then.
2999 */
3000 if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
3001 ereport(ERROR,
3002 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3003 errmsg("combine function with transition type %s must not be declared STRICT",
3004 format_type_be(aggtranstype))));
3005 }
3006 else
3007 {
3008 Expr *transfnexpr;
3009 size_t numTransArgs;
3010
3011 /* Detect how many arguments to pass to the transfn */
3012 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3013 pertrans->numTransInputs = numInputs;
3014 else
3015 pertrans->numTransInputs = numArguments;
3016
3017 /* account for the current transition state */
3018 numTransArgs = pertrans->numTransInputs + 1;
3019
3020 /*
3021 * Set up infrastructure for calling the transfn. Note that invtrans
3022 * is not needed here.
3023 */
3024 build_aggregate_transfn_expr(inputTypes,
3025 numArguments,
3026 numDirectArgs,
3027 aggref->aggvariadic,
3028 aggtranstype,
3029 aggref->inputcollid,
3030 aggtransfn,
3031 InvalidOid,
3032 &transfnexpr,
3033 NULL);
3034 fmgr_info(aggtransfn, &pertrans->transfn);
3035 fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
3036
3037 pertrans->transfn_fcinfo =
3038 (FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
3039 InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
3040 &pertrans->transfn,
3041 numTransArgs,
3042 pertrans->aggCollation,
3043 (void *) aggstate, NULL);
3044
3045 /*
3046 * If the transfn is strict and the initval is NULL, make sure input
3047 * type and transtype are the same (or at least binary-compatible), so
3048 * that it's OK to use the first aggregated input value as the initial
3049 * transValue. This should have been checked at agg definition time,
3050 * but we must check again in case the transfn's strictness property
3051 * has been changed.
3052 */
3053 if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
3054 {
3055 if (numArguments <= numDirectArgs ||
3056 !IsBinaryCoercible(inputTypes[numDirectArgs],
3057 aggtranstype))
3058 ereport(ERROR,
3059 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3060 errmsg("aggregate %u needs to have compatible input type and transition type",
3061 aggref->aggfnoid)));
3062 }
3063 }
3064
3065 /* get info about the state value's datatype */
3066 get_typlenbyval(aggtranstype,
3067 &pertrans->transtypeLen,
3068 &pertrans->transtypeByVal);
3069
3070 if (OidIsValid(aggserialfn))
3071 {
3072 build_aggregate_serialfn_expr(aggserialfn,
3073 &serialfnexpr);
3074 fmgr_info(aggserialfn, &pertrans->serialfn);
3075 fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
3076
3077 pertrans->serialfn_fcinfo =
3078 (FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
3079 InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
3080 &pertrans->serialfn,
3081 1,
3082 InvalidOid,
3083 (void *) aggstate, NULL);
3084 }
3085
3086 if (OidIsValid(aggdeserialfn))
3087 {
3088 build_aggregate_deserialfn_expr(aggdeserialfn,
3089 &deserialfnexpr);
3090 fmgr_info(aggdeserialfn, &pertrans->deserialfn);
3091 fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
3092
3093 pertrans->deserialfn_fcinfo =
3094 (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
3095 InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
3096 &pertrans->deserialfn,
3097 2,
3098 InvalidOid,
3099 (void *) aggstate, NULL);
3100
3101 }
3102
3103 /*
3104 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
3105 * have a list of SortGroupClause nodes; fish out the data in them and
3106 * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
3107 * however; the agg's transfn and finalfn are responsible for that.
3108 *
3109 * Note that by construction, if there is a DISTINCT clause then the ORDER
3110 * BY clause is a prefix of it (see transformDistinctClause).
3111 */
3112 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3113 {
3114 sortlist = NIL;
3115 numSortCols = numDistinctCols = 0;
3116 }
3117 else if (aggref->aggdistinct)
3118 {
3119 sortlist = aggref->aggdistinct;
3120 numSortCols = numDistinctCols = list_length(sortlist);
3121 Assert(numSortCols >= list_length(aggref->aggorder));
3122 }
3123 else
3124 {
3125 sortlist = aggref->aggorder;
3126 numSortCols = list_length(sortlist);
3127 numDistinctCols = 0;
3128 }
3129
3130 pertrans->numSortCols = numSortCols;
3131 pertrans->numDistinctCols = numDistinctCols;
3132
3133 /*
3134 * If we have either sorting or filtering to do, create a tupledesc and
3135 * slot corresponding to the aggregated inputs (including sort
3136 * expressions) of the agg.
3137 */
3138 if (numSortCols > 0 || aggref->aggfilter)
3139 {
3140 pertrans->sortdesc = ExecTypeFromTL(aggref->args);
3141 pertrans->sortslot =
3142 ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
3143 &TTSOpsMinimalTuple);
3144 }
3145
3146 if (numSortCols > 0)
3147 {
3148 /*
3149 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
3150 * (yet)
3151 */
3152 Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
3153
3154 /* If we have only one input, we need its len/byval info. */
3155 if (numInputs == 1)
3156 {
3157 get_typlenbyval(inputTypes[numDirectArgs],
3158 &pertrans->inputtypeLen,
3159 &pertrans->inputtypeByVal);
3160 }
3161 else if (numDistinctCols > 0)
3162 {
3163 /* we will need an extra slot to store prior values */
3164 pertrans->uniqslot =
3165 ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
3166 &TTSOpsMinimalTuple);
3167 }
3168
3169 /* Extract the sort information for use later */
3170 pertrans->sortColIdx =
3171 (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
3172 pertrans->sortOperators =
3173 (Oid *) palloc(numSortCols * sizeof(Oid));
3174 pertrans->sortCollations =
3175 (Oid *) palloc(numSortCols * sizeof(Oid));
3176 pertrans->sortNullsFirst =
3177 (bool *) palloc(numSortCols * sizeof(bool));
3178
3179 i = 0;
3180 foreach(lc, sortlist)
3181 {
3182 SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
3183 TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
3184
3185 /* the parser should have made sure of this */
3186 Assert(OidIsValid(sortcl->sortop));
3187
3188 pertrans->sortColIdx[i] = tle->resno;
3189 pertrans->sortOperators[i] = sortcl->sortop;
3190 pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
3191 pertrans->sortNullsFirst[i] = sortcl->nulls_first;
3192 i++;
3193 }
3194 Assert(i == numSortCols);
3195 }
3196
3197 if (aggref->aggdistinct)
3198 {
3199 Oid *ops;
3200
3201 Assert(numArguments > 0);
3202 Assert(list_length(aggref->aggdistinct) == numDistinctCols);
3203
3204 ops = palloc(numDistinctCols * sizeof(Oid));
3205
3206 i = 0;
3207 foreach(lc, aggref->aggdistinct)
3208 ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
3209
3210 /* lookup / build the necessary comparators */
3211 if (numDistinctCols == 1)
3212 fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
3213 else
3214 pertrans->equalfnMulti =
3215 execTuplesMatchPrepare(pertrans->sortdesc,
3216 numDistinctCols,
3217 pertrans->sortColIdx,
3218 ops,
3219 pertrans->sortCollations,
3220 &aggstate->ss.ps);
3221 pfree(ops);
3222 }
3223
3224 pertrans->sortstates = (Tuplesortstate **)
3225 palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
3226}
3227
3228
3229static Datum
3230GetAggInitVal(Datum textInitVal, Oid transtype)
3231{
3232 Oid typinput,
3233 typioparam;
3234 char *strInitVal;
3235 Datum initVal;
3236
3237 getTypeInputInfo(transtype, &typinput, &typioparam);
3238 strInitVal = TextDatumGetCString(textInitVal);
3239 initVal = OidInputFunctionCall(typinput, strInitVal,
3240 typioparam, -1);
3241 pfree(strInitVal);
3242 return initVal;
3243}
3244
3245/*
3246 * find_compatible_peragg - search for a previously initialized per-Agg struct
3247 *
3248 * Searches the previously looked at aggregates to find one which is compatible
3249 * with this one, with the same input parameters. If no compatible aggregate
3250 * can be found, returns -1.
3251 *
3252 * As a side-effect, this also collects a list of existing, shareable per-Trans
3253 * structs with matching inputs. If no identical Aggref is found, the list is
3254 * passed later to find_compatible_pertrans, to see if we can at least reuse
3255 * the state value of another aggregate.
3256 */
3257static int
3258find_compatible_peragg(Aggref *newagg, AggState *aggstate,
3259 int lastaggno, List **same_input_transnos)
3260{
3261 int aggno;
3262 AggStatePerAgg peraggs;
3263
3264 *same_input_transnos = NIL;
3265
3266 /* we mustn't reuse the aggref if it contains volatile function calls */
3267 if (contain_volatile_functions((Node *) newagg))
3268 return -1;
3269
3270 peraggs = aggstate->peragg;
3271
3272 /*
3273 * Search through the list of already seen aggregates. If we find an
3274 * existing identical aggregate call, then we can re-use that one. While
3275 * searching, we'll also collect a list of Aggrefs with the same input
3276 * parameters. If no matching Aggref is found, the caller can potentially
3277 * still re-use the transition state of one of them. (At this stage we
3278 * just compare the parsetrees; whether different aggregates share the
3279 * same transition function will be checked later.)
3280 */
3281 for (aggno = 0; aggno <= lastaggno; aggno++)
3282 {
3283 AggStatePerAgg peragg;
3284 Aggref *existingRef;
3285
3286 peragg = &peraggs[aggno];
3287 existingRef = peragg->aggref;
3288
3289 /* all of the following must be the same or it's no match */
3290 if (newagg->inputcollid != existingRef->inputcollid ||
3291 newagg->aggtranstype != existingRef->aggtranstype ||
3292 newagg->aggstar != existingRef->aggstar ||
3293 newagg->aggvariadic != existingRef->aggvariadic ||
3294 newagg->aggkind != existingRef->aggkind ||
3295 !equal(newagg->args, existingRef->args) ||
3296 !equal(newagg->aggorder, existingRef->aggorder) ||
3297 !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
3298 !equal(newagg->aggfilter, existingRef->aggfilter))
3299 continue;
3300
3301 /* if it's the same aggregate function then report exact match */
3302 if (newagg->aggfnoid == existingRef->aggfnoid &&
3303 newagg->aggtype == existingRef->aggtype &&
3304 newagg->aggcollid == existingRef->aggcollid &&
3305 equal(newagg->aggdirectargs, existingRef->aggdirectargs))
3306 {
3307 list_free(*same_input_transnos);
3308 *same_input_transnos = NIL;
3309 return aggno;
3310 }
3311
3312 /*
3313 * Not identical, but it had the same inputs. If the final function
3314 * permits sharing, return its transno to the caller, in case we can
3315 * re-use its per-trans state. (If there's already sharing going on,
3316 * we might report a transno more than once. find_compatible_pertrans
3317 * is cheap enough that it's not worth spending cycles to avoid that.)
3318 */
3319 if (peragg->shareable)
3320 *same_input_transnos = lappend_int(*same_input_transnos,
3321 peragg->transno);
3322 }
3323
3324 return -1;
3325}
3326
3327/*
3328 * find_compatible_pertrans - search for a previously initialized per-Trans
3329 * struct
3330 *
3331 * Searches the list of transnos for a per-Trans struct with the same
3332 * transition function and initial condition. (The inputs have already been
3333 * verified to match.)
3334 */
3335static int
3336find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
3337 Oid aggtransfn, Oid aggtranstype,
3338 Oid aggserialfn, Oid aggdeserialfn,
3339 Datum initValue, bool initValueIsNull,
3340 List *transnos)
3341{
3342 ListCell *lc;
3343
3344 /* If this aggregate can't share transition states, give up */
3345 if (!shareable)
3346 return -1;
3347
3348 foreach(lc, transnos)
3349 {
3350 int transno = lfirst_int(lc);
3351 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
3352
3353 /*
3354 * if the transfns or transition state types are not the same then the
3355 * state can't be shared.
3356 */
3357 if (aggtransfn != pertrans->transfn_oid ||
3358 aggtranstype != pertrans->aggtranstype)
3359 continue;
3360
3361 /*
3362 * The serialization and deserialization functions must match, if
3363 * present, as we're unable to share the trans state for aggregates
3364 * which will serialize or deserialize into different formats.
3365 * Remember that these will be InvalidOid if they're not required for
3366 * this agg node.
3367 */
3368 if (aggserialfn != pertrans->serialfn_oid ||
3369 aggdeserialfn != pertrans->deserialfn_oid)
3370 continue;
3371
3372 /*
3373 * Check that the initial condition matches, too.
3374 */
3375 if (initValueIsNull && pertrans->initValueIsNull)
3376 return transno;
3377
3378 if (!initValueIsNull && !pertrans->initValueIsNull &&
3379 datumIsEqual(initValue, pertrans->initValue,
3380 pertrans->transtypeByVal, pertrans->transtypeLen))
3381 return transno;
3382 }
3383 return -1;
3384}
3385
3386void
3387ExecEndAgg(AggState *node)
3388{
3389 PlanState *outerPlan;
3390 int transno;
3391 int numGroupingSets = Max(node->maxsets, 1);
3392 int setno;
3393
3394 /* Make sure we have closed any open tuplesorts */
3395
3396 if (node->sort_in)
3397 tuplesort_end(node->sort_in);
3398 if (node->sort_out)
3399 tuplesort_end(node->sort_out);
3400
3401 for (transno = 0; transno < node->numtrans; transno++)
3402 {
3403 AggStatePerTrans pertrans = &node->pertrans[transno];
3404
3405 for (setno = 0; setno < numGroupingSets; setno++)
3406 {
3407 if (pertrans->sortstates[setno])
3408 tuplesort_end(pertrans->sortstates[setno]);
3409 }
3410 }
3411
3412 /* And ensure any agg shutdown callbacks have been called */
3413 for (setno = 0; setno < numGroupingSets; setno++)
3414 ReScanExprContext(node->aggcontexts[setno]);
3415 if (node->hashcontext)
3416 ReScanExprContext(node->hashcontext);
3417
3418 /*
3419 * We don't actually free any ExprContexts here (see comment in
3420 * ExecFreeExprContext), just unlinking the output one from the plan node
3421 * suffices.
3422 */
3423 ExecFreeExprContext(&node->ss.ps);
3424
3425 /* clean up tuple table */
3426 ExecClearTuple(node->ss.ss_ScanTupleSlot);
3427
3428 outerPlan = outerPlanState(node);
3429 ExecEndNode(outerPlan);
3430}
3431
3432void
3433ExecReScanAgg(AggState *node)
3434{
3435 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3436 PlanState *outerPlan = outerPlanState(node);
3437 Agg *aggnode = (Agg *) node->ss.ps.plan;
3438 int transno;
3439 int numGroupingSets = Max(node->maxsets, 1);
3440 int setno;
3441
3442 node->agg_done = false;
3443
3444 if (node->aggstrategy == AGG_HASHED)
3445 {
3446 /*
3447 * In the hashed case, if we haven't yet built the hash table then we
3448 * can just return; nothing done yet, so nothing to undo. If subnode's
3449 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
3450 * else no reason to re-scan it at all.
3451 */
3452 if (!node->table_filled)
3453 return;
3454
3455 /*
3456 * If we do have the hash table, and the subplan does not have any
3457 * parameter changes, and none of our own parameter changes affect
3458 * input expressions of the aggregated functions, then we can just
3459 * rescan the existing hash table; no need to build it again.
3460 */
3461 if (outerPlan->chgParam == NULL &&
3462 !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
3463 {
3464 ResetTupleHashIterator(node->perhash[0].hashtable,
3465 &node->perhash[0].hashiter);
3466 select_current_set(node, 0, true);
3467 return;
3468 }
3469 }
3470
3471 /* Make sure we have closed any open tuplesorts */
3472 for (transno = 0; transno < node->numtrans; transno++)
3473 {
3474 for (setno = 0; setno < numGroupingSets; setno++)
3475 {
3476 AggStatePerTrans pertrans = &node->pertrans[transno];
3477
3478 if (pertrans->sortstates[setno])
3479 {
3480 tuplesort_end(pertrans->sortstates[setno]);
3481 pertrans->sortstates[setno] = NULL;
3482 }
3483 }
3484 }
3485
3486 /*
3487 * We don't need to ReScanExprContext the output tuple context here;
3488 * ExecReScan already did it. But we do need to reset our per-grouping-set
3489 * contexts, which may have transvalues stored in them. (We use rescan
3490 * rather than just reset because transfns may have registered callbacks
3491 * that need to be run now.) For the AGG_HASHED case, see below.
3492 */
3493
3494 for (setno = 0; setno < numGroupingSets; setno++)
3495 {
3496 ReScanExprContext(node->aggcontexts[setno]);
3497 }
3498
3499 /* Release first tuple of group, if we have made a copy */
3500 if (node->grp_firstTuple != NULL)
3501 {
3502 heap_freetuple(node->grp_firstTuple);
3503 node->grp_firstTuple = NULL;
3504 }
3505 ExecClearTuple(node->ss.ss_ScanTupleSlot);
3506
3507 /* Forget current agg values */
3508 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
3509 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
3510
3511 /*
3512 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
3513 * the hashcontext. This used to be an issue, but now, resetting a context
3514 * automatically deletes sub-contexts too.
3515 */
3516 if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
3517 {
3518 ReScanExprContext(node->hashcontext);
3519 /* Rebuild an empty hash table */
3520 build_hash_table(node);
3521 node->table_filled = false;
3522 /* iterator will be reset when the table is filled */
3523 }
3524
3525 if (node->aggstrategy != AGG_HASHED)
3526 {
3527 /*
3528 * Reset the per-group state (in particular, mark transvalues null)
3529 */
3530 for (setno = 0; setno < numGroupingSets; setno++)
3531 {
3532 MemSet(node->pergroups[setno], 0,
3533 sizeof(AggStatePerGroupData) * node->numaggs);
3534 }
3535
3536 /* reset to phase 1 */
3537 initialize_phase(node, 1);
3538
3539 node->input_done = false;
3540 node->projected_set = -1;
3541 }
3542
3543 if (outerPlan->chgParam == NULL)
3544 ExecReScan(outerPlan);
3545}
3546
3547
3548/***********************************************************************
3549 * API exposed to aggregate functions
3550 ***********************************************************************/
3551
3552
3553/*
3554 * AggCheckCallContext - test if a SQL function is being called as an aggregate
3555 *
3556 * The transition and/or final functions of an aggregate may want to verify
3557 * that they are being called as aggregates, rather than as plain SQL
3558 * functions. They should use this function to do so. The return value
3559 * is nonzero if being called as an aggregate, or zero if not. (Specific
3560 * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
3561 * values could conceivably appear in future.)
3562 *
3563 * If aggcontext isn't NULL, the function also stores at *aggcontext the
3564 * identity of the memory context that aggregate transition values are being
3565 * stored in. Note that the same aggregate call site (flinfo) may be called
3566 * interleaved on different transition values in different contexts, so it's
3567 * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
3568 * cache it in the transvalue itself (for internal-type transvalues).
3569 */
3570int
3571AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
3572{
3573 if (fcinfo->context && IsA(fcinfo->context, AggState))
3574 {
3575 if (aggcontext)
3576 {
3577 AggState *aggstate = ((AggState *) fcinfo->context);
3578 ExprContext *cxt = aggstate->curaggcontext;
3579
3580 *aggcontext = cxt->ecxt_per_tuple_memory;
3581 }
3582 return AGG_CONTEXT_AGGREGATE;
3583 }
3584 if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
3585 {
3586 if (aggcontext)
3587 *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
3588 return AGG_CONTEXT_WINDOW;
3589 }
3590
3591 /* this is just to prevent "uninitialized variable" warnings */
3592 if (aggcontext)
3593 *aggcontext = NULL;
3594 return 0;
3595}
3596
3597/*
3598 * AggGetAggref - allow an aggregate support function to get its Aggref
3599 *
3600 * If the function is being called as an aggregate support function,
3601 * return the Aggref node for the aggregate call. Otherwise, return NULL.
3602 *
3603 * Aggregates sharing the same inputs and transition functions can get
3604 * merged into a single transition calculation. If the transition function
3605 * calls AggGetAggref, it will get some one of the Aggrefs for which it is
3606 * executing. It must therefore not pay attention to the Aggref fields that
3607 * relate to the final function, as those are indeterminate. But if a final
3608 * function calls AggGetAggref, it will get a precise result.
3609 *
3610 * Note that if an aggregate is being used as a window function, this will
3611 * return NULL. We could provide a similar function to return the relevant
3612 * WindowFunc node in such cases, but it's not needed yet.
3613 */
3614Aggref *
3615AggGetAggref(FunctionCallInfo fcinfo)
3616{
3617 if (fcinfo->context && IsA(fcinfo->context, AggState))
3618 {
3619 AggState *aggstate = (AggState *) fcinfo->context;
3620 AggStatePerAgg curperagg;
3621 AggStatePerTrans curpertrans;
3622
3623 /* check curperagg (valid when in a final function) */
3624 curperagg = aggstate->curperagg;
3625
3626 if (curperagg)
3627 return curperagg->aggref;
3628
3629 /* check curpertrans (valid when in a transition function) */
3630 curpertrans = aggstate->curpertrans;
3631
3632 if (curpertrans)
3633 return curpertrans->aggref;
3634 }
3635 return NULL;
3636}
3637
3638/*
3639 * AggGetTempMemoryContext - fetch short-term memory context for aggregates
3640 *
3641 * This is useful in agg final functions; the context returned is one that
3642 * the final function can safely reset as desired. This isn't useful for
3643 * transition functions, since the context returned MAY (we don't promise)
3644 * be the same as the context those are called in.
3645 *
3646 * As above, this is currently not useful for aggs called as window functions.
3647 */
3648MemoryContext
3649AggGetTempMemoryContext(FunctionCallInfo fcinfo)
3650{
3651 if (fcinfo->context && IsA(fcinfo->context, AggState))
3652 {
3653 AggState *aggstate = (AggState *) fcinfo->context;
3654
3655 return aggstate->tmpcontext->ecxt_per_tuple_memory;
3656 }
3657 return NULL;
3658}
3659
3660/*
3661 * AggStateIsShared - find out whether transition state is shared
3662 *
3663 * If the function is being called as an aggregate support function,
3664 * return true if the aggregate's transition state is shared across
3665 * multiple aggregates, false if it is not.
3666 *
3667 * Returns true if not called as an aggregate support function.
3668 * This is intended as a conservative answer, ie "no you'd better not
3669 * scribble on your input". In particular, will return true if the
3670 * aggregate is being used as a window function, which is a scenario
3671 * in which changing the transition state is a bad idea. We might
3672 * want to refine the behavior for the window case in future.
3673 */
3674bool
3675AggStateIsShared(FunctionCallInfo fcinfo)
3676{
3677 if (fcinfo->context && IsA(fcinfo->context, AggState))
3678 {
3679 AggState *aggstate = (AggState *) fcinfo->context;
3680 AggStatePerAgg curperagg;
3681 AggStatePerTrans curpertrans;
3682
3683 /* check curperagg (valid when in a final function) */
3684 curperagg = aggstate->curperagg;
3685
3686 if (curperagg)
3687 return aggstate->pertrans[curperagg->transno].aggshared;
3688
3689 /* check curpertrans (valid when in a transition function) */
3690 curpertrans = aggstate->curpertrans;
3691
3692 if (curpertrans)
3693 return curpertrans->aggshared;
3694 }
3695 return true;
3696}
3697
3698/*
3699 * AggRegisterCallback - register a cleanup callback for an aggregate
3700 *
3701 * This is useful for aggs to register shutdown callbacks, which will ensure
3702 * that non-memory resources are freed. The callback will occur just before
3703 * the associated aggcontext (as returned by AggCheckCallContext) is reset,
3704 * either between groups or as a result of rescanning the query. The callback
3705 * will NOT be called on error paths. The typical use-case is for freeing of
3706 * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
3707 * created by the agg functions. (The callback will not be called until after
3708 * the result of the finalfn is no longer needed, so it's safe for the finalfn
3709 * to return data that will be freed by the callback.)
3710 *
3711 * As above, this is currently not useful for aggs called as window functions.
3712 */
3713void
3714AggRegisterCallback(FunctionCallInfo fcinfo,
3715 ExprContextCallbackFunction func,
3716 Datum arg)
3717{
3718 if (fcinfo->context && IsA(fcinfo->context, AggState))
3719 {
3720 AggState *aggstate = (AggState *) fcinfo->context;
3721 ExprContext *cxt = aggstate->curaggcontext;
3722
3723 RegisterExprContextCallback(cxt, func, arg);
3724
3725 return;
3726 }
3727 elog(ERROR, "aggregate function cannot register a callback in this context");
3728}
3729
3730
3731/*
3732 * aggregate_dummy - dummy execution routine for aggregate functions
3733 *
3734 * This function is listed as the implementation (prosrc field) of pg_proc
3735 * entries for aggregate functions. Its only purpose is to throw an error
3736 * if someone mistakenly executes such a function in the normal way.
3737 *
3738 * Perhaps someday we could assign real meaning to the prosrc field of
3739 * an aggregate?
3740 */
3741Datum
3742aggregate_dummy(PG_FUNCTION_ARGS)
3743{
3744 elog(ERROR, "aggregate function %u called as normal function",
3745 fcinfo->flinfo->fn_oid);
3746 return (Datum) 0; /* keep compiler quiet */
3747}
3748