1 | #include "duckdb/execution/operator/aggregate/physical_streaming_window.hpp" |
2 | #include "duckdb/execution/operator/aggregate/physical_window.hpp" |
3 | #include "duckdb/execution/operator/projection/physical_projection.hpp" |
4 | #include "duckdb/execution/physical_plan_generator.hpp" |
5 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
6 | #include "duckdb/planner/expression/bound_window_expression.hpp" |
7 | #include "duckdb/planner/operator/logical_window.hpp" |
8 | |
9 | #include <numeric> |
10 | |
11 | namespace duckdb { |
12 | |
13 | static bool IsStreamingWindow(unique_ptr<Expression> &expr) { |
14 | auto &wexpr = expr->Cast<BoundWindowExpression>(); |
15 | if (!wexpr.partitions.empty() || !wexpr.orders.empty() || wexpr.ignore_nulls) { |
16 | return false; |
17 | } |
18 | switch (wexpr.type) { |
19 | // TODO: add more expression types here? |
20 | case ExpressionType::WINDOW_AGGREGATE: |
21 | // We can stream aggregates if they are "running totals" and don't use filters |
22 | return wexpr.start == WindowBoundary::UNBOUNDED_PRECEDING && wexpr.end == WindowBoundary::CURRENT_ROW_ROWS && |
23 | !wexpr.filter_expr; |
24 | case ExpressionType::WINDOW_FIRST_VALUE: |
25 | case ExpressionType::WINDOW_PERCENT_RANK: |
26 | case ExpressionType::WINDOW_RANK: |
27 | case ExpressionType::WINDOW_RANK_DENSE: |
28 | case ExpressionType::WINDOW_ROW_NUMBER: |
29 | return true; |
30 | default: |
31 | return false; |
32 | } |
33 | } |
34 | |
35 | unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalWindow &op) { |
36 | D_ASSERT(op.children.size() == 1); |
37 | |
38 | auto plan = CreatePlan(op&: *op.children[0]); |
39 | #ifdef DEBUG |
40 | for (auto &expr : op.expressions) { |
41 | D_ASSERT(expr->IsWindow()); |
42 | } |
43 | #endif |
44 | |
45 | op.estimated_cardinality = op.EstimateCardinality(context); |
46 | |
47 | // Slice types |
48 | auto types = op.types; |
49 | const auto output_idx = types.size() - op.expressions.size(); |
50 | types.resize(new_size: output_idx); |
51 | |
52 | // Identify streaming windows |
53 | vector<idx_t> blocking_windows; |
54 | vector<idx_t> streaming_windows; |
55 | for (idx_t expr_idx = 0; expr_idx < op.expressions.size(); expr_idx++) { |
56 | if (IsStreamingWindow(expr&: op.expressions[expr_idx])) { |
57 | streaming_windows.push_back(x: expr_idx); |
58 | } else { |
59 | blocking_windows.push_back(x: expr_idx); |
60 | } |
61 | } |
62 | |
63 | // Process the window functions by sharing the partition/order definitions |
64 | vector<idx_t> evaluation_order; |
65 | while (!blocking_windows.empty() || !streaming_windows.empty()) { |
66 | const bool process_streaming = blocking_windows.empty(); |
67 | auto &remaining = process_streaming ? streaming_windows : blocking_windows; |
68 | |
69 | // Find all functions that share the partitioning of the first remaining expression |
70 | const auto over_idx = remaining[0]; |
71 | auto &over_expr = op.expressions[over_idx]->Cast<BoundWindowExpression>(); |
72 | |
73 | vector<idx_t> matching; |
74 | vector<idx_t> unprocessed; |
75 | for (const auto &expr_idx : remaining) { |
76 | D_ASSERT(op.expressions[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW); |
77 | auto &wexpr = op.expressions[expr_idx]->Cast<BoundWindowExpression>(); |
78 | if (over_expr.KeysAreCompatible(other: wexpr)) { |
79 | matching.emplace_back(args: expr_idx); |
80 | } else { |
81 | unprocessed.emplace_back(args: expr_idx); |
82 | } |
83 | } |
84 | remaining.swap(x&: unprocessed); |
85 | |
86 | // Extract the matching expressions |
87 | vector<unique_ptr<Expression>> select_list; |
88 | for (const auto &expr_idx : matching) { |
89 | select_list.emplace_back(args: std::move(op.expressions[expr_idx])); |
90 | types.emplace_back(args&: op.types[output_idx + expr_idx]); |
91 | } |
92 | |
93 | // Chain the new window operator on top of the plan |
94 | unique_ptr<PhysicalOperator> window; |
95 | if (process_streaming) { |
96 | window = make_uniq<PhysicalStreamingWindow>(args&: types, args: std::move(select_list), args&: op.estimated_cardinality); |
97 | } else { |
98 | window = make_uniq<PhysicalWindow>(args&: types, args: std::move(select_list), args&: op.estimated_cardinality); |
99 | } |
100 | window->children.push_back(x: std::move(plan)); |
101 | plan = std::move(window); |
102 | |
103 | // Remember the projection order if we changed it |
104 | if (!streaming_windows.empty() || !blocking_windows.empty() || !evaluation_order.empty()) { |
105 | evaluation_order.insert(position: evaluation_order.end(), first: matching.begin(), last: matching.end()); |
106 | } |
107 | } |
108 | |
109 | // Put everything back into place if it moved |
110 | if (!evaluation_order.empty()) { |
111 | vector<unique_ptr<Expression>> select_list(op.types.size()); |
112 | // The inputs don't move |
113 | for (idx_t i = 0; i < output_idx; ++i) { |
114 | select_list[i] = make_uniq<BoundReferenceExpression>(args&: op.types[i], args&: i); |
115 | } |
116 | // The outputs have been rearranged |
117 | for (idx_t i = 0; i < evaluation_order.size(); ++i) { |
118 | const auto expr_idx = evaluation_order[i] + output_idx; |
119 | select_list[expr_idx] = make_uniq<BoundReferenceExpression>(args&: op.types[expr_idx], args: i + output_idx); |
120 | } |
121 | auto proj = make_uniq<PhysicalProjection>(args&: op.types, args: std::move(select_list), args&: op.estimated_cardinality); |
122 | proj->children.push_back(x: std::move(plan)); |
123 | plan = std::move(proj); |
124 | } |
125 | |
126 | return plan; |
127 | } |
128 | |
129 | } // namespace duckdb |
130 | |