| 1 | #include "duckdb/execution/operator/join/perfect_hash_join_executor.hpp" |
| 2 | #include "duckdb/execution/operator/join/physical_cross_product.hpp" |
| 3 | #include "duckdb/execution/operator/join/physical_hash_join.hpp" |
| 4 | #include "duckdb/execution/operator/join/physical_iejoin.hpp" |
| 5 | #include "duckdb/execution/operator/join/physical_index_join.hpp" |
| 6 | #include "duckdb/execution/operator/join/physical_nested_loop_join.hpp" |
| 7 | #include "duckdb/execution/operator/join/physical_piecewise_merge_join.hpp" |
| 8 | #include "duckdb/execution/operator/scan/physical_table_scan.hpp" |
| 9 | #include "duckdb/execution/physical_plan_generator.hpp" |
| 10 | #include "duckdb/function/table/table_scan.hpp" |
| 11 | #include "duckdb/main/client_context.hpp" |
| 12 | #include "duckdb/planner/operator/logical_comparison_join.hpp" |
| 13 | #include "duckdb/transaction/duck_transaction.hpp" |
| 14 | #include "duckdb/common/operator/subtract.hpp" |
| 15 | #include "duckdb/execution/operator/join/physical_blockwise_nl_join.hpp" |
| 16 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
| 17 | #include "duckdb/planner/expression_iterator.hpp" |
| 18 | #include "duckdb/catalog/catalog_entry/duck_table_entry.hpp" |
| 19 | |
| 20 | namespace duckdb { |
| 21 | |
| 22 | static bool CanPlanIndexJoin(ClientContext &context, TableScanBindData &bind_data, PhysicalTableScan &scan) { |
| 23 | auto &table = bind_data.table; |
| 24 | auto &transaction = DuckTransaction::Get(context, catalog&: table.catalog); |
| 25 | auto &local_storage = LocalStorage::Get(transaction); |
| 26 | if (local_storage.Find(table&: table.GetStorage())) { |
| 27 | // transaction local appends: skip index join |
| 28 | return false; |
| 29 | } |
| 30 | if (scan.table_filters && !scan.table_filters->filters.empty()) { |
| 31 | // table scan filters |
| 32 | return false; |
| 33 | } |
| 34 | return true; |
| 35 | } |
| 36 | |
| 37 | bool (Value val, int64_t &result) { |
| 38 | if (!val.type().IsIntegral()) { |
| 39 | switch (val.type().InternalType()) { |
| 40 | case PhysicalType::INT16: |
| 41 | result = val.GetValueUnsafe<int16_t>(); |
| 42 | break; |
| 43 | case PhysicalType::INT32: |
| 44 | result = val.GetValueUnsafe<int32_t>(); |
| 45 | break; |
| 46 | case PhysicalType::INT64: |
| 47 | result = val.GetValueUnsafe<int64_t>(); |
| 48 | break; |
| 49 | default: |
| 50 | return false; |
| 51 | } |
| 52 | } else { |
| 53 | if (!val.DefaultTryCastAs(target_type: LogicalType::BIGINT)) { |
| 54 | return false; |
| 55 | } |
| 56 | result = val.GetValue<int64_t>(); |
| 57 | } |
| 58 | return true; |
| 59 | } |
| 60 | |
| 61 | void CheckForPerfectJoinOpt(LogicalComparisonJoin &op, PerfectHashJoinStats &join_state) { |
| 62 | // we only do this optimization for inner joins |
| 63 | if (op.join_type != JoinType::INNER) { |
| 64 | return; |
| 65 | } |
| 66 | // with one condition |
| 67 | if (op.conditions.size() != 1) { |
| 68 | return; |
| 69 | } |
| 70 | // with propagated statistics |
| 71 | if (op.join_stats.empty()) { |
| 72 | return; |
| 73 | } |
| 74 | for (auto &type : op.children[1]->types) { |
| 75 | switch (type.InternalType()) { |
| 76 | case PhysicalType::STRUCT: |
| 77 | case PhysicalType::LIST: |
| 78 | return; |
| 79 | default: |
| 80 | break; |
| 81 | } |
| 82 | } |
| 83 | // with equality condition and null values not equal |
| 84 | for (auto &&condition : op.conditions) { |
| 85 | if (condition.comparison != ExpressionType::COMPARE_EQUAL) { |
| 86 | return; |
| 87 | } |
| 88 | } |
| 89 | // with integral internal types |
| 90 | for (auto &&join_stat : op.join_stats) { |
| 91 | if (!TypeIsInteger(type: join_stat->GetType().InternalType()) || |
| 92 | join_stat->GetType().InternalType() == PhysicalType::INT128) { |
| 93 | // perfect join not possible for non-integral types or hugeint |
| 94 | return; |
| 95 | } |
| 96 | } |
| 97 | |
| 98 | // and when the build range is smaller than the threshold |
| 99 | auto &stats_build = *op.join_stats[0].get(); // lhs stats |
| 100 | if (!NumericStats::HasMinMax(stats: stats_build)) { |
| 101 | return; |
| 102 | } |
| 103 | int64_t min_value, max_value; |
| 104 | if (!ExtractNumericValue(val: NumericStats::Min(stats: stats_build), result&: min_value) || |
| 105 | !ExtractNumericValue(val: NumericStats::Max(stats: stats_build), result&: max_value)) { |
| 106 | return; |
| 107 | } |
| 108 | int64_t build_range; |
| 109 | if (!TrySubtractOperator::Operation(left: max_value, right: min_value, result&: build_range)) { |
| 110 | return; |
| 111 | } |
| 112 | |
| 113 | // Fill join_stats for invisible join |
| 114 | auto &stats_probe = *op.join_stats[1].get(); // rhs stats |
| 115 | if (!NumericStats::HasMinMax(stats: stats_probe)) { |
| 116 | return; |
| 117 | } |
| 118 | |
| 119 | // The max size our build must have to run the perfect HJ |
| 120 | const idx_t MAX_BUILD_SIZE = 1000000; |
| 121 | join_state.probe_min = NumericStats::Min(stats: stats_probe); |
| 122 | join_state.probe_max = NumericStats::Max(stats: stats_probe); |
| 123 | join_state.build_min = NumericStats::Min(stats: stats_build); |
| 124 | join_state.build_max = NumericStats::Max(stats: stats_build); |
| 125 | join_state.estimated_cardinality = op.estimated_cardinality; |
| 126 | join_state.build_range = build_range; |
| 127 | if (join_state.build_range > MAX_BUILD_SIZE) { |
| 128 | return; |
| 129 | } |
| 130 | if (NumericStats::Min(stats: stats_build) <= NumericStats::Min(stats: stats_probe) && |
| 131 | NumericStats::Max(stats: stats_probe) <= NumericStats::Max(stats: stats_build)) { |
| 132 | join_state.is_probe_in_domain = true; |
| 133 | } |
| 134 | join_state.is_build_small = true; |
| 135 | return; |
| 136 | } |
| 137 | |
| 138 | static optional_ptr<Index> CanUseIndexJoin(TableScanBindData &tbl, Expression &expr) { |
| 139 | optional_ptr<Index> result; |
| 140 | tbl.table.GetStorage().info->indexes.Scan(callback: [&](Index &index) { |
| 141 | if (index.unbound_expressions.size() != 1) { |
| 142 | return false; |
| 143 | } |
| 144 | if (expr.alias == index.unbound_expressions[0]->alias) { |
| 145 | result = &index; |
| 146 | return true; |
| 147 | } |
| 148 | return false; |
| 149 | }); |
| 150 | return result; |
| 151 | } |
| 152 | |
| 153 | optional_ptr<Index> CheckIndexJoin(ClientContext &context, LogicalComparisonJoin &op, PhysicalOperator &plan, |
| 154 | Expression &condition) { |
| 155 | if (op.type == LogicalOperatorType::LOGICAL_DELIM_JOIN) { |
| 156 | return nullptr; |
| 157 | } |
| 158 | // check if one of the tables has an index on column |
| 159 | if (op.join_type != JoinType::INNER) { |
| 160 | return nullptr; |
| 161 | } |
| 162 | if (op.conditions.size() != 1) { |
| 163 | return nullptr; |
| 164 | } |
| 165 | // check if the child is (1) a table scan, and (2) has an index on the join condition |
| 166 | if (plan.type != PhysicalOperatorType::TABLE_SCAN) { |
| 167 | return nullptr; |
| 168 | } |
| 169 | auto &tbl_scan = plan.Cast<PhysicalTableScan>(); |
| 170 | auto tbl_data = dynamic_cast<TableScanBindData *>(tbl_scan.bind_data.get()); |
| 171 | if (!tbl_data) { |
| 172 | return nullptr; |
| 173 | } |
| 174 | optional_ptr<Index> result; |
| 175 | if (CanPlanIndexJoin(context, bind_data&: *tbl_data, scan&: tbl_scan)) { |
| 176 | result = CanUseIndexJoin(tbl&: *tbl_data, expr&: condition); |
| 177 | } |
| 178 | return result; |
| 179 | } |
| 180 | |
| 181 | static bool PlanIndexJoin(ClientContext &context, LogicalComparisonJoin &op, unique_ptr<PhysicalOperator> &plan, |
| 182 | unique_ptr<PhysicalOperator> &left, unique_ptr<PhysicalOperator> &right, |
| 183 | optional_ptr<Index> index, bool swap_condition = false) { |
| 184 | if (!index) { |
| 185 | return false; |
| 186 | } |
| 187 | // index joins are not supported if there are pushed down table filters |
| 188 | D_ASSERT(right->type == PhysicalOperatorType::TABLE_SCAN); |
| 189 | auto &tbl_scan = right->Cast<PhysicalTableScan>(); |
| 190 | // if (tbl_scan.table_filters && !tbl_scan.table_filters->filters.empty()) { |
| 191 | // return false; |
| 192 | // } |
| 193 | // index joins are disabled if enable_optimizer is false |
| 194 | if (!ClientConfig::GetConfig(context).enable_optimizer) { |
| 195 | return false; |
| 196 | } |
| 197 | // check if the cardinality difference justifies an index join |
| 198 | if (!((ClientConfig::GetConfig(context).force_index_join || |
| 199 | left->estimated_cardinality < 0.01 * right->estimated_cardinality))) { |
| 200 | return false; |
| 201 | } |
| 202 | |
| 203 | // plan the index join |
| 204 | if (swap_condition) { |
| 205 | swap(a&: op.conditions[0].left, b&: op.conditions[0].right); |
| 206 | swap(a&: op.left_projection_map, b&: op.right_projection_map); |
| 207 | } |
| 208 | plan = make_uniq<PhysicalIndexJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), args&: op.join_type, |
| 209 | args&: op.left_projection_map, args&: op.right_projection_map, args&: tbl_scan.column_ids, args&: *index, |
| 210 | args: !swap_condition, args&: op.estimated_cardinality); |
| 211 | return true; |
| 212 | } |
| 213 | |
| 214 | static bool PlanIndexJoin(ClientContext &context, LogicalComparisonJoin &op, unique_ptr<PhysicalOperator> &plan, |
| 215 | unique_ptr<PhysicalOperator> &left, unique_ptr<PhysicalOperator> &right) { |
| 216 | if (op.conditions.empty()) { |
| 217 | return false; |
| 218 | } |
| 219 | // check if we can plan an index join on the RHS |
| 220 | auto right_index = CheckIndexJoin(context, op, plan&: *right, condition&: *op.conditions[0].right); |
| 221 | if (PlanIndexJoin(context, op, plan, left, right, index: right_index)) { |
| 222 | return true; |
| 223 | } |
| 224 | // else check if we can plan an index join on the left side |
| 225 | auto left_index = CheckIndexJoin(context, op, plan&: *left, condition&: *op.conditions[0].left); |
| 226 | if (PlanIndexJoin(context, op, plan, left&: right, right&: left, index: left_index, swap_condition: true)) { |
| 227 | return true; |
| 228 | } |
| 229 | return false; |
| 230 | } |
| 231 | |
| 232 | static void RewriteJoinCondition(Expression &expr, idx_t offset) { |
| 233 | if (expr.type == ExpressionType::BOUND_REF) { |
| 234 | auto &ref = expr.Cast<BoundReferenceExpression>(); |
| 235 | ref.index += offset; |
| 236 | } |
| 237 | ExpressionIterator::EnumerateChildren(expression&: expr, callback: [&](Expression &child) { RewriteJoinCondition(expr&: child, offset); }); |
| 238 | } |
| 239 | |
| 240 | unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalComparisonJoin &op) { |
| 241 | // now visit the children |
| 242 | D_ASSERT(op.children.size() == 2); |
| 243 | idx_t lhs_cardinality = op.children[0]->EstimateCardinality(context); |
| 244 | idx_t rhs_cardinality = op.children[1]->EstimateCardinality(context); |
| 245 | auto left = CreatePlan(op&: *op.children[0]); |
| 246 | auto right = CreatePlan(op&: *op.children[1]); |
| 247 | left->estimated_cardinality = lhs_cardinality; |
| 248 | right->estimated_cardinality = rhs_cardinality; |
| 249 | D_ASSERT(left && right); |
| 250 | |
| 251 | if (op.conditions.empty()) { |
| 252 | // no conditions: insert a cross product |
| 253 | return make_uniq<PhysicalCrossProduct>(args&: op.types, args: std::move(left), args: std::move(right), args&: op.estimated_cardinality); |
| 254 | } |
| 255 | |
| 256 | bool has_equality = false; |
| 257 | // bool has_inequality = false; |
| 258 | size_t has_range = 0; |
| 259 | for (size_t c = 0; c < op.conditions.size(); ++c) { |
| 260 | auto &cond = op.conditions[c]; |
| 261 | switch (cond.comparison) { |
| 262 | case ExpressionType::COMPARE_EQUAL: |
| 263 | case ExpressionType::COMPARE_NOT_DISTINCT_FROM: |
| 264 | has_equality = true; |
| 265 | break; |
| 266 | case ExpressionType::COMPARE_LESSTHAN: |
| 267 | case ExpressionType::COMPARE_GREATERTHAN: |
| 268 | case ExpressionType::COMPARE_LESSTHANOREQUALTO: |
| 269 | case ExpressionType::COMPARE_GREATERTHANOREQUALTO: |
| 270 | ++has_range; |
| 271 | break; |
| 272 | case ExpressionType::COMPARE_NOTEQUAL: |
| 273 | case ExpressionType::COMPARE_DISTINCT_FROM: |
| 274 | // has_inequality = true; |
| 275 | break; |
| 276 | default: |
| 277 | throw NotImplementedException("Unimplemented comparison join" ); |
| 278 | } |
| 279 | } |
| 280 | |
| 281 | unique_ptr<PhysicalOperator> plan; |
| 282 | if (has_equality) { |
| 283 | // check if we can use an index join |
| 284 | if (PlanIndexJoin(context, op, plan, left, right)) { |
| 285 | return plan; |
| 286 | } |
| 287 | // Equality join with small number of keys : possible perfect join optimization |
| 288 | PerfectHashJoinStats perfect_join_stats; |
| 289 | CheckForPerfectJoinOpt(op, join_state&: perfect_join_stats); |
| 290 | plan = make_uniq<PhysicalHashJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), |
| 291 | args&: op.join_type, args&: op.left_projection_map, args&: op.right_projection_map, |
| 292 | args: std::move(op.delim_types), args&: op.estimated_cardinality, args&: perfect_join_stats); |
| 293 | |
| 294 | } else { |
| 295 | static constexpr const idx_t NESTED_LOOP_JOIN_THRESHOLD = 5; |
| 296 | bool can_merge = has_range > 0; |
| 297 | bool can_iejoin = has_range >= 2 && recursive_cte_tables.empty(); |
| 298 | switch (op.join_type) { |
| 299 | case JoinType::SEMI: |
| 300 | case JoinType::ANTI: |
| 301 | case JoinType::MARK: |
| 302 | can_merge = can_merge && op.conditions.size() == 1; |
| 303 | can_iejoin = false; |
| 304 | break; |
| 305 | default: |
| 306 | break; |
| 307 | } |
| 308 | if (left->estimated_cardinality <= NESTED_LOOP_JOIN_THRESHOLD || |
| 309 | right->estimated_cardinality <= NESTED_LOOP_JOIN_THRESHOLD) { |
| 310 | can_iejoin = false; |
| 311 | can_merge = false; |
| 312 | } |
| 313 | if (can_iejoin) { |
| 314 | plan = make_uniq<PhysicalIEJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), |
| 315 | args&: op.join_type, args&: op.estimated_cardinality); |
| 316 | } else if (can_merge) { |
| 317 | // range join: use piecewise merge join |
| 318 | plan = |
| 319 | make_uniq<PhysicalPiecewiseMergeJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), |
| 320 | args&: op.join_type, args&: op.estimated_cardinality); |
| 321 | } else if (PhysicalNestedLoopJoin::IsSupported(conditions: op.conditions, join_type: op.join_type)) { |
| 322 | // inequality join: use nested loop |
| 323 | plan = make_uniq<PhysicalNestedLoopJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(op.conditions), |
| 324 | args&: op.join_type, args&: op.estimated_cardinality); |
| 325 | } else { |
| 326 | for (auto &cond : op.conditions) { |
| 327 | RewriteJoinCondition(expr&: *cond.right, offset: left->types.size()); |
| 328 | } |
| 329 | auto condition = JoinCondition::CreateExpression(conditions: std::move(op.conditions)); |
| 330 | plan = make_uniq<PhysicalBlockwiseNLJoin>(args&: op, args: std::move(left), args: std::move(right), args: std::move(condition), |
| 331 | args&: op.join_type, args&: op.estimated_cardinality); |
| 332 | } |
| 333 | } |
| 334 | return plan; |
| 335 | } |
| 336 | |
| 337 | } // namespace duckdb |
| 338 | |