| 1 | #include "duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp" |
| 2 | #include "duckdb/execution/operator/projection/physical_projection.hpp" |
| 3 | #include "duckdb/execution/physical_plan_generator.hpp" |
| 4 | #include "duckdb/function/aggregate/distributive_functions.hpp" |
| 5 | #include "duckdb/planner/expression/bound_aggregate_expression.hpp" |
| 6 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
| 7 | #include "duckdb/planner/operator/logical_distinct.hpp" |
| 8 | #include "duckdb/function/function_binder.hpp" |
| 9 | |
| 10 | namespace duckdb { |
| 11 | |
| 12 | unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalDistinct &op) { |
| 13 | D_ASSERT(op.children.size() == 1); |
| 14 | auto child = CreatePlan(op&: *op.children[0]); |
| 15 | auto &distinct_targets = op.distinct_targets; |
| 16 | D_ASSERT(child); |
| 17 | D_ASSERT(!distinct_targets.empty()); |
| 18 | |
| 19 | auto &types = child->GetTypes(); |
| 20 | vector<unique_ptr<Expression>> groups, aggregates, projections; |
| 21 | idx_t group_count = distinct_targets.size(); |
| 22 | unordered_map<idx_t, idx_t> group_by_references; |
| 23 | vector<LogicalType> aggregate_types; |
| 24 | // creates one group per distinct_target |
| 25 | for (idx_t i = 0; i < distinct_targets.size(); i++) { |
| 26 | auto &target = distinct_targets[i]; |
| 27 | if (target->type == ExpressionType::BOUND_REF) { |
| 28 | auto &bound_ref = target->Cast<BoundReferenceExpression>(); |
| 29 | group_by_references[bound_ref.index] = i; |
| 30 | } |
| 31 | aggregate_types.push_back(x: target->return_type); |
| 32 | groups.push_back(x: std::move(target)); |
| 33 | } |
| 34 | bool requires_projection = false; |
| 35 | if (types.size() != group_count) { |
| 36 | requires_projection = true; |
| 37 | } |
| 38 | // we need to create one aggregate per column in the select_list |
| 39 | for (idx_t i = 0; i < types.size(); ++i) { |
| 40 | auto logical_type = types[i]; |
| 41 | // check if we can directly refer to a group, or if we need to push an aggregate with FIRST |
| 42 | auto entry = group_by_references.find(x: i); |
| 43 | if (entry != group_by_references.end()) { |
| 44 | auto group_index = entry->second; |
| 45 | // entry is found: can directly refer to a group |
| 46 | projections.push_back(x: make_uniq<BoundReferenceExpression>(args&: logical_type, args&: group_index)); |
| 47 | if (group_index != i) { |
| 48 | // we require a projection only if this group element is out of order |
| 49 | requires_projection = true; |
| 50 | } |
| 51 | } else { |
| 52 | if (op.distinct_type == DistinctType::DISTINCT && op.order_by) { |
| 53 | throw InternalException("Entry that is not a group, but not a DISTINCT ON aggregate" ); |
| 54 | } |
| 55 | // entry is not one of the groups: need to push a FIRST aggregate |
| 56 | auto bound = make_uniq<BoundReferenceExpression>(args&: logical_type, args&: i); |
| 57 | vector<unique_ptr<Expression>> first_children; |
| 58 | first_children.push_back(x: std::move(bound)); |
| 59 | |
| 60 | FunctionBinder function_binder(context); |
| 61 | auto first_aggregate = function_binder.BindAggregateFunction( |
| 62 | bound_function: FirstFun::GetFunction(type: logical_type), children: std::move(first_children), filter: nullptr, aggr_type: AggregateType::NON_DISTINCT); |
| 63 | first_aggregate->order_bys = op.order_by ? op.order_by->Copy() : nullptr; |
| 64 | // add the projection |
| 65 | projections.push_back(x: make_uniq<BoundReferenceExpression>(args&: logical_type, args: group_count + aggregates.size())); |
| 66 | // push it to the list of aggregates |
| 67 | aggregate_types.push_back(x: logical_type); |
| 68 | aggregates.push_back(x: std::move(first_aggregate)); |
| 69 | requires_projection = true; |
| 70 | } |
| 71 | } |
| 72 | |
| 73 | child = ExtractAggregateExpressions(child: std::move(child), aggregates, groups); |
| 74 | |
| 75 | // we add a physical hash aggregation in the plan to select the distinct groups |
| 76 | auto groupby = make_uniq<PhysicalHashAggregate>(args&: context, args&: aggregate_types, args: std::move(aggregates), args: std::move(groups), |
| 77 | args&: child->estimated_cardinality); |
| 78 | groupby->children.push_back(x: std::move(child)); |
| 79 | if (!requires_projection) { |
| 80 | return std::move(groupby); |
| 81 | } |
| 82 | |
| 83 | // we add a physical projection on top of the aggregation to project all members in the select list |
| 84 | auto aggr_projection = make_uniq<PhysicalProjection>(args: types, args: std::move(projections), args&: groupby->estimated_cardinality); |
| 85 | aggr_projection->children.push_back(x: std::move(groupby)); |
| 86 | return std::move(aggr_projection); |
| 87 | } |
| 88 | |
| 89 | } // namespace duckdb |
| 90 | |