1#include "duckdb/execution/operator/aggregate/physical_window.hpp"
2#include "duckdb/execution/operator/join/physical_asof_join.hpp"
3#include "duckdb/execution/operator/join/physical_iejoin.hpp"
4#include "duckdb/execution/operator/projection/physical_projection.hpp"
5#include "duckdb/execution/physical_plan_generator.hpp"
6#include "duckdb/main/client_context.hpp"
7#include "duckdb/planner/expression/bound_constant_expression.hpp"
8#include "duckdb/planner/expression/bound_reference_expression.hpp"
9#include "duckdb/planner/expression/bound_window_expression.hpp"
10#include "duckdb/planner/operator/logical_asof_join.hpp"
11
12namespace duckdb {
13
14unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalAsOfJoin &op) {
15 // now visit the children
16 D_ASSERT(op.children.size() == 2);
17 idx_t lhs_cardinality = op.children[0]->EstimateCardinality(context);
18 idx_t rhs_cardinality = op.children[1]->EstimateCardinality(context);
19 auto left = CreatePlan(op&: *op.children[0]);
20 auto right = CreatePlan(op&: *op.children[1]);
21 D_ASSERT(left && right);
22
23 // Validate
24 vector<idx_t> equi_indexes;
25 auto asof_idx = op.conditions.size();
26 for (size_t c = 0; c < op.conditions.size(); ++c) {
27 auto &cond = op.conditions[c];
28 switch (cond.comparison) {
29 case ExpressionType::COMPARE_EQUAL:
30 case ExpressionType::COMPARE_NOT_DISTINCT_FROM:
31 equi_indexes.emplace_back(args&: c);
32 break;
33 case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
34 D_ASSERT(asof_idx == op.conditions.size());
35 asof_idx = c;
36 break;
37 default:
38 throw InternalException("Invalid ASOF JOIN comparison");
39 }
40 }
41 D_ASSERT(asof_idx < op.conditions.size());
42
43 if (!ClientConfig::GetConfig(context).force_asof_iejoin) {
44 return make_uniq<PhysicalAsOfJoin>(args&: op, args: std::move(left), args: std::move(right));
45 }
46
47 // Debug implementation: IEJoin of Window
48 // LEAD(asof_column, 1, infinity) OVER (PARTITION BY equi_column... ORDER BY asof_column) AS asof_temp
49 auto &asof_comp = op.conditions[asof_idx];
50 auto &asof_column = asof_comp.right;
51 auto asof_type = asof_column->return_type;
52 auto asof_temp = make_uniq<BoundWindowExpression>(args: ExpressionType::WINDOW_LEAD, args&: asof_type, args: nullptr, args: nullptr);
53 asof_temp->children.emplace_back(args: asof_column->Copy());
54 asof_temp->offset_expr = make_uniq<BoundConstantExpression>(args: Value::BIGINT(value: 1));
55 // TODO: If infinities are not supported for a type, fake them by looking at LHS statistics?
56 asof_temp->default_expr = make_uniq<BoundConstantExpression>(args: Value::Infinity(type: asof_type));
57 for (auto equi_idx : equi_indexes) {
58 asof_temp->partitions.emplace_back(args: op.conditions[equi_idx].right->Copy());
59 }
60 asof_temp->orders.emplace_back(args: OrderType::ASCENDING, args: OrderByNullType::NULLS_FIRST, args: asof_column->Copy());
61 asof_temp->start = WindowBoundary::UNBOUNDED_PRECEDING;
62 asof_temp->end = WindowBoundary::CURRENT_ROW_ROWS;
63
64 vector<unique_ptr<Expression>> window_select;
65 window_select.emplace_back(args: std::move(asof_temp));
66
67 auto window_types = right->types;
68 window_types.emplace_back(args&: asof_type);
69
70 auto window = make_uniq<PhysicalWindow>(args&: window_types, args: std::move(window_select), args&: rhs_cardinality);
71 window->children.emplace_back(args: std::move(right));
72
73 // IEJoin(left, window, conditions || asof_column < asof_temp)
74 JoinCondition asof_upper;
75 asof_upper.left = asof_comp.left->Copy();
76 asof_upper.right = make_uniq<BoundReferenceExpression>(args&: asof_type, args: window_types.size() - 1);
77 asof_upper.comparison = ExpressionType::COMPARE_LESSTHAN;
78
79 // We have an equality condition, so we may have to deal with projection maps.
80 // IEJoin does not (currently) support them, so we have to do it manually
81 auto proj_types = op.types;
82 op.types.clear();
83
84 auto lhs_types = op.children[0]->types;
85 op.types = lhs_types;
86
87 auto rhs_types = op.children[1]->types;
88 op.types.insert(position: op.types.end(), first: rhs_types.begin(), last: rhs_types.end());
89
90 op.types.emplace_back(args&: asof_type);
91 op.conditions.emplace_back(args: std::move(asof_upper));
92 auto iejoin = make_uniq<PhysicalIEJoin>(args&: op, args: std::move(left), args: std::move(window), args: std::move(op.conditions),
93 args&: op.join_type, args&: op.estimated_cardinality);
94
95 // Project away asof_temp and anything from the projection maps
96 auto proj = PhysicalProjection::CreateJoinProjection(proj_types, lhs_types, rhs_types, left_projection_map: op.left_projection_map,
97 right_projection_map: op.right_projection_map, estimated_cardinality: lhs_cardinality);
98 proj->children.push_back(x: std::move(iejoin));
99
100 return proj;
101}
102
103} // namespace duckdb
104