#include "duckdb/execution/operator/aggregate/physical_streaming_window.hpp"
#include "duckdb/execution/operator/aggregate/physical_window.hpp"
#include "duckdb/execution/operator/projection/physical_projection.hpp"
#include "duckdb/execution/physical_plan_generator.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"
#include "duckdb/planner/expression/bound_window_expression.hpp"
#include "duckdb/planner/operator/logical_window.hpp"

#include <algorithm>
#include <numeric>
10
11namespace duckdb {
12
13static bool IsStreamingWindow(unique_ptr<Expression> &expr) {
14 auto &wexpr = expr->Cast<BoundWindowExpression>();
15 if (!wexpr.partitions.empty() || !wexpr.orders.empty() || wexpr.ignore_nulls) {
16 return false;
17 }
18 switch (wexpr.type) {
19 // TODO: add more expression types here?
20 case ExpressionType::WINDOW_AGGREGATE:
21 // We can stream aggregates if they are "running totals" and don't use filters
22 return wexpr.start == WindowBoundary::UNBOUNDED_PRECEDING && wexpr.end == WindowBoundary::CURRENT_ROW_ROWS &&
23 !wexpr.filter_expr;
24 case ExpressionType::WINDOW_FIRST_VALUE:
25 case ExpressionType::WINDOW_PERCENT_RANK:
26 case ExpressionType::WINDOW_RANK:
27 case ExpressionType::WINDOW_RANK_DENSE:
28 case ExpressionType::WINDOW_ROW_NUMBER:
29 return true;
30 default:
31 return false;
32 }
33}
34
35unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalWindow &op) {
36 D_ASSERT(op.children.size() == 1);
37
38 auto plan = CreatePlan(op&: *op.children[0]);
39#ifdef DEBUG
40 for (auto &expr : op.expressions) {
41 D_ASSERT(expr->IsWindow());
42 }
43#endif
44
45 op.estimated_cardinality = op.EstimateCardinality(context);
46
47 // Slice types
48 auto types = op.types;
49 const auto output_idx = types.size() - op.expressions.size();
50 types.resize(new_size: output_idx);
51
52 // Identify streaming windows
53 vector<idx_t> blocking_windows;
54 vector<idx_t> streaming_windows;
55 for (idx_t expr_idx = 0; expr_idx < op.expressions.size(); expr_idx++) {
56 if (IsStreamingWindow(expr&: op.expressions[expr_idx])) {
57 streaming_windows.push_back(x: expr_idx);
58 } else {
59 blocking_windows.push_back(x: expr_idx);
60 }
61 }
62
63 // Process the window functions by sharing the partition/order definitions
64 vector<idx_t> evaluation_order;
65 while (!blocking_windows.empty() || !streaming_windows.empty()) {
66 const bool process_streaming = blocking_windows.empty();
67 auto &remaining = process_streaming ? streaming_windows : blocking_windows;
68
69 // Find all functions that share the partitioning of the first remaining expression
70 const auto over_idx = remaining[0];
71 auto &over_expr = op.expressions[over_idx]->Cast<BoundWindowExpression>();
72
73 vector<idx_t> matching;
74 vector<idx_t> unprocessed;
75 for (const auto &expr_idx : remaining) {
76 D_ASSERT(op.expressions[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW);
77 auto &wexpr = op.expressions[expr_idx]->Cast<BoundWindowExpression>();
78 if (over_expr.KeysAreCompatible(other: wexpr)) {
79 matching.emplace_back(args: expr_idx);
80 } else {
81 unprocessed.emplace_back(args: expr_idx);
82 }
83 }
84 remaining.swap(x&: unprocessed);
85
86 // Extract the matching expressions
87 vector<unique_ptr<Expression>> select_list;
88 for (const auto &expr_idx : matching) {
89 select_list.emplace_back(args: std::move(op.expressions[expr_idx]));
90 types.emplace_back(args&: op.types[output_idx + expr_idx]);
91 }
92
93 // Chain the new window operator on top of the plan
94 unique_ptr<PhysicalOperator> window;
95 if (process_streaming) {
96 window = make_uniq<PhysicalStreamingWindow>(args&: types, args: std::move(select_list), args&: op.estimated_cardinality);
97 } else {
98 window = make_uniq<PhysicalWindow>(args&: types, args: std::move(select_list), args&: op.estimated_cardinality);
99 }
100 window->children.push_back(x: std::move(plan));
101 plan = std::move(window);
102
103 // Remember the projection order if we changed it
104 if (!streaming_windows.empty() || !blocking_windows.empty() || !evaluation_order.empty()) {
105 evaluation_order.insert(position: evaluation_order.end(), first: matching.begin(), last: matching.end());
106 }
107 }
108
109 // Put everything back into place if it moved
110 if (!evaluation_order.empty()) {
111 vector<unique_ptr<Expression>> select_list(op.types.size());
112 // The inputs don't move
113 for (idx_t i = 0; i < output_idx; ++i) {
114 select_list[i] = make_uniq<BoundReferenceExpression>(args&: op.types[i], args&: i);
115 }
116 // The outputs have been rearranged
117 for (idx_t i = 0; i < evaluation_order.size(); ++i) {
118 const auto expr_idx = evaluation_order[i] + output_idx;
119 select_list[expr_idx] = make_uniq<BoundReferenceExpression>(args&: op.types[expr_idx], args: i + output_idx);
120 }
121 auto proj = make_uniq<PhysicalProjection>(args&: op.types, args: std::move(select_list), args&: op.estimated_cardinality);
122 proj->children.push_back(x: std::move(plan));
123 plan = std::move(proj);
124 }
125
126 return plan;
127}
128
129} // namespace duckdb
130