1 | #include "duckdb/execution/operator/join/perfect_hash_join_executor.hpp" |
2 | #include "duckdb/execution/operator/join/physical_cross_product.hpp" |
3 | #include "duckdb/execution/operator/join/physical_hash_join.hpp" |
4 | #include "duckdb/execution/operator/join/physical_iejoin.hpp" |
5 | #include "duckdb/execution/operator/join/physical_index_join.hpp" |
6 | #include "duckdb/execution/operator/join/physical_nested_loop_join.hpp" |
7 | #include "duckdb/execution/operator/join/physical_piecewise_merge_join.hpp" |
8 | #include "duckdb/execution/operator/scan/physical_table_scan.hpp" |
9 | #include "duckdb/execution/physical_plan_generator.hpp" |
10 | #include "duckdb/function/table/table_scan.hpp" |
11 | #include "duckdb/main/client_context.hpp" |
12 | #include "duckdb/planner/operator/logical_comparison_join.hpp" |
13 | #include "duckdb/transaction/duck_transaction.hpp" |
14 | #include "duckdb/common/operator/subtract.hpp" |
15 | #include "duckdb/execution/operator/join/physical_blockwise_nl_join.hpp" |
16 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
17 | #include "duckdb/planner/expression_iterator.hpp" |
18 | #include "duckdb/catalog/catalog_entry/duck_table_entry.hpp" |
19 | |
20 | namespace duckdb { |
21 | |
22 | static bool CanPlanIndexJoin(ClientContext &context, TableScanBindData &bind_data, PhysicalTableScan &scan) { |
23 | auto &table = bind_data.table; |
24 | auto &transaction = DuckTransaction::Get(context, catalog&: table.catalog); |
25 | auto &local_storage = LocalStorage::Get(transaction); |
26 | if (local_storage.Find(table&: table.GetStorage())) { |
27 | // transaction local appends: skip index join |
28 | return false; |
29 | } |
30 | if (scan.table_filters && !scan.table_filters->filters.empty()) { |
31 | // table scan filters |
32 | return false; |
33 | } |
34 | return true; |
35 | } |
36 | |
37 | bool (Value val, int64_t &result) { |
38 | if (!val.type().IsIntegral()) { |
39 | switch (val.type().InternalType()) { |
40 | case PhysicalType::INT16: |
41 | result = val.GetValueUnsafe<int16_t>(); |
42 | break; |
43 | case PhysicalType::INT32: |
44 | result = val.GetValueUnsafe<int32_t>(); |
45 | break; |
46 | case PhysicalType::INT64: |
47 | result = val.GetValueUnsafe<int64_t>(); |
48 | break; |
49 | default: |
50 | return false; |
51 | } |
52 | } else { |
53 | if (!val.DefaultTryCastAs(target_type: LogicalType::BIGINT)) { |
54 | return false; |
55 | } |
56 | result = val.GetValue<int64_t>(); |
57 | } |
58 | return true; |
59 | } |
60 | |
61 | void CheckForPerfectJoinOpt(LogicalComparisonJoin &op, PerfectHashJoinStats &join_state) { |
62 | // we only do this optimization for inner joins |
63 | if (op.join_type != JoinType::INNER) { |
64 | return; |
65 | } |
66 | // with one condition |
67 | if (op.conditions.size() != 1) { |
68 | return; |
69 | } |
70 | // with propagated statistics |
71 | if (op.join_stats.empty()) { |
72 | return; |
73 | } |
74 | for (auto &type : op.children[1]->types) { |
75 | switch (type.InternalType()) { |
76 | case PhysicalType::STRUCT: |
77 | case PhysicalType::LIST: |
78 | return; |
79 | default: |
80 | break; |
81 | } |
82 | } |
83 | // with equality condition and null values not equal |
84 | for (auto &&condition : op.conditions) { |
85 | if (condition.comparison != ExpressionType::COMPARE_EQUAL) { |
86 | return; |
87 | } |
88 | } |
89 | // with integral internal types |
90 | for (auto &&join_stat : op.join_stats) { |
91 | if (!TypeIsInteger(type: join_stat->GetType().InternalType()) || |
92 | join_stat->GetType().InternalType() == PhysicalType::INT128) { |
93 | // perfect join not possible for non-integral types or hugeint |
94 | return; |
95 | } |
96 | } |
97 | |
98 | // and when the build range is smaller than the threshold |
99 | auto &stats_build = *op.join_stats[0].get(); // lhs stats |
100 | if (!NumericStats::HasMinMax(stats: stats_build)) { |
101 | return; |
102 | } |
103 | int64_t min_value, max_value; |
104 | if (!ExtractNumericValue(val: NumericStats::Min(stats: stats_build), result&: min_value) || |
105 | !ExtractNumericValue(val: NumericStats::Max(stats: stats_build), result&: max_value)) { |
106 | return; |
107 | } |
108 | int64_t build_range; |
109 | if (!TrySubtractOperator::Operation(left: max_value, right: min_value, result&: build_range)) { |
110 | return; |
111 | } |
112 | |
113 | // Fill join_stats for invisible join |
114 | auto &stats_probe = *op.join_stats[1].get(); // rhs stats |
115 | if (!NumericStats::HasMinMax(stats: stats_probe)) { |
116 | return; |
117 | } |
118 | |
119 | // The max size our build must have to run the perfect HJ |
120 | const idx_t MAX_BUILD_SIZE = 1000000; |
121 | join_state.probe_min = NumericStats::Min(stats: stats_probe); |
122 | join_state.probe_max = NumericStats::Max(stats: stats_probe); |
123 | join_state.build_min = NumericStats::Min(stats: stats_build); |
124 | join_state.build_max = NumericStats::Max(stats: stats_build); |
125 | join_state.estimated_cardinality = op.estimated_cardinality; |
126 | join_state.build_range = build_range; |
127 | if (join_state.build_range > MAX_BUILD_SIZE) { |
128 | return; |
129 | } |
130 | if (NumericStats::Min(stats: stats_build) <= NumericStats::Min(stats: stats_probe) && |
131 | NumericStats::Max(stats: stats_probe) <= NumericStats::Max(stats: stats_build)) { |
132 | join_state.is_probe_in_domain = true; |
133 | } |
134 | join_state.is_build_small = true; |
135 | return; |
136 | } |
137 | |
138 | static optional_ptr<Index> CanUseIndexJoin(TableScanBindData &tbl, Expression &expr) { |
139 | optional_ptr<Index> result; |
140 | tbl.table.GetStorage().info->indexes.Scan(callback: [&](Index &index) { |
141 | if (index.unbound_expressions.size() != 1) { |
142 | return false; |
143 | } |
144 | if (expr.alias == index.unbound_expressions[0]->alias) { |
145 | result = &index; |
146 | return true; |
147 | } |
148 | return false; |
149 | }); |
150 | return result; |
151 | } |
152 | |
153 | optional_ptr<Index> CheckIndexJoin(ClientContext &context, LogicalComparisonJoin &op, PhysicalOperator &plan, |
154 | Expression &condition) { |
155 | if (op.type == LogicalOperatorType::LOGICAL_DELIM_JOIN) { |
156 | return nullptr; |
157 | } |
158 | // check if one of the tables has an index on column |
159 | if (op.join_type != JoinType::INNER) { |
160 | return nullptr; |
161 | } |
162 | if (op.conditions.size() != 1) { |
163 | return nullptr; |
164 | } |
165 | // check if the child is (1) a table scan, and (2) has an index on the join condition |
166 | if (plan.type != PhysicalOperatorType::TABLE_SCAN) { |
167 | return nullptr; |
168 | } |
169 | auto &tbl_scan = plan.Cast<PhysicalTableScan>(); |
170 | auto tbl_data = dynamic_cast<TableScanBindData *>(tbl_scan.bind_data.get()); |
171 | if (!tbl_data) { |
172 | return nullptr; |
173 | } |
174 | optional_ptr<Index> result; |
175 | if (CanPlanIndexJoin(context, bind_data&: *tbl_data, scan&: tbl_scan)) { |
176 | result = CanUseIndexJoin(tbl&: *tbl_data, expr&: condition); |
177 | } |
178 | return result; |
179 | } |
180 | |
181 | static bool PlanIndexJoin(ClientContext &context, LogicalComparisonJoin &op, unique_ptr<PhysicalOperator> &plan, |
182 | unique_ptr<PhysicalOperator> &left, unique_ptr<PhysicalOperator> &right, |
183 | optional_ptr<Index> index, bool swap_condition = false) { |
184 | if (!index) { |
185 | return false; |
186 | } |
187 | // index joins are not supported if there are pushed down table filters |
188 | D_ASSERT(right->type == PhysicalOperatorType::TABLE_SCAN); |
189 | auto &tbl_scan = right->Cast<PhysicalTableScan>(); |
190 | // if (tbl_scan.table_filters && !tbl_scan.table_filters->filters.empty()) { |
191 | // return false; |
192 | // } |
193 | // index joins are disabled if enable_optimizer is false |
194 | if (!ClientConfig::GetConfig(context).enable_optimizer) { |
195 | return false; |
196 | } |
197 | // check if the cardinality difference justifies an index join |
198 | if (!((ClientConfig::GetConfig(context).force_index_join || |
199 | left->estimated_cardinality < 0.01 * right->estimated_cardinality))) { |
200 | return false; |
201 | } |
202 | |
203 | // plan the index join |
204 | if (swap_condition) { |
205 | swap(a&: op.conditions[0].left, b&: op.conditions[0].right); |
206 | swap(a&: op.left_projection_map, b&: op.right_projection_map); |
207 | } |
208 | plan = make_uniq<PhysicalIndexJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), args&: op.join_type, |
209 | args&: op.left_projection_map, args&: op.right_projection_map, args&: tbl_scan.column_ids, args&: *index, |
210 | args: !swap_condition, args&: op.estimated_cardinality); |
211 | return true; |
212 | } |
213 | |
214 | static bool PlanIndexJoin(ClientContext &context, LogicalComparisonJoin &op, unique_ptr<PhysicalOperator> &plan, |
215 | unique_ptr<PhysicalOperator> &left, unique_ptr<PhysicalOperator> &right) { |
216 | if (op.conditions.empty()) { |
217 | return false; |
218 | } |
219 | // check if we can plan an index join on the RHS |
220 | auto right_index = CheckIndexJoin(context, op, plan&: *right, condition&: *op.conditions[0].right); |
221 | if (PlanIndexJoin(context, op, plan, left, right, index: right_index)) { |
222 | return true; |
223 | } |
224 | // else check if we can plan an index join on the left side |
225 | auto left_index = CheckIndexJoin(context, op, plan&: *left, condition&: *op.conditions[0].left); |
226 | if (PlanIndexJoin(context, op, plan, left&: right, right&: left, index: left_index, swap_condition: true)) { |
227 | return true; |
228 | } |
229 | return false; |
230 | } |
231 | |
232 | static void RewriteJoinCondition(Expression &expr, idx_t offset) { |
233 | if (expr.type == ExpressionType::BOUND_REF) { |
234 | auto &ref = expr.Cast<BoundReferenceExpression>(); |
235 | ref.index += offset; |
236 | } |
237 | ExpressionIterator::EnumerateChildren(expression&: expr, callback: [&](Expression &child) { RewriteJoinCondition(expr&: child, offset); }); |
238 | } |
239 | |
240 | unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalComparisonJoin &op) { |
241 | // now visit the children |
242 | D_ASSERT(op.children.size() == 2); |
243 | idx_t lhs_cardinality = op.children[0]->EstimateCardinality(context); |
244 | idx_t rhs_cardinality = op.children[1]->EstimateCardinality(context); |
245 | auto left = CreatePlan(op&: *op.children[0]); |
246 | auto right = CreatePlan(op&: *op.children[1]); |
247 | left->estimated_cardinality = lhs_cardinality; |
248 | right->estimated_cardinality = rhs_cardinality; |
249 | D_ASSERT(left && right); |
250 | |
251 | if (op.conditions.empty()) { |
252 | // no conditions: insert a cross product |
253 | return make_uniq<PhysicalCrossProduct>(args&: op.types, args: std::move(left), args: std::move(right), args&: op.estimated_cardinality); |
254 | } |
255 | |
256 | bool has_equality = false; |
257 | // bool has_inequality = false; |
258 | size_t has_range = 0; |
259 | for (size_t c = 0; c < op.conditions.size(); ++c) { |
260 | auto &cond = op.conditions[c]; |
261 | switch (cond.comparison) { |
262 | case ExpressionType::COMPARE_EQUAL: |
263 | case ExpressionType::COMPARE_NOT_DISTINCT_FROM: |
264 | has_equality = true; |
265 | break; |
266 | case ExpressionType::COMPARE_LESSTHAN: |
267 | case ExpressionType::COMPARE_GREATERTHAN: |
268 | case ExpressionType::COMPARE_LESSTHANOREQUALTO: |
269 | case ExpressionType::COMPARE_GREATERTHANOREQUALTO: |
270 | ++has_range; |
271 | break; |
272 | case ExpressionType::COMPARE_NOTEQUAL: |
273 | case ExpressionType::COMPARE_DISTINCT_FROM: |
274 | // has_inequality = true; |
275 | break; |
276 | default: |
277 | throw NotImplementedException("Unimplemented comparison join" ); |
278 | } |
279 | } |
280 | |
281 | unique_ptr<PhysicalOperator> plan; |
282 | if (has_equality) { |
283 | // check if we can use an index join |
284 | if (PlanIndexJoin(context, op, plan, left, right)) { |
285 | return plan; |
286 | } |
287 | // Equality join with small number of keys : possible perfect join optimization |
288 | PerfectHashJoinStats perfect_join_stats; |
289 | CheckForPerfectJoinOpt(op, join_state&: perfect_join_stats); |
290 | plan = make_uniq<PhysicalHashJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), |
291 | args&: op.join_type, args&: op.left_projection_map, args&: op.right_projection_map, |
292 | args: std::move(op.delim_types), args&: op.estimated_cardinality, args&: perfect_join_stats); |
293 | |
294 | } else { |
295 | static constexpr const idx_t NESTED_LOOP_JOIN_THRESHOLD = 5; |
296 | bool can_merge = has_range > 0; |
297 | bool can_iejoin = has_range >= 2 && recursive_cte_tables.empty(); |
298 | switch (op.join_type) { |
299 | case JoinType::SEMI: |
300 | case JoinType::ANTI: |
301 | case JoinType::MARK: |
302 | can_merge = can_merge && op.conditions.size() == 1; |
303 | can_iejoin = false; |
304 | break; |
305 | default: |
306 | break; |
307 | } |
308 | if (left->estimated_cardinality <= NESTED_LOOP_JOIN_THRESHOLD || |
309 | right->estimated_cardinality <= NESTED_LOOP_JOIN_THRESHOLD) { |
310 | can_iejoin = false; |
311 | can_merge = false; |
312 | } |
313 | if (can_iejoin) { |
314 | plan = make_uniq<PhysicalIEJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), |
315 | args&: op.join_type, args&: op.estimated_cardinality); |
316 | } else if (can_merge) { |
317 | // range join: use piecewise merge join |
318 | plan = |
319 | make_uniq<PhysicalPiecewiseMergeJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), |
320 | args&: op.join_type, args&: op.estimated_cardinality); |
321 | } else if (PhysicalNestedLoopJoin::IsSupported(conditions: op.conditions, join_type: op.join_type)) { |
322 | // inequality join: use nested loop |
323 | plan = make_uniq<PhysicalNestedLoopJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), |
324 | args&: op.join_type, args&: op.estimated_cardinality); |
325 | } else { |
326 | for (auto &cond : op.conditions) { |
327 | RewriteJoinCondition(expr&: *cond.right, offset: left->types.size()); |
328 | } |
329 | auto condition = JoinCondition::CreateExpression(conditions: std::move(op.conditions)); |
330 | plan = make_uniq<PhysicalBlockwiseNLJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(condition), |
331 | args&: op.join_type, args&: op.estimated_cardinality); |
332 | } |
333 | } |
334 | return plan; |
335 | } |
336 | |
337 | } // namespace duckdb |
338 | |