1#include "duckdb/execution/operator/join/perfect_hash_join_executor.hpp"
2#include "duckdb/execution/operator/join/physical_cross_product.hpp"
3#include "duckdb/execution/operator/join/physical_hash_join.hpp"
4#include "duckdb/execution/operator/join/physical_iejoin.hpp"
5#include "duckdb/execution/operator/join/physical_index_join.hpp"
6#include "duckdb/execution/operator/join/physical_nested_loop_join.hpp"
7#include "duckdb/execution/operator/join/physical_piecewise_merge_join.hpp"
8#include "duckdb/execution/operator/scan/physical_table_scan.hpp"
9#include "duckdb/execution/physical_plan_generator.hpp"
10#include "duckdb/function/table/table_scan.hpp"
11#include "duckdb/main/client_context.hpp"
12#include "duckdb/planner/operator/logical_comparison_join.hpp"
13#include "duckdb/transaction/duck_transaction.hpp"
14#include "duckdb/common/operator/subtract.hpp"
15#include "duckdb/execution/operator/join/physical_blockwise_nl_join.hpp"
16#include "duckdb/planner/expression/bound_reference_expression.hpp"
17#include "duckdb/planner/expression_iterator.hpp"
18#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
19
20namespace duckdb {
21
22static bool CanPlanIndexJoin(ClientContext &context, TableScanBindData &bind_data, PhysicalTableScan &scan) {
23 auto &table = bind_data.table;
24 auto &transaction = DuckTransaction::Get(context, catalog&: table.catalog);
25 auto &local_storage = LocalStorage::Get(transaction);
26 if (local_storage.Find(table&: table.GetStorage())) {
27 // transaction local appends: skip index join
28 return false;
29 }
30 if (scan.table_filters && !scan.table_filters->filters.empty()) {
31 // table scan filters
32 return false;
33 }
34 return true;
35}
36
37bool ExtractNumericValue(Value val, int64_t &result) {
38 if (!val.type().IsIntegral()) {
39 switch (val.type().InternalType()) {
40 case PhysicalType::INT16:
41 result = val.GetValueUnsafe<int16_t>();
42 break;
43 case PhysicalType::INT32:
44 result = val.GetValueUnsafe<int32_t>();
45 break;
46 case PhysicalType::INT64:
47 result = val.GetValueUnsafe<int64_t>();
48 break;
49 default:
50 return false;
51 }
52 } else {
53 if (!val.DefaultTryCastAs(target_type: LogicalType::BIGINT)) {
54 return false;
55 }
56 result = val.GetValue<int64_t>();
57 }
58 return true;
59}
60
61void CheckForPerfectJoinOpt(LogicalComparisonJoin &op, PerfectHashJoinStats &join_state) {
62 // we only do this optimization for inner joins
63 if (op.join_type != JoinType::INNER) {
64 return;
65 }
66 // with one condition
67 if (op.conditions.size() != 1) {
68 return;
69 }
70 // with propagated statistics
71 if (op.join_stats.empty()) {
72 return;
73 }
74 for (auto &type : op.children[1]->types) {
75 switch (type.InternalType()) {
76 case PhysicalType::STRUCT:
77 case PhysicalType::LIST:
78 return;
79 default:
80 break;
81 }
82 }
83 // with equality condition and null values not equal
84 for (auto &&condition : op.conditions) {
85 if (condition.comparison != ExpressionType::COMPARE_EQUAL) {
86 return;
87 }
88 }
89 // with integral internal types
90 for (auto &&join_stat : op.join_stats) {
91 if (!TypeIsInteger(type: join_stat->GetType().InternalType()) ||
92 join_stat->GetType().InternalType() == PhysicalType::INT128) {
93 // perfect join not possible for non-integral types or hugeint
94 return;
95 }
96 }
97
98 // and when the build range is smaller than the threshold
99 auto &stats_build = *op.join_stats[0].get(); // lhs stats
100 if (!NumericStats::HasMinMax(stats: stats_build)) {
101 return;
102 }
103 int64_t min_value, max_value;
104 if (!ExtractNumericValue(val: NumericStats::Min(stats: stats_build), result&: min_value) ||
105 !ExtractNumericValue(val: NumericStats::Max(stats: stats_build), result&: max_value)) {
106 return;
107 }
108 int64_t build_range;
109 if (!TrySubtractOperator::Operation(left: max_value, right: min_value, result&: build_range)) {
110 return;
111 }
112
113 // Fill join_stats for invisible join
114 auto &stats_probe = *op.join_stats[1].get(); // rhs stats
115 if (!NumericStats::HasMinMax(stats: stats_probe)) {
116 return;
117 }
118
119 // The max size our build must have to run the perfect HJ
120 const idx_t MAX_BUILD_SIZE = 1000000;
121 join_state.probe_min = NumericStats::Min(stats: stats_probe);
122 join_state.probe_max = NumericStats::Max(stats: stats_probe);
123 join_state.build_min = NumericStats::Min(stats: stats_build);
124 join_state.build_max = NumericStats::Max(stats: stats_build);
125 join_state.estimated_cardinality = op.estimated_cardinality;
126 join_state.build_range = build_range;
127 if (join_state.build_range > MAX_BUILD_SIZE) {
128 return;
129 }
130 if (NumericStats::Min(stats: stats_build) <= NumericStats::Min(stats: stats_probe) &&
131 NumericStats::Max(stats: stats_probe) <= NumericStats::Max(stats: stats_build)) {
132 join_state.is_probe_in_domain = true;
133 }
134 join_state.is_build_small = true;
135 return;
136}
137
138static optional_ptr<Index> CanUseIndexJoin(TableScanBindData &tbl, Expression &expr) {
139 optional_ptr<Index> result;
140 tbl.table.GetStorage().info->indexes.Scan(callback: [&](Index &index) {
141 if (index.unbound_expressions.size() != 1) {
142 return false;
143 }
144 if (expr.alias == index.unbound_expressions[0]->alias) {
145 result = &index;
146 return true;
147 }
148 return false;
149 });
150 return result;
151}
152
153optional_ptr<Index> CheckIndexJoin(ClientContext &context, LogicalComparisonJoin &op, PhysicalOperator &plan,
154 Expression &condition) {
155 if (op.type == LogicalOperatorType::LOGICAL_DELIM_JOIN) {
156 return nullptr;
157 }
158 // check if one of the tables has an index on column
159 if (op.join_type != JoinType::INNER) {
160 return nullptr;
161 }
162 if (op.conditions.size() != 1) {
163 return nullptr;
164 }
165 // check if the child is (1) a table scan, and (2) has an index on the join condition
166 if (plan.type != PhysicalOperatorType::TABLE_SCAN) {
167 return nullptr;
168 }
169 auto &tbl_scan = plan.Cast<PhysicalTableScan>();
170 auto tbl_data = dynamic_cast<TableScanBindData *>(tbl_scan.bind_data.get());
171 if (!tbl_data) {
172 return nullptr;
173 }
174 optional_ptr<Index> result;
175 if (CanPlanIndexJoin(context, bind_data&: *tbl_data, scan&: tbl_scan)) {
176 result = CanUseIndexJoin(tbl&: *tbl_data, expr&: condition);
177 }
178 return result;
179}
180
181static bool PlanIndexJoin(ClientContext &context, LogicalComparisonJoin &op, unique_ptr<PhysicalOperator> &plan,
182 unique_ptr<PhysicalOperator> &left, unique_ptr<PhysicalOperator> &right,
183 optional_ptr<Index> index, bool swap_condition = false) {
184 if (!index) {
185 return false;
186 }
187 // index joins are not supported if there are pushed down table filters
188 D_ASSERT(right->type == PhysicalOperatorType::TABLE_SCAN);
189 auto &tbl_scan = right->Cast<PhysicalTableScan>();
190 // if (tbl_scan.table_filters && !tbl_scan.table_filters->filters.empty()) {
191 // return false;
192 // }
193 // index joins are disabled if enable_optimizer is false
194 if (!ClientConfig::GetConfig(context).enable_optimizer) {
195 return false;
196 }
197 // check if the cardinality difference justifies an index join
198 if (!((ClientConfig::GetConfig(context).force_index_join ||
199 left->estimated_cardinality < 0.01 * right->estimated_cardinality))) {
200 return false;
201 }
202
203 // plan the index join
204 if (swap_condition) {
205 swap(a&: op.conditions[0].left, b&: op.conditions[0].right);
206 swap(a&: op.left_projection_map, b&: op.right_projection_map);
207 }
208 plan = make_uniq<PhysicalIndexJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), args&: op.join_type,
209 args&: op.left_projection_map, args&: op.right_projection_map, args&: tbl_scan.column_ids, args&: *index,
210 args: !swap_condition, args&: op.estimated_cardinality);
211 return true;
212}
213
214static bool PlanIndexJoin(ClientContext &context, LogicalComparisonJoin &op, unique_ptr<PhysicalOperator> &plan,
215 unique_ptr<PhysicalOperator> &left, unique_ptr<PhysicalOperator> &right) {
216 if (op.conditions.empty()) {
217 return false;
218 }
219 // check if we can plan an index join on the RHS
220 auto right_index = CheckIndexJoin(context, op, plan&: *right, condition&: *op.conditions[0].right);
221 if (PlanIndexJoin(context, op, plan, left, right, index: right_index)) {
222 return true;
223 }
224 // else check if we can plan an index join on the left side
225 auto left_index = CheckIndexJoin(context, op, plan&: *left, condition&: *op.conditions[0].left);
226 if (PlanIndexJoin(context, op, plan, left&: right, right&: left, index: left_index, swap_condition: true)) {
227 return true;
228 }
229 return false;
230}
231
232static void RewriteJoinCondition(Expression &expr, idx_t offset) {
233 if (expr.type == ExpressionType::BOUND_REF) {
234 auto &ref = expr.Cast<BoundReferenceExpression>();
235 ref.index += offset;
236 }
237 ExpressionIterator::EnumerateChildren(expression&: expr, callback: [&](Expression &child) { RewriteJoinCondition(expr&: child, offset); });
238}
239
240unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalComparisonJoin &op) {
241 // now visit the children
242 D_ASSERT(op.children.size() == 2);
243 idx_t lhs_cardinality = op.children[0]->EstimateCardinality(context);
244 idx_t rhs_cardinality = op.children[1]->EstimateCardinality(context);
245 auto left = CreatePlan(op&: *op.children[0]);
246 auto right = CreatePlan(op&: *op.children[1]);
247 left->estimated_cardinality = lhs_cardinality;
248 right->estimated_cardinality = rhs_cardinality;
249 D_ASSERT(left && right);
250
251 if (op.conditions.empty()) {
252 // no conditions: insert a cross product
253 return make_uniq<PhysicalCrossProduct>(args&: op.types, args: std::move(left), args: std::move(right), args&: op.estimated_cardinality);
254 }
255
256 bool has_equality = false;
257 // bool has_inequality = false;
258 size_t has_range = 0;
259 for (size_t c = 0; c < op.conditions.size(); ++c) {
260 auto &cond = op.conditions[c];
261 switch (cond.comparison) {
262 case ExpressionType::COMPARE_EQUAL:
263 case ExpressionType::COMPARE_NOT_DISTINCT_FROM:
264 has_equality = true;
265 break;
266 case ExpressionType::COMPARE_LESSTHAN:
267 case ExpressionType::COMPARE_GREATERTHAN:
268 case ExpressionType::COMPARE_LESSTHANOREQUALTO:
269 case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
270 ++has_range;
271 break;
272 case ExpressionType::COMPARE_NOTEQUAL:
273 case ExpressionType::COMPARE_DISTINCT_FROM:
274 // has_inequality = true;
275 break;
276 default:
277 throw NotImplementedException("Unimplemented comparison join");
278 }
279 }
280
281 unique_ptr<PhysicalOperator> plan;
282 if (has_equality) {
283 // check if we can use an index join
284 if (PlanIndexJoin(context, op, plan, left, right)) {
285 return plan;
286 }
287 // Equality join with small number of keys : possible perfect join optimization
288 PerfectHashJoinStats perfect_join_stats;
289 CheckForPerfectJoinOpt(op, join_state&: perfect_join_stats);
290 plan = make_uniq<PhysicalHashJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions),
291 args&: op.join_type, args&: op.left_projection_map, args&: op.right_projection_map,
292 args: std::move(op.delim_types), args&: op.estimated_cardinality, args&: perfect_join_stats);
293
294 } else {
295 static constexpr const idx_t NESTED_LOOP_JOIN_THRESHOLD = 5;
296 bool can_merge = has_range > 0;
297 bool can_iejoin = has_range >= 2 && recursive_cte_tables.empty();
298 switch (op.join_type) {
299 case JoinType::SEMI:
300 case JoinType::ANTI:
301 case JoinType::MARK:
302 can_merge = can_merge && op.conditions.size() == 1;
303 can_iejoin = false;
304 break;
305 default:
306 break;
307 }
308 if (left->estimated_cardinality <= NESTED_LOOP_JOIN_THRESHOLD ||
309 right->estimated_cardinality <= NESTED_LOOP_JOIN_THRESHOLD) {
310 can_iejoin = false;
311 can_merge = false;
312 }
313 if (can_iejoin) {
314 plan = make_uniq<PhysicalIEJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions),
315 args&: op.join_type, args&: op.estimated_cardinality);
316 } else if (can_merge) {
317 // range join: use piecewise merge join
318 plan =
319 make_uniq<PhysicalPiecewiseMergeJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions),
320 args&: op.join_type, args&: op.estimated_cardinality);
321 } else if (PhysicalNestedLoopJoin::IsSupported(conditions: op.conditions, join_type: op.join_type)) {
322 // inequality join: use nested loop
323 plan = make_uniq<PhysicalNestedLoopJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions),
324 args&: op.join_type, args&: op.estimated_cardinality);
325 } else {
326 for (auto &cond : op.conditions) {
327 RewriteJoinCondition(expr&: *cond.right, offset: left->types.size());
328 }
329 auto condition = JoinCondition::CreateExpression(conditions: std::move(op.conditions));
330 plan = make_uniq<PhysicalBlockwiseNLJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(condition),
331 args&: op.join_type, args&: op.estimated_cardinality);
332 }
333 }
334 return plan;
335}
336
337} // namespace duckdb
338