1#include "duckdb/planner/binder.hpp"
2#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
3#include "duckdb/planner/expression/bound_cast_expression.hpp"
4#include "duckdb/planner/expression/bound_columnref_expression.hpp"
5#include "duckdb/planner/expression/bound_comparison_expression.hpp"
6#include "duckdb/planner/expression/bound_constant_expression.hpp"
7#include "duckdb/planner/expression/bound_reference_expression.hpp"
8#include "duckdb/planner/expression/bound_subquery_expression.hpp"
9#include "duckdb/planner/expression_iterator.hpp"
10#include "duckdb/planner/binder.hpp"
11#include "duckdb/planner/operator/list.hpp"
12#include "duckdb/planner/subquery/flatten_dependent_join.hpp"
13#include "duckdb/function/aggregate/distributive_functions.hpp"
14
15using namespace std;
16
17namespace duckdb {
18
19static unique_ptr<Expression> PlanUncorrelatedSubquery(Binder &binder, BoundSubqueryExpression &expr,
20 unique_ptr<LogicalOperator> &root,
21 unique_ptr<LogicalOperator> plan) {
22 assert(!expr.IsCorrelated());
23 switch (expr.subquery_type) {
24 case SubqueryType::EXISTS: {
25 // uncorrelated EXISTS
26 // we only care about existence, hence we push a LIMIT 1 operator
27 auto limit = make_unique<LogicalLimit>(1, 0);
28 limit->AddChild(move(plan));
29 plan = move(limit);
30
31 // now we push a COUNT(*) aggregate onto the limit, this will be either 0 or 1 (EXISTS or NOT EXISTS)
32 auto count_star = make_unique<BoundAggregateExpression>(TypeId::INT64, CountStarFun::GetFunction(), false);
33 auto idx_type = count_star->return_type;
34 vector<unique_ptr<Expression>> aggregate_list;
35 aggregate_list.push_back(move(count_star));
36 auto aggregate_index = binder.GenerateTableIndex();
37 auto aggregate =
38 make_unique<LogicalAggregate>(binder.GenerateTableIndex(), aggregate_index, move(aggregate_list));
39 aggregate->AddChild(move(plan));
40 plan = move(aggregate);
41
42 // now we push a projection with a comparison to 1
43 auto left_child = make_unique<BoundColumnRefExpression>(idx_type, ColumnBinding(aggregate_index, 0));
44 auto right_child = make_unique<BoundConstantExpression>(Value::Numeric(idx_type, 1));
45 auto comparison =
46 make_unique<BoundComparisonExpression>(ExpressionType::COMPARE_EQUAL, move(left_child), move(right_child));
47
48 vector<unique_ptr<Expression>> projection_list;
49 projection_list.push_back(move(comparison));
50 auto projection_index = binder.GenerateTableIndex();
51 auto projection = make_unique<LogicalProjection>(projection_index, move(projection_list));
52 projection->AddChild(move(plan));
53 plan = move(projection);
54
55 // we add it to the main query by adding a cross product
56 // FIXME: should use something else besides cross product as we always add only one scalar constant
57 auto cross_product = make_unique<LogicalCrossProduct>();
58 cross_product->AddChild(move(root));
59 cross_product->AddChild(move(plan));
60 root = move(cross_product);
61
62 // we replace the original subquery with a ColumnRefExpression refering to the result of the projection (either
63 // TRUE or FALSE)
64 return make_unique<BoundColumnRefExpression>(expr.GetName(), TypeId::BOOL, ColumnBinding(projection_index, 0));
65 }
66 case SubqueryType::SCALAR: {
67 // uncorrelated scalar, we want to return the first entry
68 // figure out the table index of the bound table of the entry which we want to return
69 auto bindings = plan->GetColumnBindings();
70 assert(bindings.size() == 1);
71 idx_t table_idx = bindings[0].table_index;
72
73 // in the uncorrelated case we are only interested in the first result of the query
74 // hence we simply push a LIMIT 1 to get the first row of the subquery
75 auto limit = make_unique<LogicalLimit>(1, 0);
76 limit->AddChild(move(plan));
77 plan = move(limit);
78
79 // we push an aggregate that returns the FIRST element
80 vector<unique_ptr<Expression>> expressions;
81 auto bound = make_unique<BoundColumnRefExpression>(expr.return_type, ColumnBinding(table_idx, 0));
82 auto first_agg = make_unique<BoundAggregateExpression>(
83 expr.return_type, FirstFun::GetFunction(SQLTypeFromInternalType(expr.return_type)), false);
84 first_agg->children.push_back(move(bound));
85 expressions.push_back(move(first_agg));
86 auto aggr_index = binder.GenerateTableIndex();
87 auto aggr = make_unique<LogicalAggregate>(binder.GenerateTableIndex(), aggr_index, move(expressions));
88 aggr->AddChild(move(plan));
89 plan = move(aggr);
90
91 // in the uncorrelated case, we add the value to the main query through a cross product
92 // FIXME: should use something else besides cross product as we always add only one scalar constant and cross
93 // product is not optimized for this.
94 assert(root);
95 auto cross_product = make_unique<LogicalCrossProduct>();
96 cross_product->AddChild(move(root));
97 cross_product->AddChild(move(plan));
98 root = move(cross_product);
99
100 // we replace the original subquery with a BoundColumnRefExpression refering to the first result of the
101 // aggregation
102 return make_unique<BoundColumnRefExpression>(expr.GetName(), expr.return_type, ColumnBinding(aggr_index, 0));
103 }
104 default: {
105 assert(expr.subquery_type == SubqueryType::ANY);
106 // we generate a MARK join that results in either (TRUE, FALSE or NULL)
107 // subquery has NULL values -> result is (TRUE or NULL)
108 // subquery has no NULL values -> result is (TRUE, FALSE or NULL [if input is NULL])
109 // fetch the column bindings
110 auto plan_columns = plan->GetColumnBindings();
111
112 // then we generate the MARK join with the subquery
113 idx_t mark_index = binder.GenerateTableIndex();
114 auto join = make_unique<LogicalComparisonJoin>(JoinType::MARK);
115 join->mark_index = mark_index;
116 join->AddChild(move(root));
117 join->AddChild(move(plan));
118 // create the JOIN condition
119 JoinCondition cond;
120 cond.left = move(expr.child);
121 cond.right = BoundCastExpression::AddCastToType(
122 make_unique<BoundColumnRefExpression>(GetInternalType(expr.child_type), plan_columns[0]), expr.child_type,
123 expr.child_target);
124 cond.comparison = expr.comparison_type;
125 join->conditions.push_back(move(cond));
126 root = move(join);
127
128 // we replace the original subquery with a BoundColumnRefExpression refering to the mark column
129 return make_unique<BoundColumnRefExpression>(expr.GetName(), expr.return_type, ColumnBinding(mark_index, 0));
130 }
131 }
132}
133
134static unique_ptr<LogicalDelimJoin> CreateDuplicateEliminatedJoin(vector<CorrelatedColumnInfo> &correlated_columns,
135 JoinType join_type) {
136 auto delim_join = make_unique<LogicalDelimJoin>(join_type);
137 for (idx_t i = 0; i < correlated_columns.size(); i++) {
138 auto &col = correlated_columns[i];
139 delim_join->duplicate_eliminated_columns.push_back(
140 make_unique<BoundColumnRefExpression>(col.type, col.binding));
141 }
142 return delim_join;
143}
144
145static void CreateDelimJoinConditions(LogicalDelimJoin &delim_join, vector<CorrelatedColumnInfo> &correlated_columns,
146 vector<ColumnBinding> bindings, idx_t base_offset) {
147 for (idx_t i = 0; i < correlated_columns.size(); i++) {
148 auto &col = correlated_columns[i];
149 JoinCondition cond;
150 cond.left = make_unique<BoundColumnRefExpression>(col.name, col.type, col.binding);
151 cond.right = make_unique<BoundColumnRefExpression>(col.name, col.type, bindings[base_offset + i]);
152 cond.comparison = ExpressionType::COMPARE_EQUAL;
153 cond.null_values_are_equal = true;
154 delim_join.conditions.push_back(move(cond));
155 }
156}
157
158static unique_ptr<Expression> PlanCorrelatedSubquery(Binder &binder, BoundSubqueryExpression &expr,
159 unique_ptr<LogicalOperator> &root,
160 unique_ptr<LogicalOperator> plan) {
161 auto &correlated_columns = expr.binder->correlated_columns;
162 assert(expr.IsCorrelated());
163 // correlated subquery
164 // for a more in-depth explanation of this code, read the paper "Unnesting Arbitrary Subqueries"
165 // we handle three types of correlated subqueries: Scalar, EXISTS and ANY
166 // all three cases are very similar with some minor changes (mainly the type of join performed at the end)
167 switch (expr.subquery_type) {
168 case SubqueryType::SCALAR: {
169 // correlated SCALAR query
170 // first push a DUPLICATE ELIMINATED join
171 // a duplicate eliminated join creates a duplicate eliminated copy of the LHS
172 // and pushes it into any DUPLICATE_ELIMINATED SCAN operators on the RHS
173
174 // in the SCALAR case, we create a SINGLE join (because we are only interested in obtaining the value)
175 // NULL values are equal in this join because we join on the correlated columns ONLY
176 // and e.g. in the query: SELECT (SELECT 42 FROM integers WHERE i1.i IS NULL LIMIT 1) FROM integers i1;
177 // the input value NULL will generate the value 42, and we need to join NULL on the LHS with NULL on the RHS
178 auto delim_join = CreateDuplicateEliminatedJoin(correlated_columns, JoinType::SINGLE);
179
180 // the left side is the original plan
181 // this is the side that will be duplicate eliminated and pushed into the RHS
182 delim_join->AddChild(move(root));
183 // the right side initially is a DEPENDENT join between the duplicate eliminated scan and the subquery
184 // HOWEVER: we do not explicitly create the dependent join
185 // instead, we eliminate the dependent join by pushing it down into the right side of the plan
186 FlattenDependentJoins flatten(binder, correlated_columns);
187
188 // first we check which logical operators have correlated expressions in the first place
189 flatten.DetectCorrelatedExpressions(plan.get());
190 // now we push the dependent join down
191 auto dependent_join = flatten.PushDownDependentJoin(move(plan));
192
193 // now the dependent join is fully eliminated
194 // we only need to create the join conditions between the LHS and the RHS
195 // fetch the set of columns
196 auto plan_columns = dependent_join->GetColumnBindings();
197
198 // now create the join conditions
199 CreateDelimJoinConditions(*delim_join, correlated_columns, plan_columns, flatten.delim_offset);
200 delim_join->AddChild(move(dependent_join));
201 root = move(delim_join);
202 // finally push the BoundColumnRefExpression referring to the data element returned by the join
203 return make_unique<BoundColumnRefExpression>(expr.GetName(), expr.return_type,
204 plan_columns[flatten.data_offset]);
205 }
206 case SubqueryType::EXISTS: {
207 // correlated EXISTS query
208 // this query is similar to the correlated SCALAR query, except we use a MARK join here
209 idx_t mark_index = binder.GenerateTableIndex();
210 auto delim_join = CreateDuplicateEliminatedJoin(correlated_columns, JoinType::MARK);
211 delim_join->mark_index = mark_index;
212 // LHS
213 delim_join->AddChild(move(root));
214 // RHS
215 FlattenDependentJoins flatten(binder, correlated_columns);
216 flatten.DetectCorrelatedExpressions(plan.get());
217 auto dependent_join = flatten.PushDownDependentJoin(move(plan));
218
219 // fetch the set of columns
220 auto plan_columns = dependent_join->GetColumnBindings();
221
222 // now we create the join conditions between the dependent join and the original table
223 CreateDelimJoinConditions(*delim_join, correlated_columns, plan_columns, flatten.delim_offset);
224 delim_join->AddChild(move(dependent_join));
225 root = move(delim_join);
226 // finally push the BoundColumnRefExpression referring to the marker
227 return make_unique<BoundColumnRefExpression>(expr.GetName(), expr.return_type, ColumnBinding(mark_index, 0));
228 }
229 default: {
230 assert(expr.subquery_type == SubqueryType::ANY);
231 // correlated ANY query
232 // this query is similar to the correlated SCALAR query
233 // however, in this case we push a correlated MARK join
234 // note that in this join null values are NOT equal for ALL columns, but ONLY for the correlated columns
235 // the correlated mark join handles this case by itself
236 // as the MARK join has one extra join condition (the original condition, of the ANY expression, e.g.
237 // [i=ANY(...)])
238 idx_t mark_index = binder.GenerateTableIndex();
239 auto delim_join = CreateDuplicateEliminatedJoin(correlated_columns, JoinType::MARK);
240 delim_join->mark_index = mark_index;
241 // LHS
242 delim_join->AddChild(move(root));
243 // RHS
244 FlattenDependentJoins flatten(binder, correlated_columns);
245 flatten.DetectCorrelatedExpressions(plan.get());
246 auto dependent_join = flatten.PushDownDependentJoin(move(plan));
247
248 // fetch the columns
249 auto plan_columns = dependent_join->GetColumnBindings();
250
251 // now we create the join conditions between the dependent join and the original table
252 CreateDelimJoinConditions(*delim_join, correlated_columns, plan_columns, flatten.delim_offset);
253 // add the actual condition based on the ANY/ALL predicate
254 JoinCondition compare_cond;
255 compare_cond.left = move(expr.child);
256 compare_cond.right = BoundCastExpression::AddCastToType(
257 make_unique<BoundColumnRefExpression>(GetInternalType(expr.child_type), plan_columns[0]), expr.child_type,
258 expr.child_target);
259 compare_cond.comparison = expr.comparison_type;
260 delim_join->conditions.push_back(move(compare_cond));
261
262 delim_join->AddChild(move(dependent_join));
263 root = move(delim_join);
264 // finally push the BoundColumnRefExpression referring to the marker
265 return make_unique<BoundColumnRefExpression>(expr.GetName(), expr.return_type, ColumnBinding(mark_index, 0));
266 }
267 }
268}
269
270class RecursiveSubqueryPlanner : public LogicalOperatorVisitor {
271public:
272 RecursiveSubqueryPlanner(Binder &binder) : binder(binder) {
273 }
274 void VisitOperator(LogicalOperator &op) override {
275 if (op.children.size() > 0) {
276 root = move(op.children[0]);
277 VisitOperatorExpressions(op);
278 op.children[0] = move(root);
279 for (idx_t i = 0; i < op.children.size(); i++) {
280 VisitOperator(*op.children[i]);
281 }
282 }
283 }
284
285 unique_ptr<Expression> VisitReplace(BoundSubqueryExpression &expr, unique_ptr<Expression> *expr_ptr) override {
286 return binder.PlanSubquery(expr, root);
287 }
288
289private:
290 unique_ptr<LogicalOperator> root;
291 Binder &binder;
292};
293
294unique_ptr<Expression> Binder::PlanSubquery(BoundSubqueryExpression &expr, unique_ptr<LogicalOperator> &root) {
295 assert(root);
296 // first we translate the QueryNode of the subquery into a logical plan
297 // note that we do not plan nested subqueries yet
298 Binder sub_binder(context);
299 sub_binder.plan_subquery = false;
300 auto subquery_root = sub_binder.CreatePlan(*expr.subquery);
301 assert(subquery_root);
302
303 // now we actually flatten the subquery
304 auto plan = move(subquery_root);
305 unique_ptr<Expression> result_expression;
306 if (!expr.IsCorrelated()) {
307 result_expression = PlanUncorrelatedSubquery(*this, expr, root, move(plan));
308 } else {
309 result_expression = PlanCorrelatedSubquery(*this, expr, root, move(plan));
310 }
311 // finally, we recursively plan the nested subqueries (if there are any)
312 if (sub_binder.has_unplanned_subqueries) {
313 RecursiveSubqueryPlanner plan(*this);
314 plan.VisitOperator(*root);
315 }
316 return result_expression;
317}
318
319void Binder::PlanSubqueries(unique_ptr<Expression> *expr_ptr, unique_ptr<LogicalOperator> *root) {
320 auto &expr = **expr_ptr;
321
322 // first visit the children of the node, if any
323 ExpressionIterator::EnumerateChildren(expr, [&](unique_ptr<Expression> expr) -> unique_ptr<Expression> {
324 PlanSubqueries(&expr, root);
325 return move(expr);
326 });
327
328 // check if this is a subquery node
329 if (expr.expression_class == ExpressionClass::BOUND_SUBQUERY) {
330 auto &subquery = (BoundSubqueryExpression &)expr;
331 // subquery node! plan it
332 if (subquery.IsCorrelated() && !plan_subquery) {
333 // detected a nested correlated subquery
334 // we don't plan it yet here, we are currently planning a subquery
335 // nested subqueries will only be planned AFTER the current subquery has been flattened entirely
336 has_unplanned_subqueries = true;
337 return;
338 }
339 *expr_ptr = PlanSubquery(subquery, *root);
340 }
341}
342
343} // namespace duckdb
344