1#include "duckdb/function/aggregate/distributive_functions.hpp"
2#include "duckdb/main/client_config.hpp"
3#include "duckdb/planner/binder.hpp"
4#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
5#include "duckdb/planner/expression/bound_cast_expression.hpp"
6#include "duckdb/planner/expression/bound_columnref_expression.hpp"
7#include "duckdb/planner/expression/bound_comparison_expression.hpp"
8#include "duckdb/planner/expression/bound_constant_expression.hpp"
9#include "duckdb/planner/expression/bound_reference_expression.hpp"
10#include "duckdb/planner/expression/bound_subquery_expression.hpp"
11#include "duckdb/planner/expression/bound_window_expression.hpp"
12#include "duckdb/planner/expression_iterator.hpp"
13#include "duckdb/planner/operator/list.hpp"
14#include "duckdb/planner/operator/logical_window.hpp"
15#include "duckdb/function/function_binder.hpp"
16#include "duckdb/planner/subquery/flatten_dependent_join.hpp"
17
18namespace duckdb {
19
20static unique_ptr<Expression> PlanUncorrelatedSubquery(Binder &binder, BoundSubqueryExpression &expr,
21 unique_ptr<LogicalOperator> &root,
22 unique_ptr<LogicalOperator> plan) {
23 D_ASSERT(!expr.IsCorrelated());
24 switch (expr.subquery_type) {
25 case SubqueryType::EXISTS: {
26 // uncorrelated EXISTS
27 // we only care about existence, hence we push a LIMIT 1 operator
28 auto limit = make_uniq<LogicalLimit>(args: 1, args: 0, args: nullptr, args: nullptr);
29 limit->AddChild(child: std::move(plan));
30 plan = std::move(limit);
31
32 // now we push a COUNT(*) aggregate onto the limit, this will be either 0 or 1 (EXISTS or NOT EXISTS)
33 auto count_star_fun = CountStarFun::GetFunction();
34
35 FunctionBinder function_binder(binder.context);
36 auto count_star =
37 function_binder.BindAggregateFunction(bound_function: count_star_fun, children: {}, filter: nullptr, aggr_type: AggregateType::NON_DISTINCT);
38 auto idx_type = count_star->return_type;
39 vector<unique_ptr<Expression>> aggregate_list;
40 aggregate_list.push_back(x: std::move(count_star));
41 auto aggregate_index = binder.GenerateTableIndex();
42 auto aggregate =
43 make_uniq<LogicalAggregate>(args: binder.GenerateTableIndex(), args&: aggregate_index, args: std::move(aggregate_list));
44 aggregate->AddChild(child: std::move(plan));
45 plan = std::move(aggregate);
46
47 // now we push a projection with a comparison to 1
48 auto left_child = make_uniq<BoundColumnRefExpression>(args&: idx_type, args: ColumnBinding(aggregate_index, 0));
49 auto right_child = make_uniq<BoundConstantExpression>(args: Value::Numeric(type: idx_type, value: 1));
50 auto comparison = make_uniq<BoundComparisonExpression>(args: ExpressionType::COMPARE_EQUAL, args: std::move(left_child),
51 args: std::move(right_child));
52
53 vector<unique_ptr<Expression>> projection_list;
54 projection_list.push_back(x: std::move(comparison));
55 auto projection_index = binder.GenerateTableIndex();
56 auto projection = make_uniq<LogicalProjection>(args&: projection_index, args: std::move(projection_list));
57 projection->AddChild(child: std::move(plan));
58 plan = std::move(projection);
59
60 // we add it to the main query by adding a cross product
61 // FIXME: should use something else besides cross product as we always add only one scalar constant
62 root = LogicalCrossProduct::Create(left: std::move(root), right: std::move(plan));
63
64 // we replace the original subquery with a ColumnRefExpression referring to the result of the projection (either
65 // TRUE or FALSE)
66 return make_uniq<BoundColumnRefExpression>(args: expr.GetName(), args: LogicalType::BOOLEAN,
67 args: ColumnBinding(projection_index, 0));
68 }
69 case SubqueryType::SCALAR: {
70 // uncorrelated scalar, we want to return the first entry
71 // figure out the table index of the bound table of the entry which we want to return
72 auto bindings = plan->GetColumnBindings();
73 D_ASSERT(bindings.size() == 1);
74 idx_t table_idx = bindings[0].table_index;
75
76 // in the uncorrelated case we are only interested in the first result of the query
77 // hence we simply push a LIMIT 1 to get the first row of the subquery
78 auto limit = make_uniq<LogicalLimit>(args: 1, args: 0, args: nullptr, args: nullptr);
79 limit->AddChild(child: std::move(plan));
80 plan = std::move(limit);
81
82 // we push an aggregate that returns the FIRST element
83 vector<unique_ptr<Expression>> expressions;
84 auto bound = make_uniq<BoundColumnRefExpression>(args&: expr.return_type, args: ColumnBinding(table_idx, 0));
85 vector<unique_ptr<Expression>> first_children;
86 first_children.push_back(x: std::move(bound));
87
88 FunctionBinder function_binder(binder.context);
89 auto first_agg = function_binder.BindAggregateFunction(
90 bound_function: FirstFun::GetFunction(type: expr.return_type), children: std::move(first_children), filter: nullptr, aggr_type: AggregateType::NON_DISTINCT);
91
92 expressions.push_back(x: std::move(first_agg));
93 auto aggr_index = binder.GenerateTableIndex();
94 auto aggr = make_uniq<LogicalAggregate>(args: binder.GenerateTableIndex(), args&: aggr_index, args: std::move(expressions));
95 aggr->AddChild(child: std::move(plan));
96 plan = std::move(aggr);
97
98 // in the uncorrelated case, we add the value to the main query through a cross product
99 // FIXME: should use something else besides cross product as we always add only one scalar constant and cross
100 // product is not optimized for this.
101 D_ASSERT(root);
102 root = LogicalCrossProduct::Create(left: std::move(root), right: std::move(plan));
103
104 // we replace the original subquery with a BoundColumnRefExpression referring to the first result of the
105 // aggregation
106 return make_uniq<BoundColumnRefExpression>(args: expr.GetName(), args&: expr.return_type, args: ColumnBinding(aggr_index, 0));
107 }
108 default: {
109 D_ASSERT(expr.subquery_type == SubqueryType::ANY);
110 // we generate a MARK join that results in either (TRUE, FALSE or NULL)
111 // subquery has NULL values -> result is (TRUE or NULL)
112 // subquery has no NULL values -> result is (TRUE, FALSE or NULL [if input is NULL])
113 // fetch the column bindings
114 auto plan_columns = plan->GetColumnBindings();
115
116 // then we generate the MARK join with the subquery
117 idx_t mark_index = binder.GenerateTableIndex();
118 auto join = make_uniq<LogicalComparisonJoin>(args: JoinType::MARK);
119 join->mark_index = mark_index;
120 join->AddChild(child: std::move(root));
121 join->AddChild(child: std::move(plan));
122 // create the JOIN condition
123 JoinCondition cond;
124 cond.left = std::move(expr.child);
125 cond.right = BoundCastExpression::AddDefaultCastToType(
126 expr: make_uniq<BoundColumnRefExpression>(args&: expr.child_type, args&: plan_columns[0]), target_type: expr.child_target);
127 cond.comparison = expr.comparison_type;
128 join->conditions.push_back(x: std::move(cond));
129 root = std::move(join);
130
131 // we replace the original subquery with a BoundColumnRefExpression referring to the mark column
132 return make_uniq<BoundColumnRefExpression>(args: expr.GetName(), args&: expr.return_type, args: ColumnBinding(mark_index, 0));
133 }
134 }
135}
136
137static unique_ptr<LogicalDelimJoin>
138CreateDuplicateEliminatedJoin(const vector<CorrelatedColumnInfo> &correlated_columns, JoinType join_type,
139 unique_ptr<LogicalOperator> original_plan, bool perform_delim) {
140 auto delim_join = make_uniq<LogicalDelimJoin>(args&: join_type);
141 if (!perform_delim) {
142 // if we are not performing a delim join, we push a row_number() OVER() window operator on the LHS
143 // and perform all duplicate elimination on that row number instead
144 D_ASSERT(correlated_columns[0].type.id() == LogicalTypeId::BIGINT);
145 auto window = make_uniq<LogicalWindow>(args: correlated_columns[0].binding.table_index);
146 auto row_number =
147 make_uniq<BoundWindowExpression>(args: ExpressionType::WINDOW_ROW_NUMBER, args: LogicalType::BIGINT, args: nullptr, args: nullptr);
148 row_number->start = WindowBoundary::UNBOUNDED_PRECEDING;
149 row_number->end = WindowBoundary::CURRENT_ROW_ROWS;
150 row_number->alias = "delim_index";
151 window->expressions.push_back(x: std::move(row_number));
152 window->AddChild(child: std::move(original_plan));
153 original_plan = std::move(window);
154 }
155 delim_join->AddChild(child: std::move(original_plan));
156 for (idx_t i = 0; i < correlated_columns.size(); i++) {
157 auto &col = correlated_columns[i];
158 delim_join->duplicate_eliminated_columns.push_back(x: make_uniq<BoundColumnRefExpression>(args: col.type, args: col.binding));
159 delim_join->delim_types.push_back(x: col.type);
160 }
161 return delim_join;
162}
163
164static void CreateDelimJoinConditions(LogicalDelimJoin &delim_join,
165 const vector<CorrelatedColumnInfo> &correlated_columns,
166 vector<ColumnBinding> bindings, idx_t base_offset, bool perform_delim) {
167 auto col_count = perform_delim ? correlated_columns.size() : 1;
168 for (idx_t i = 0; i < col_count; i++) {
169 auto &col = correlated_columns[i];
170 auto binding_idx = base_offset + i;
171 if (binding_idx >= bindings.size()) {
172 throw InternalException("Delim join - binding index out of range");
173 }
174 JoinCondition cond;
175 cond.left = make_uniq<BoundColumnRefExpression>(args: col.name, args: col.type, args: col.binding);
176 cond.right = make_uniq<BoundColumnRefExpression>(args: col.name, args: col.type, args&: bindings[binding_idx]);
177 cond.comparison = ExpressionType::COMPARE_NOT_DISTINCT_FROM;
178 delim_join.conditions.push_back(x: std::move(cond));
179 }
180}
181
182static bool PerformDelimOnType(const LogicalType &type) {
183 if (type.InternalType() == PhysicalType::LIST) {
184 return false;
185 }
186 if (type.InternalType() == PhysicalType::STRUCT) {
187 for (auto &entry : StructType::GetChildTypes(type)) {
188 if (!PerformDelimOnType(type: entry.second)) {
189 return false;
190 }
191 }
192 }
193 return true;
194}
195
196static bool PerformDuplicateElimination(Binder &binder, vector<CorrelatedColumnInfo> &correlated_columns) {
197 if (!ClientConfig::GetConfig(context&: binder.context).enable_optimizer) {
198 // if optimizations are disabled we always do a delim join
199 return true;
200 }
201 bool perform_delim = true;
202 for (auto &col : correlated_columns) {
203 if (!PerformDelimOnType(type: col.type)) {
204 perform_delim = false;
205 break;
206 }
207 }
208 if (perform_delim) {
209 return true;
210 }
211 auto binding = ColumnBinding(binder.GenerateTableIndex(), 0);
212 auto type = LogicalType::BIGINT;
213 auto name = "delim_index";
214 CorrelatedColumnInfo info(binding, type, name, 0);
215 correlated_columns.insert(position: correlated_columns.begin(), x: std::move(info));
216 return false;
217}
218
219static unique_ptr<Expression> PlanCorrelatedSubquery(Binder &binder, BoundSubqueryExpression &expr,
220 unique_ptr<LogicalOperator> &root,
221 unique_ptr<LogicalOperator> plan) {
222 auto &correlated_columns = expr.binder->correlated_columns;
223 // FIXME: there should be a way of disabling decorrelation for ANY queries as well, but not for now...
224 bool perform_delim =
225 expr.subquery_type == SubqueryType::ANY ? true : PerformDuplicateElimination(binder, correlated_columns);
226 D_ASSERT(expr.IsCorrelated());
227 // correlated subquery
228 // for a more in-depth explanation of this code, read the paper "Unnesting Arbitrary Subqueries"
229 // we handle three types of correlated subqueries: Scalar, EXISTS and ANY
230 // all three cases are very similar with some minor changes (mainly the type of join performed at the end)
231 switch (expr.subquery_type) {
232 case SubqueryType::SCALAR: {
233 // correlated SCALAR query
234 // first push a DUPLICATE ELIMINATED join
235 // a duplicate eliminated join creates a duplicate eliminated copy of the LHS
236 // and pushes it into any DUPLICATE_ELIMINATED SCAN operators on the RHS
237
238 // in the SCALAR case, we create a SINGLE join (because we are only interested in obtaining the value)
239 // NULL values are equal in this join because we join on the correlated columns ONLY
240 // and e.g. in the query: SELECT (SELECT 42 FROM integers WHERE i1.i IS NULL LIMIT 1) FROM integers i1;
241 // the input value NULL will generate the value 42, and we need to join NULL on the LHS with NULL on the RHS
242 // the left side is the original plan
243 // this is the side that will be duplicate eliminated and pushed into the RHS
244 auto delim_join =
245 CreateDuplicateEliminatedJoin(correlated_columns, join_type: JoinType::SINGLE, original_plan: std::move(root), perform_delim);
246
247 // the right side initially is a DEPENDENT join between the duplicate eliminated scan and the subquery
248 // HOWEVER: we do not explicitly create the dependent join
249 // instead, we eliminate the dependent join by pushing it down into the right side of the plan
250 FlattenDependentJoins flatten(binder, correlated_columns, perform_delim);
251
252 // first we check which logical operators have correlated expressions in the first place
253 flatten.DetectCorrelatedExpressions(op: plan.get());
254 // now we push the dependent join down
255 auto dependent_join = flatten.PushDownDependentJoin(plan: std::move(plan));
256
257 // now the dependent join is fully eliminated
258 // we only need to create the join conditions between the LHS and the RHS
259 // fetch the set of columns
260 auto plan_columns = dependent_join->GetColumnBindings();
261
262 // now create the join conditions
263 CreateDelimJoinConditions(delim_join&: *delim_join, correlated_columns, bindings: plan_columns, base_offset: flatten.delim_offset, perform_delim);
264 delim_join->AddChild(child: std::move(dependent_join));
265 root = std::move(delim_join);
266 // finally push the BoundColumnRefExpression referring to the data element returned by the join
267 return make_uniq<BoundColumnRefExpression>(args: expr.GetName(), args&: expr.return_type, args&: plan_columns[flatten.data_offset]);
268 }
269 case SubqueryType::EXISTS: {
270 // correlated EXISTS query
271 // this query is similar to the correlated SCALAR query, except we use a MARK join here
272 idx_t mark_index = binder.GenerateTableIndex();
273 auto delim_join =
274 CreateDuplicateEliminatedJoin(correlated_columns, join_type: JoinType::MARK, original_plan: std::move(root), perform_delim);
275 delim_join->mark_index = mark_index;
276 // RHS
277 FlattenDependentJoins flatten(binder, correlated_columns, perform_delim, true);
278 flatten.DetectCorrelatedExpressions(op: plan.get());
279 auto dependent_join = flatten.PushDownDependentJoin(plan: std::move(plan));
280
281 // fetch the set of columns
282 auto plan_columns = dependent_join->GetColumnBindings();
283
284 // now we create the join conditions between the dependent join and the original table
285 CreateDelimJoinConditions(delim_join&: *delim_join, correlated_columns, bindings: plan_columns, base_offset: flatten.delim_offset, perform_delim);
286 delim_join->AddChild(child: std::move(dependent_join));
287 root = std::move(delim_join);
288 // finally push the BoundColumnRefExpression referring to the marker
289 return make_uniq<BoundColumnRefExpression>(args: expr.GetName(), args&: expr.return_type, args: ColumnBinding(mark_index, 0));
290 }
291 default: {
292 D_ASSERT(expr.subquery_type == SubqueryType::ANY);
293 // correlated ANY query
294 // this query is similar to the correlated SCALAR query
295 // however, in this case we push a correlated MARK join
296 // note that in this join null values are NOT equal for ALL columns, but ONLY for the correlated columns
297 // the correlated mark join handles this case by itself
298 // as the MARK join has one extra join condition (the original condition, of the ANY expression, e.g.
299 // [i=ANY(...)])
300 idx_t mark_index = binder.GenerateTableIndex();
301 auto delim_join =
302 CreateDuplicateEliminatedJoin(correlated_columns, join_type: JoinType::MARK, original_plan: std::move(root), perform_delim);
303 delim_join->mark_index = mark_index;
304 // RHS
305 FlattenDependentJoins flatten(binder, correlated_columns, true, true);
306 flatten.DetectCorrelatedExpressions(op: plan.get());
307 auto dependent_join = flatten.PushDownDependentJoin(plan: std::move(plan));
308
309 // fetch the columns
310 auto plan_columns = dependent_join->GetColumnBindings();
311
312 // now we create the join conditions between the dependent join and the original table
313 CreateDelimJoinConditions(delim_join&: *delim_join, correlated_columns, bindings: plan_columns, base_offset: flatten.delim_offset, perform_delim);
314 // add the actual condition based on the ANY/ALL predicate
315 JoinCondition compare_cond;
316 compare_cond.left = std::move(expr.child);
317 compare_cond.right = BoundCastExpression::AddDefaultCastToType(
318 expr: make_uniq<BoundColumnRefExpression>(args&: expr.child_type, args&: plan_columns[0]), target_type: expr.child_target);
319 compare_cond.comparison = expr.comparison_type;
320 delim_join->conditions.push_back(x: std::move(compare_cond));
321
322 delim_join->AddChild(child: std::move(dependent_join));
323 root = std::move(delim_join);
324 // finally push the BoundColumnRefExpression referring to the marker
325 return make_uniq<BoundColumnRefExpression>(args: expr.GetName(), args&: expr.return_type, args: ColumnBinding(mark_index, 0));
326 }
327 }
328}
329
330class RecursiveSubqueryPlanner : public LogicalOperatorVisitor {
331public:
332 explicit RecursiveSubqueryPlanner(Binder &binder) : binder(binder) {
333 }
334 void VisitOperator(LogicalOperator &op) override {
335 if (!op.children.empty()) {
336 root = std::move(op.children[0]);
337 D_ASSERT(root);
338 VisitOperatorExpressions(op);
339 op.children[0] = std::move(root);
340 for (idx_t i = 0; i < op.children.size(); i++) {
341 D_ASSERT(op.children[i]);
342 VisitOperator(op&: *op.children[i]);
343 }
344 }
345 }
346
347 unique_ptr<Expression> VisitReplace(BoundSubqueryExpression &expr, unique_ptr<Expression> *expr_ptr) override {
348 return binder.PlanSubquery(expr, root);
349 }
350
351private:
352 unique_ptr<LogicalOperator> root;
353 Binder &binder;
354};
355
356unique_ptr<Expression> Binder::PlanSubquery(BoundSubqueryExpression &expr, unique_ptr<LogicalOperator> &root) {
357 D_ASSERT(root);
358 // first we translate the QueryNode of the subquery into a logical plan
359 // note that we do not plan nested subqueries yet
360 auto sub_binder = Binder::CreateBinder(context, parent: this);
361 sub_binder->plan_subquery = false;
362 auto subquery_root = sub_binder->CreatePlan(node&: *expr.subquery);
363 D_ASSERT(subquery_root);
364
365 // now we actually flatten the subquery
366 auto plan = std::move(subquery_root);
367 unique_ptr<Expression> result_expression;
368 if (!expr.IsCorrelated()) {
369 result_expression = PlanUncorrelatedSubquery(binder&: *this, expr, root, plan: std::move(plan));
370 } else {
371 result_expression = PlanCorrelatedSubquery(binder&: *this, expr, root, plan: std::move(plan));
372 }
373 // finally, we recursively plan the nested subqueries (if there are any)
374 if (sub_binder->has_unplanned_subqueries) {
375 RecursiveSubqueryPlanner plan(*this);
376 plan.VisitOperator(op&: *root);
377 }
378 return result_expression;
379}
380
381void Binder::PlanSubqueries(unique_ptr<Expression> &expr_ptr, unique_ptr<LogicalOperator> &root) {
382 if (!expr_ptr) {
383 return;
384 }
385 auto &expr = *expr_ptr;
386
387 // first visit the children of the node, if any
388 ExpressionIterator::EnumerateChildren(expression&: expr, callback: [&](unique_ptr<Expression> &expr) { PlanSubqueries(expr_ptr&: expr, root); });
389
390 // check if this is a subquery node
391 if (expr.expression_class == ExpressionClass::BOUND_SUBQUERY) {
392 auto &subquery = expr.Cast<BoundSubqueryExpression>();
393 // subquery node! plan it
394 if (subquery.IsCorrelated() && !plan_subquery) {
395 // detected a nested correlated subquery
396 // we don't plan it yet here, we are currently planning a subquery
397 // nested subqueries will only be planned AFTER the current subquery has been flattened entirely
398 has_unplanned_subqueries = true;
399 return;
400 }
401 expr_ptr = PlanSubquery(expr&: subquery, root);
402 }
403}
404
405unique_ptr<LogicalOperator> Binder::PlanLateralJoin(unique_ptr<LogicalOperator> left, unique_ptr<LogicalOperator> right,
406 vector<CorrelatedColumnInfo> &correlated_columns,
407 JoinType join_type, unique_ptr<Expression> condition) {
408 // scan the right operator for correlated columns
409 // correlated LATERAL JOIN
410 vector<JoinCondition> conditions;
411 vector<unique_ptr<Expression>> arbitrary_expressions;
412 if (condition) {
413 // extract join conditions, if there are any
414 LogicalComparisonJoin::ExtractJoinConditions(type: join_type, left_child&: left, right_child&: right, condition: std::move(condition), conditions,
415 arbitrary_expressions);
416 }
417
418 auto perform_delim = PerformDuplicateElimination(binder&: *this, correlated_columns);
419 auto delim_join = CreateDuplicateEliminatedJoin(correlated_columns, join_type, original_plan: std::move(left), perform_delim);
420
421 FlattenDependentJoins flatten(*this, correlated_columns, perform_delim);
422
423 // first we check which logical operators have correlated expressions in the first place
424 flatten.DetectCorrelatedExpressions(op: right.get(), lateral: true);
425 // now we push the dependent join down
426 auto dependent_join = flatten.PushDownDependentJoin(plan: std::move(right));
427
428 // now the dependent join is fully eliminated
429 // we only need to create the join conditions between the LHS and the RHS
430 // fetch the set of columns
431 auto plan_columns = dependent_join->GetColumnBindings();
432
433 // now create the join conditions
434 // start off with the conditions that were passed in (if any)
435 D_ASSERT(delim_join->conditions.empty());
436 delim_join->conditions = std::move(conditions);
437 // then add the delim join conditions
438 CreateDelimJoinConditions(delim_join&: *delim_join, correlated_columns, bindings: plan_columns, base_offset: flatten.delim_offset, perform_delim);
439 delim_join->AddChild(child: std::move(dependent_join));
440
441 // check if there are any arbitrary expressions left
442 if (!arbitrary_expressions.empty()) {
443 // we can only evaluate scalar arbitrary expressions for inner joins
444 if (join_type != JoinType::INNER) {
445 throw BinderException(
446 "Join condition for non-inner LATERAL JOIN must be a comparison between the left and right side");
447 }
448 auto filter = make_uniq<LogicalFilter>();
449 filter->expressions = std::move(arbitrary_expressions);
450 filter->AddChild(child: std::move(delim_join));
451 return std::move(filter);
452 }
453 return std::move(delim_join);
454}
455
456} // namespace duckdb
457