1#include "duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp"
2#include "duckdb/execution/operator/projection/physical_projection.hpp"
3#include "duckdb/execution/physical_plan_generator.hpp"
4#include "duckdb/function/aggregate/distributive_functions.hpp"
5#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
6#include "duckdb/planner/expression/bound_reference_expression.hpp"
7#include "duckdb/planner/operator/logical_distinct.hpp"
8#include "duckdb/function/function_binder.hpp"
9
10namespace duckdb {
11
12unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalDistinct &op) {
13 D_ASSERT(op.children.size() == 1);
14 auto child = CreatePlan(op&: *op.children[0]);
15 auto &distinct_targets = op.distinct_targets;
16 D_ASSERT(child);
17 D_ASSERT(!distinct_targets.empty());
18
19 auto &types = child->GetTypes();
20 vector<unique_ptr<Expression>> groups, aggregates, projections;
21 idx_t group_count = distinct_targets.size();
22 unordered_map<idx_t, idx_t> group_by_references;
23 vector<LogicalType> aggregate_types;
24 // creates one group per distinct_target
25 for (idx_t i = 0; i < distinct_targets.size(); i++) {
26 auto &target = distinct_targets[i];
27 if (target->type == ExpressionType::BOUND_REF) {
28 auto &bound_ref = target->Cast<BoundReferenceExpression>();
29 group_by_references[bound_ref.index] = i;
30 }
31 aggregate_types.push_back(x: target->return_type);
32 groups.push_back(x: std::move(target));
33 }
34 bool requires_projection = false;
35 if (types.size() != group_count) {
36 requires_projection = true;
37 }
38 // we need to create one aggregate per column in the select_list
39 for (idx_t i = 0; i < types.size(); ++i) {
40 auto logical_type = types[i];
41 // check if we can directly refer to a group, or if we need to push an aggregate with FIRST
42 auto entry = group_by_references.find(x: i);
43 if (entry != group_by_references.end()) {
44 auto group_index = entry->second;
45 // entry is found: can directly refer to a group
46 projections.push_back(x: make_uniq<BoundReferenceExpression>(args&: logical_type, args&: group_index));
47 if (group_index != i) {
48 // we require a projection only if this group element is out of order
49 requires_projection = true;
50 }
51 } else {
52 if (op.distinct_type == DistinctType::DISTINCT && op.order_by) {
53 throw InternalException("Entry that is not a group, but not a DISTINCT ON aggregate");
54 }
55 // entry is not one of the groups: need to push a FIRST aggregate
56 auto bound = make_uniq<BoundReferenceExpression>(args&: logical_type, args&: i);
57 vector<unique_ptr<Expression>> first_children;
58 first_children.push_back(x: std::move(bound));
59
60 FunctionBinder function_binder(context);
61 auto first_aggregate = function_binder.BindAggregateFunction(
62 bound_function: FirstFun::GetFunction(type: logical_type), children: std::move(first_children), filter: nullptr, aggr_type: AggregateType::NON_DISTINCT);
63 first_aggregate->order_bys = op.order_by ? op.order_by->Copy() : nullptr;
64 // add the projection
65 projections.push_back(x: make_uniq<BoundReferenceExpression>(args&: logical_type, args: group_count + aggregates.size()));
66 // push it to the list of aggregates
67 aggregate_types.push_back(x: logical_type);
68 aggregates.push_back(x: std::move(first_aggregate));
69 requires_projection = true;
70 }
71 }
72
73 child = ExtractAggregateExpressions(child: std::move(child), aggregates, groups);
74
75 // we add a physical hash aggregation in the plan to select the distinct groups
76 auto groupby = make_uniq<PhysicalHashAggregate>(args&: context, args&: aggregate_types, args: std::move(aggregates), args: std::move(groups),
77 args&: child->estimated_cardinality);
78 groupby->children.push_back(x: std::move(child));
79 if (!requires_projection) {
80 return std::move(groupby);
81 }
82
83 // we add a physical projection on top of the aggregation to project all members in the select list
84 auto aggr_projection = make_uniq<PhysicalProjection>(args: types, args: std::move(projections), args&: groupby->estimated_cardinality);
85 aggr_projection->children.push_back(x: std::move(groupby));
86 return std::move(aggr_projection);
87}
88
89} // namespace duckdb
90