| 1 | #include "duckdb/execution/operator/aggregate/physical_streaming_window.hpp" |
| 2 | #include "duckdb/execution/operator/aggregate/physical_window.hpp" |
| 3 | #include "duckdb/execution/operator/projection/physical_projection.hpp" |
| 4 | #include "duckdb/execution/physical_plan_generator.hpp" |
| 5 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
| 6 | #include "duckdb/planner/expression/bound_window_expression.hpp" |
| 7 | #include "duckdb/planner/operator/logical_window.hpp" |
| 8 | |
| 9 | #include <numeric> |
| 10 | |
| 11 | namespace duckdb { |
| 12 | |
| 13 | static bool IsStreamingWindow(unique_ptr<Expression> &expr) { |
| 14 | auto &wexpr = expr->Cast<BoundWindowExpression>(); |
| 15 | if (!wexpr.partitions.empty() || !wexpr.orders.empty() || wexpr.ignore_nulls) { |
| 16 | return false; |
| 17 | } |
| 18 | switch (wexpr.type) { |
| 19 | // TODO: add more expression types here? |
| 20 | case ExpressionType::WINDOW_AGGREGATE: |
| 21 | // We can stream aggregates if they are "running totals" and don't use filters |
| 22 | return wexpr.start == WindowBoundary::UNBOUNDED_PRECEDING && wexpr.end == WindowBoundary::CURRENT_ROW_ROWS && |
| 23 | !wexpr.filter_expr; |
| 24 | case ExpressionType::WINDOW_FIRST_VALUE: |
| 25 | case ExpressionType::WINDOW_PERCENT_RANK: |
| 26 | case ExpressionType::WINDOW_RANK: |
| 27 | case ExpressionType::WINDOW_RANK_DENSE: |
| 28 | case ExpressionType::WINDOW_ROW_NUMBER: |
| 29 | return true; |
| 30 | default: |
| 31 | return false; |
| 32 | } |
| 33 | } |
| 34 | |
| 35 | unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalWindow &op) { |
| 36 | D_ASSERT(op.children.size() == 1); |
| 37 | |
| 38 | auto plan = CreatePlan(op&: *op.children[0]); |
| 39 | #ifdef DEBUG |
| 40 | for (auto &expr : op.expressions) { |
| 41 | D_ASSERT(expr->IsWindow()); |
| 42 | } |
| 43 | #endif |
| 44 | |
| 45 | op.estimated_cardinality = op.EstimateCardinality(context); |
| 46 | |
| 47 | // Slice types |
| 48 | auto types = op.types; |
| 49 | const auto output_idx = types.size() - op.expressions.size(); |
| 50 | types.resize(new_size: output_idx); |
| 51 | |
| 52 | // Identify streaming windows |
| 53 | vector<idx_t> blocking_windows; |
| 54 | vector<idx_t> streaming_windows; |
| 55 | for (idx_t expr_idx = 0; expr_idx < op.expressions.size(); expr_idx++) { |
| 56 | if (IsStreamingWindow(expr&: op.expressions[expr_idx])) { |
| 57 | streaming_windows.push_back(x: expr_idx); |
| 58 | } else { |
| 59 | blocking_windows.push_back(x: expr_idx); |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | // Process the window functions by sharing the partition/order definitions |
| 64 | vector<idx_t> evaluation_order; |
| 65 | while (!blocking_windows.empty() || !streaming_windows.empty()) { |
| 66 | const bool process_streaming = blocking_windows.empty(); |
| 67 | auto &remaining = process_streaming ? streaming_windows : blocking_windows; |
| 68 | |
| 69 | // Find all functions that share the partitioning of the first remaining expression |
| 70 | const auto over_idx = remaining[0]; |
| 71 | auto &over_expr = op.expressions[over_idx]->Cast<BoundWindowExpression>(); |
| 72 | |
| 73 | vector<idx_t> matching; |
| 74 | vector<idx_t> unprocessed; |
| 75 | for (const auto &expr_idx : remaining) { |
| 76 | D_ASSERT(op.expressions[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW); |
| 77 | auto &wexpr = op.expressions[expr_idx]->Cast<BoundWindowExpression>(); |
| 78 | if (over_expr.KeysAreCompatible(other: wexpr)) { |
| 79 | matching.emplace_back(args: expr_idx); |
| 80 | } else { |
| 81 | unprocessed.emplace_back(args: expr_idx); |
| 82 | } |
| 83 | } |
| 84 | remaining.swap(x&: unprocessed); |
| 85 | |
| 86 | // Extract the matching expressions |
| 87 | vector<unique_ptr<Expression>> select_list; |
| 88 | for (const auto &expr_idx : matching) { |
| 89 | select_list.emplace_back(args: std::move(op.expressions[expr_idx])); |
| 90 | types.emplace_back(args&: op.types[output_idx + expr_idx]); |
| 91 | } |
| 92 | |
| 93 | // Chain the new window operator on top of the plan |
| 94 | unique_ptr<PhysicalOperator> window; |
| 95 | if (process_streaming) { |
| 96 | window = make_uniq<PhysicalStreamingWindow>(args&: types, args: std::move(select_list), args&: op.estimated_cardinality); |
| 97 | } else { |
| 98 | window = make_uniq<PhysicalWindow>(args&: types, args: std::move(select_list), args&: op.estimated_cardinality); |
| 99 | } |
| 100 | window->children.push_back(x: std::move(plan)); |
| 101 | plan = std::move(window); |
| 102 | |
| 103 | // Remember the projection order if we changed it |
| 104 | if (!streaming_windows.empty() || !blocking_windows.empty() || !evaluation_order.empty()) { |
| 105 | evaluation_order.insert(position: evaluation_order.end(), first: matching.begin(), last: matching.end()); |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | // Put everything back into place if it moved |
| 110 | if (!evaluation_order.empty()) { |
| 111 | vector<unique_ptr<Expression>> select_list(op.types.size()); |
| 112 | // The inputs don't move |
| 113 | for (idx_t i = 0; i < output_idx; ++i) { |
| 114 | select_list[i] = make_uniq<BoundReferenceExpression>(args&: op.types[i], args&: i); |
| 115 | } |
| 116 | // The outputs have been rearranged |
| 117 | for (idx_t i = 0; i < evaluation_order.size(); ++i) { |
| 118 | const auto expr_idx = evaluation_order[i] + output_idx; |
| 119 | select_list[expr_idx] = make_uniq<BoundReferenceExpression>(args&: op.types[expr_idx], args: i + output_idx); |
| 120 | } |
| 121 | auto proj = make_uniq<PhysicalProjection>(args&: op.types, args: std::move(select_list), args&: op.estimated_cardinality); |
| 122 | proj->children.push_back(x: std::move(plan)); |
| 123 | plan = std::move(proj); |
| 124 | } |
| 125 | |
| 126 | return plan; |
| 127 | } |
| 128 | |
| 129 | } // namespace duckdb |
| 130 | |