| 1 | #include "duckdb/execution/operator/join/physical_iejoin.hpp" |
| 2 | |
| 3 | #include "duckdb/common/operator/comparison_operators.hpp" |
| 4 | #include "duckdb/common/row_operations/row_operations.hpp" |
| 5 | #include "duckdb/common/sort/sort.hpp" |
| 6 | #include "duckdb/common/sort/sorted_block.hpp" |
| 7 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 8 | #include "duckdb/execution/expression_executor.hpp" |
| 9 | #include "duckdb/main/client_context.hpp" |
| 10 | #include "duckdb/parallel/event.hpp" |
| 11 | #include "duckdb/parallel/meta_pipeline.hpp" |
| 12 | #include "duckdb/parallel/thread_context.hpp" |
| 13 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
| 14 | |
| 15 | #include <thread> |
| 16 | |
| 17 | namespace duckdb { |
| 18 | |
| 19 | PhysicalIEJoin::PhysicalIEJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left, |
| 20 | unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, JoinType join_type, |
| 21 | idx_t estimated_cardinality) |
| 22 | : PhysicalRangeJoin(op, PhysicalOperatorType::IE_JOIN, std::move(left), std::move(right), std::move(cond), |
| 23 | join_type, estimated_cardinality) { |
| 24 | |
| 25 | // 1. let L1 (resp. L2) be the array of column X (resp. Y) |
| 26 | D_ASSERT(conditions.size() >= 2); |
| 27 | lhs_orders.resize(new_size: 2); |
| 28 | rhs_orders.resize(new_size: 2); |
| 29 | for (idx_t i = 0; i < 2; ++i) { |
| 30 | auto &cond = conditions[i]; |
| 31 | D_ASSERT(cond.left->return_type == cond.right->return_type); |
| 32 | join_key_types.push_back(x: cond.left->return_type); |
| 33 | |
| 34 | // Convert the conditions to sort orders |
| 35 | auto left = cond.left->Copy(); |
| 36 | auto right = cond.right->Copy(); |
| 37 | auto sense = OrderType::INVALID; |
| 38 | |
| 39 | // 2. if (op1 ∈ {>, ≥}) sort L1 in descending order |
| 40 | // 3. else if (op1 ∈ {<, ≤}) sort L1 in ascending order |
| 41 | // 4. if (op2 ∈ {>, ≥}) sort L2 in ascending order |
| 42 | // 5. else if (op2 ∈ {<, ≤}) sort L2 in descending order |
| 43 | switch (cond.comparison) { |
| 44 | case ExpressionType::COMPARE_GREATERTHAN: |
| 45 | case ExpressionType::COMPARE_GREATERTHANOREQUALTO: |
| 46 | sense = i ? OrderType::ASCENDING : OrderType::DESCENDING; |
| 47 | break; |
| 48 | case ExpressionType::COMPARE_LESSTHAN: |
| 49 | case ExpressionType::COMPARE_LESSTHANOREQUALTO: |
| 50 | sense = i ? OrderType::DESCENDING : OrderType::ASCENDING; |
| 51 | break; |
| 52 | default: |
| 53 | throw NotImplementedException("Unimplemented join type for IEJoin" ); |
| 54 | } |
| 55 | lhs_orders[i].emplace_back(args: BoundOrderByNode(sense, OrderByNullType::NULLS_LAST, std::move(left))); |
| 56 | rhs_orders[i].emplace_back(args: BoundOrderByNode(sense, OrderByNullType::NULLS_LAST, std::move(right))); |
| 57 | } |
| 58 | |
| 59 | for (idx_t i = 2; i < conditions.size(); ++i) { |
| 60 | auto &cond = conditions[i]; |
| 61 | D_ASSERT(cond.left->return_type == cond.right->return_type); |
| 62 | join_key_types.push_back(x: cond.left->return_type); |
| 63 | } |
| 64 | } |
| 65 | |
| 66 | //===--------------------------------------------------------------------===// |
| 67 | // Sink |
| 68 | //===--------------------------------------------------------------------===// |
| 69 | class IEJoinLocalState : public LocalSinkState { |
| 70 | public: |
| 71 | using LocalSortedTable = PhysicalRangeJoin::LocalSortedTable; |
| 72 | |
| 73 | IEJoinLocalState(ClientContext &context, const PhysicalRangeJoin &op, const idx_t child) |
| 74 | : table(context, op, child) { |
| 75 | } |
| 76 | |
| 77 | //! The local sort state |
| 78 | LocalSortedTable table; |
| 79 | }; |
| 80 | |
| 81 | class IEJoinGlobalState : public GlobalSinkState { |
| 82 | public: |
| 83 | using GlobalSortedTable = PhysicalRangeJoin::GlobalSortedTable; |
| 84 | |
| 85 | public: |
| 86 | IEJoinGlobalState(ClientContext &context, const PhysicalIEJoin &op) : child(0) { |
| 87 | tables.resize(new_size: 2); |
| 88 | RowLayout lhs_layout; |
| 89 | lhs_layout.Initialize(types: op.children[0]->types); |
| 90 | vector<BoundOrderByNode> lhs_order; |
| 91 | lhs_order.emplace_back(args: op.lhs_orders[0][0].Copy()); |
| 92 | tables[0] = make_uniq<GlobalSortedTable>(args&: context, args&: lhs_order, args&: lhs_layout); |
| 93 | |
| 94 | RowLayout rhs_layout; |
| 95 | rhs_layout.Initialize(types: op.children[1]->types); |
| 96 | vector<BoundOrderByNode> rhs_order; |
| 97 | rhs_order.emplace_back(args: op.rhs_orders[0][0].Copy()); |
| 98 | tables[1] = make_uniq<GlobalSortedTable>(args&: context, args&: rhs_order, args&: rhs_layout); |
| 99 | } |
| 100 | |
| 101 | IEJoinGlobalState(IEJoinGlobalState &prev) |
| 102 | : GlobalSinkState(prev), tables(std::move(prev.tables)), child(prev.child + 1) { |
| 103 | } |
| 104 | |
| 105 | void Sink(DataChunk &input, IEJoinLocalState &lstate) { |
| 106 | auto &table = *tables[child]; |
| 107 | auto &global_sort_state = table.global_sort_state; |
| 108 | auto &local_sort_state = lstate.table.local_sort_state; |
| 109 | |
| 110 | // Sink the data into the local sort state |
| 111 | lstate.table.Sink(input, global_sort_state); |
| 112 | |
| 113 | // When sorting data reaches a certain size, we sort it |
| 114 | if (local_sort_state.SizeInBytes() >= table.memory_per_thread) { |
| 115 | local_sort_state.Sort(global_sort_state, reorder_heap: true); |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | vector<unique_ptr<GlobalSortedTable>> tables; |
| 120 | size_t child; |
| 121 | }; |
| 122 | |
| 123 | unique_ptr<GlobalSinkState> PhysicalIEJoin::GetGlobalSinkState(ClientContext &context) const { |
| 124 | D_ASSERT(!sink_state); |
| 125 | return make_uniq<IEJoinGlobalState>(args&: context, args: *this); |
| 126 | } |
| 127 | |
| 128 | unique_ptr<LocalSinkState> PhysicalIEJoin::GetLocalSinkState(ExecutionContext &context) const { |
| 129 | idx_t sink_child = 0; |
| 130 | if (sink_state) { |
| 131 | const auto &ie_sink = sink_state->Cast<IEJoinGlobalState>(); |
| 132 | sink_child = ie_sink.child; |
| 133 | } |
| 134 | return make_uniq<IEJoinLocalState>(args&: context.client, args: *this, args&: sink_child); |
| 135 | } |
| 136 | |
| 137 | SinkResultType PhysicalIEJoin::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
| 138 | auto &gstate = input.global_state.Cast<IEJoinGlobalState>(); |
| 139 | auto &lstate = input.local_state.Cast<IEJoinLocalState>(); |
| 140 | |
| 141 | gstate.Sink(input&: chunk, lstate); |
| 142 | |
| 143 | return SinkResultType::NEED_MORE_INPUT; |
| 144 | } |
| 145 | |
| 146 | void PhysicalIEJoin::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const { |
| 147 | auto &gstate = gstate_p.Cast<IEJoinGlobalState>(); |
| 148 | auto &lstate = lstate_p.Cast<IEJoinLocalState>(); |
| 149 | gstate.tables[gstate.child]->Combine(ltable&: lstate.table); |
| 150 | auto &client_profiler = QueryProfiler::Get(context&: context.client); |
| 151 | |
| 152 | context.thread.profiler.Flush(phys_op: *this, expression_executor&: lstate.table.executor, name: gstate.child ? "rhs_executor" : "lhs_executor" , id: 1); |
| 153 | client_profiler.Flush(profiler&: context.thread.profiler); |
| 154 | } |
| 155 | |
| 156 | //===--------------------------------------------------------------------===// |
| 157 | // Finalize |
| 158 | //===--------------------------------------------------------------------===// |
| 159 | SinkFinalizeType PhysicalIEJoin::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
| 160 | GlobalSinkState &gstate_p) const { |
| 161 | auto &gstate = gstate_p.Cast<IEJoinGlobalState>(); |
| 162 | auto &table = *gstate.tables[gstate.child]; |
| 163 | auto &global_sort_state = table.global_sort_state; |
| 164 | |
| 165 | if ((gstate.child == 1 && IsRightOuterJoin(type: join_type)) || (gstate.child == 0 && IsLeftOuterJoin(type: join_type))) { |
| 166 | // for FULL/LEFT/RIGHT OUTER JOIN, initialize found_match to false for every tuple |
| 167 | table.IntializeMatches(); |
| 168 | } |
| 169 | if (gstate.child == 1 && global_sort_state.sorted_blocks.empty() && EmptyResultIfRHSIsEmpty()) { |
| 170 | // Empty input! |
| 171 | return SinkFinalizeType::NO_OUTPUT_POSSIBLE; |
| 172 | } |
| 173 | |
| 174 | // Sort the current input child |
| 175 | table.Finalize(pipeline, event); |
| 176 | |
| 177 | // Move to the next input child |
| 178 | ++gstate.child; |
| 179 | |
| 180 | return SinkFinalizeType::READY; |
| 181 | } |
| 182 | |
| 183 | //===--------------------------------------------------------------------===// |
| 184 | // Operator |
| 185 | //===--------------------------------------------------------------------===// |
| 186 | OperatorResultType PhysicalIEJoin::ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
| 187 | GlobalOperatorState &gstate, OperatorState &state) const { |
| 188 | return OperatorResultType::FINISHED; |
| 189 | } |
| 190 | |
| 191 | //===--------------------------------------------------------------------===// |
| 192 | // Source |
| 193 | //===--------------------------------------------------------------------===// |
| 194 | struct IEJoinUnion { |
| 195 | using SortedTable = PhysicalRangeJoin::GlobalSortedTable; |
| 196 | |
| 197 | static idx_t AppendKey(SortedTable &table, ExpressionExecutor &executor, SortedTable &marked, int64_t increment, |
| 198 | int64_t base, const idx_t block_idx); |
| 199 | |
| 200 | static void Sort(SortedTable &table) { |
| 201 | auto &global_sort_state = table.global_sort_state; |
| 202 | global_sort_state.PrepareMergePhase(); |
| 203 | while (global_sort_state.sorted_blocks.size() > 1) { |
| 204 | global_sort_state.InitializeMergeRound(); |
| 205 | MergeSorter merge_sorter(global_sort_state, global_sort_state.buffer_manager); |
| 206 | merge_sorter.PerformInMergeRound(); |
| 207 | global_sort_state.CompleteMergeRound(keep_radix_data: true); |
| 208 | } |
| 209 | } |
| 210 | |
| 211 | template <typename T> |
| 212 | static vector<T> ExtractColumn(SortedTable &table, idx_t col_idx) { |
| 213 | vector<T> result; |
| 214 | result.reserve(table.count); |
| 215 | |
| 216 | auto &gstate = table.global_sort_state; |
| 217 | auto &blocks = *gstate.sorted_blocks[0]->payload_data; |
| 218 | PayloadScanner scanner(blocks, gstate, false); |
| 219 | |
| 220 | DataChunk payload; |
| 221 | payload.Initialize(allocator&: Allocator::DefaultAllocator(), types: gstate.payload_layout.GetTypes()); |
| 222 | for (;;) { |
| 223 | scanner.Scan(chunk&: payload); |
| 224 | const auto count = payload.size(); |
| 225 | if (!count) { |
| 226 | break; |
| 227 | } |
| 228 | |
| 229 | const auto data_ptr = FlatVector::GetData<T>(payload.data[col_idx]); |
| 230 | result.insert(result.end(), data_ptr, data_ptr + count); |
| 231 | } |
| 232 | |
| 233 | return result; |
| 234 | } |
| 235 | |
| 236 | IEJoinUnion(ClientContext &context, const PhysicalIEJoin &op, SortedTable &t1, const idx_t b1, SortedTable &t2, |
| 237 | const idx_t b2); |
| 238 | |
| 239 | idx_t SearchL1(idx_t pos); |
| 240 | bool NextRow(); |
| 241 | |
| 242 | //! Inverted loop |
| 243 | idx_t JoinComplexBlocks(SelectionVector &lsel, SelectionVector &rsel); |
| 244 | |
| 245 | //! L1 |
| 246 | unique_ptr<SortedTable> l1; |
| 247 | //! L2 |
| 248 | unique_ptr<SortedTable> l2; |
| 249 | |
| 250 | //! Li |
| 251 | vector<int64_t> li; |
| 252 | //! P |
| 253 | vector<idx_t> p; |
| 254 | |
| 255 | //! B |
| 256 | vector<validity_t> bit_array; |
| 257 | ValidityMask bit_mask; |
| 258 | |
| 259 | //! Bloom Filter |
| 260 | static constexpr idx_t BLOOM_CHUNK_BITS = 1024; |
| 261 | idx_t bloom_count; |
| 262 | vector<validity_t> bloom_array; |
| 263 | ValidityMask bloom_filter; |
| 264 | |
| 265 | //! Iteration state |
| 266 | idx_t n; |
| 267 | idx_t i; |
| 268 | idx_t j; |
| 269 | unique_ptr<SBIterator> op1; |
| 270 | unique_ptr<SBIterator> off1; |
| 271 | unique_ptr<SBIterator> op2; |
| 272 | unique_ptr<SBIterator> off2; |
| 273 | int64_t lrid; |
| 274 | }; |
| 275 | |
| 276 | idx_t IEJoinUnion::AppendKey(SortedTable &table, ExpressionExecutor &executor, SortedTable &marked, int64_t increment, |
| 277 | int64_t base, const idx_t block_idx) { |
| 278 | LocalSortState local_sort_state; |
| 279 | local_sort_state.Initialize(global_sort_state&: marked.global_sort_state, buffer_manager_p&: marked.global_sort_state.buffer_manager); |
| 280 | |
| 281 | // Reading |
| 282 | const auto valid = table.count - table.has_null; |
| 283 | auto &gstate = table.global_sort_state; |
| 284 | PayloadScanner scanner(gstate, block_idx); |
| 285 | auto table_idx = block_idx * gstate.block_capacity; |
| 286 | |
| 287 | DataChunk scanned; |
| 288 | scanned.Initialize(allocator&: Allocator::DefaultAllocator(), types: scanner.GetPayloadTypes()); |
| 289 | |
| 290 | // Writing |
| 291 | auto types = local_sort_state.sort_layout->logical_types; |
| 292 | const idx_t payload_idx = types.size(); |
| 293 | |
| 294 | const auto &payload_types = local_sort_state.payload_layout->GetTypes(); |
| 295 | types.insert(position: types.end(), first: payload_types.begin(), last: payload_types.end()); |
| 296 | const idx_t rid_idx = types.size() - 1; |
| 297 | |
| 298 | DataChunk keys; |
| 299 | DataChunk payload; |
| 300 | keys.Initialize(allocator&: Allocator::DefaultAllocator(), types); |
| 301 | |
| 302 | idx_t inserted = 0; |
| 303 | for (auto rid = base; table_idx < valid;) { |
| 304 | scanner.Scan(chunk&: scanned); |
| 305 | |
| 306 | // NULLs are at the end, so stop when we reach them |
| 307 | auto scan_count = scanned.size(); |
| 308 | if (table_idx + scan_count > valid) { |
| 309 | scan_count = valid - table_idx; |
| 310 | scanned.SetCardinality(scan_count); |
| 311 | } |
| 312 | if (scan_count == 0) { |
| 313 | break; |
| 314 | } |
| 315 | table_idx += scan_count; |
| 316 | |
| 317 | // Compute the input columns from the payload |
| 318 | keys.Reset(); |
| 319 | keys.Split(other&: payload, split_idx: rid_idx); |
| 320 | executor.Execute(input&: scanned, result&: keys); |
| 321 | |
| 322 | // Mark the rid column |
| 323 | payload.data[0].Sequence(start: rid, increment, count: scan_count); |
| 324 | payload.SetCardinality(scan_count); |
| 325 | keys.Fuse(other&: payload); |
| 326 | rid += increment * scan_count; |
| 327 | |
| 328 | // Sort on the sort columns (which will no longer be needed) |
| 329 | keys.Split(other&: payload, split_idx: payload_idx); |
| 330 | local_sort_state.SinkChunk(sort&: keys, payload); |
| 331 | inserted += scan_count; |
| 332 | keys.Fuse(other&: payload); |
| 333 | |
| 334 | // Flush when we have enough data |
| 335 | if (local_sort_state.SizeInBytes() >= marked.memory_per_thread) { |
| 336 | local_sort_state.Sort(global_sort_state&: marked.global_sort_state, reorder_heap: true); |
| 337 | } |
| 338 | } |
| 339 | marked.global_sort_state.AddLocalState(local_sort_state); |
| 340 | marked.count += inserted; |
| 341 | |
| 342 | return inserted; |
| 343 | } |
| 344 | |
| 345 | IEJoinUnion::IEJoinUnion(ClientContext &context, const PhysicalIEJoin &op, SortedTable &t1, const idx_t b1, |
| 346 | SortedTable &t2, const idx_t b2) |
| 347 | : n(0), i(0) { |
| 348 | // input : query Q with 2 join predicates t1.X op1 t2.X' and t1.Y op2 t2.Y', tables T, T' of sizes m and n resp. |
| 349 | // output: a list of tuple pairs (ti , tj) |
| 350 | // Note that T/T' are already sorted on X/X' and contain the payload data |
| 351 | // We only join the two block numbers and use the sizes of the blocks as the counts |
| 352 | |
| 353 | // 0. Filter out tables with no overlap |
| 354 | if (!t1.BlockSize(i: b1) || !t2.BlockSize(i: b2)) { |
| 355 | return; |
| 356 | } |
| 357 | |
| 358 | const auto &cmp1 = op.conditions[0].comparison; |
| 359 | SBIterator bounds1(t1.global_sort_state, cmp1); |
| 360 | SBIterator bounds2(t2.global_sort_state, cmp1); |
| 361 | |
| 362 | // t1.X[0] op1 t2.X'[-1] |
| 363 | bounds1.SetIndex(bounds1.block_capacity * b1); |
| 364 | bounds2.SetIndex(bounds2.block_capacity * b2 + t2.BlockSize(i: b2) - 1); |
| 365 | if (!bounds1.Compare(other: bounds2)) { |
| 366 | return; |
| 367 | } |
| 368 | |
| 369 | // 1. let L1 (resp. L2) be the array of column X (resp. Y ) |
| 370 | const auto &order1 = op.lhs_orders[0][0]; |
| 371 | const auto &order2 = op.lhs_orders[1][0]; |
| 372 | |
| 373 | // 2. if (op1 ∈ {>, ≥}) sort L1 in descending order |
| 374 | // 3. else if (op1 ∈ {<, ≤}) sort L1 in ascending order |
| 375 | |
| 376 | // For the union algorithm, we make a unified table with the keys and the rids as the payload: |
| 377 | // X/X', Y/Y', R/R'/Li |
| 378 | // The first position is the sort key. |
| 379 | vector<LogicalType> types; |
| 380 | types.emplace_back(args&: order2.expression->return_type); |
| 381 | types.emplace_back(args: LogicalType::BIGINT); |
| 382 | RowLayout payload_layout; |
| 383 | payload_layout.Initialize(types); |
| 384 | |
| 385 | // Sort on the first expression |
| 386 | auto ref = make_uniq<BoundReferenceExpression>(args&: order1.expression->return_type, args: 0); |
| 387 | vector<BoundOrderByNode> orders; |
| 388 | orders.emplace_back(args: order1.type, args: order1.null_order, args: std::move(ref)); |
| 389 | |
| 390 | l1 = make_uniq<SortedTable>(args&: context, args&: orders, args&: payload_layout); |
| 391 | |
| 392 | // LHS has positive rids |
| 393 | ExpressionExecutor l_executor(context); |
| 394 | l_executor.AddExpression(expr: *order1.expression); |
| 395 | l_executor.AddExpression(expr: *order2.expression); |
| 396 | AppendKey(table&: t1, executor&: l_executor, marked&: *l1, increment: 1, base: 1, block_idx: b1); |
| 397 | |
| 398 | // RHS has negative rids |
| 399 | ExpressionExecutor r_executor(context); |
| 400 | r_executor.AddExpression(expr: *op.rhs_orders[0][0].expression); |
| 401 | r_executor.AddExpression(expr: *op.rhs_orders[1][0].expression); |
| 402 | AppendKey(table&: t2, executor&: r_executor, marked&: *l1, increment: -1, base: -1, block_idx: b2); |
| 403 | |
| 404 | if (l1->global_sort_state.sorted_blocks.empty()) { |
| 405 | return; |
| 406 | } |
| 407 | |
| 408 | Sort(table&: *l1); |
| 409 | |
| 410 | op1 = make_uniq<SBIterator>(args&: l1->global_sort_state, args: cmp1); |
| 411 | off1 = make_uniq<SBIterator>(args&: l1->global_sort_state, args: cmp1); |
| 412 | |
| 413 | // We don't actually need the L1 column, just its sort key, which is in the sort blocks |
| 414 | li = ExtractColumn<int64_t>(table&: *l1, col_idx: types.size() - 1); |
| 415 | |
| 416 | // 4. if (op2 ∈ {>, ≥}) sort L2 in ascending order |
| 417 | // 5. else if (op2 ∈ {<, ≤}) sort L2 in descending order |
| 418 | |
| 419 | // We sort on Y/Y' to obtain the sort keys and the permutation array. |
| 420 | // For this we just need a two-column table of Y, P |
| 421 | types.clear(); |
| 422 | types.emplace_back(args: LogicalType::BIGINT); |
| 423 | payload_layout.Initialize(types); |
| 424 | |
| 425 | // Sort on the first expression |
| 426 | orders.clear(); |
| 427 | ref = make_uniq<BoundReferenceExpression>(args&: order2.expression->return_type, args: 0); |
| 428 | orders.emplace_back(args: order2.type, args: order2.null_order, args: std::move(ref)); |
| 429 | |
| 430 | ExpressionExecutor executor(context); |
| 431 | executor.AddExpression(expr: *orders[0].expression); |
| 432 | |
| 433 | l2 = make_uniq<SortedTable>(args&: context, args&: orders, args&: payload_layout); |
| 434 | for (idx_t base = 0, block_idx = 0; block_idx < l1->BlockCount(); ++block_idx) { |
| 435 | base += AppendKey(table&: *l1, executor, marked&: *l2, increment: 1, base, block_idx); |
| 436 | } |
| 437 | |
| 438 | Sort(table&: *l2); |
| 439 | |
| 440 | // We don't actually need the L2 column, just its sort key, which is in the sort blocks |
| 441 | |
| 442 | // 6. compute the permutation array P of L2 w.r.t. L1 |
| 443 | p = ExtractColumn<idx_t>(table&: *l2, col_idx: types.size() - 1); |
| 444 | |
| 445 | // 7. initialize bit-array B (|B| = n), and set all bits to 0 |
| 446 | n = l2->count.load(); |
| 447 | bit_array.resize(new_size: ValidityMask::EntryCount(count: n), x: 0); |
| 448 | bit_mask.Initialize(validity: bit_array.data()); |
| 449 | |
| 450 | // Bloom filter |
| 451 | bloom_count = (n + (BLOOM_CHUNK_BITS - 1)) / BLOOM_CHUNK_BITS; |
| 452 | bloom_array.resize(new_size: ValidityMask::EntryCount(count: bloom_count), x: 0); |
| 453 | bloom_filter.Initialize(validity: bloom_array.data()); |
| 454 | |
| 455 | // 11. for(i←1 to n) do |
| 456 | const auto &cmp2 = op.conditions[1].comparison; |
| 457 | op2 = make_uniq<SBIterator>(args&: l2->global_sort_state, args: cmp2); |
| 458 | off2 = make_uniq<SBIterator>(args&: l2->global_sort_state, args: cmp2); |
| 459 | i = 0; |
| 460 | j = 0; |
| 461 | (void)NextRow(); |
| 462 | } |
| 463 | |
| 464 | idx_t IEJoinUnion::SearchL1(idx_t pos) { |
| 465 | // Perform an exponential search in the appropriate direction |
| 466 | op1->SetIndex(pos); |
| 467 | |
| 468 | idx_t step = 1; |
| 469 | auto hi = pos; |
| 470 | auto lo = pos; |
| 471 | if (!op1->cmp) { |
| 472 | // Scan left for loose inequality |
| 473 | lo -= MinValue(a: step, b: lo); |
| 474 | step *= 2; |
| 475 | off1->SetIndex(lo); |
| 476 | while (lo > 0 && op1->Compare(other: *off1)) { |
| 477 | hi = lo; |
| 478 | lo -= MinValue(a: step, b: lo); |
| 479 | step *= 2; |
| 480 | off1->SetIndex(lo); |
| 481 | } |
| 482 | } else { |
| 483 | // Scan right for strict inequality |
| 484 | hi += MinValue(a: step, b: n - hi); |
| 485 | step *= 2; |
| 486 | off1->SetIndex(hi); |
| 487 | while (hi < n && !op1->Compare(other: *off1)) { |
| 488 | lo = hi; |
| 489 | hi += MinValue(a: step, b: n - hi); |
| 490 | step *= 2; |
| 491 | off1->SetIndex(hi); |
| 492 | } |
| 493 | } |
| 494 | |
| 495 | // Binary search the target area |
| 496 | while (lo < hi) { |
| 497 | const auto mid = lo + (hi - lo) / 2; |
| 498 | off1->SetIndex(mid); |
| 499 | if (op1->Compare(other: *off1)) { |
| 500 | hi = mid; |
| 501 | } else { |
| 502 | lo = mid + 1; |
| 503 | } |
| 504 | } |
| 505 | |
| 506 | off1->SetIndex(lo); |
| 507 | |
| 508 | return lo; |
| 509 | } |
| 510 | |
| 511 | bool IEJoinUnion::NextRow() { |
| 512 | for (; i < n; ++i) { |
| 513 | // 12. pos ← P[i] |
| 514 | auto pos = p[i]; |
| 515 | lrid = li[pos]; |
| 516 | if (lrid < 0) { |
| 517 | continue; |
| 518 | } |
| 519 | |
| 520 | // 16. B[pos] ← 1 |
| 521 | op2->SetIndex(i); |
| 522 | for (; off2->GetIndex() < n; ++(*off2)) { |
| 523 | if (!off2->Compare(other: *op2)) { |
| 524 | break; |
| 525 | } |
| 526 | const auto p2 = p[off2->GetIndex()]; |
| 527 | if (li[p2] < 0) { |
| 528 | // Only mark rhs matches. |
| 529 | bit_mask.SetValid(p2); |
| 530 | bloom_filter.SetValid(p2 / BLOOM_CHUNK_BITS); |
| 531 | } |
| 532 | } |
| 533 | |
| 534 | // 9. if (op1 ∈ {≤,≥} and op2 ∈ {≤,≥}) eqOff = 0 |
| 535 | // 10. else eqOff = 1 |
| 536 | // No, because there could be more than one equal value. |
| 537 | // Find the leftmost off1 where L1[pos] op1 L1[off1..n] |
| 538 | // These are the rows that satisfy the op1 condition |
| 539 | // and that is where we should start scanning B from |
| 540 | j = SearchL1(pos); |
| 541 | |
| 542 | return true; |
| 543 | } |
| 544 | return false; |
| 545 | } |
| 546 | |
| 547 | static idx_t NextValid(const ValidityMask &bits, idx_t j, const idx_t n) { |
| 548 | if (j >= n) { |
| 549 | return n; |
| 550 | } |
| 551 | |
| 552 | // We can do a first approximation by checking entries one at a time |
| 553 | // which gives 64:1. |
| 554 | idx_t entry_idx, idx_in_entry; |
| 555 | bits.GetEntryIndex(row_idx: j, entry_idx, idx_in_entry); |
| 556 | auto entry = bits.GetValidityEntry(entry_idx: entry_idx++); |
| 557 | |
| 558 | // Trim the bits before the start position |
| 559 | entry &= (ValidityMask::ValidityBuffer::MAX_ENTRY << idx_in_entry); |
| 560 | |
| 561 | // Check the non-ragged entries |
| 562 | for (const auto entry_count = bits.EntryCount(count: n); entry_idx < entry_count; ++entry_idx) { |
| 563 | if (entry) { |
| 564 | for (; idx_in_entry < bits.BITS_PER_VALUE; ++idx_in_entry, ++j) { |
| 565 | if (bits.RowIsValid(entry, idx_in_entry)) { |
| 566 | return j; |
| 567 | } |
| 568 | } |
| 569 | } else { |
| 570 | j += bits.BITS_PER_VALUE - idx_in_entry; |
| 571 | } |
| 572 | |
| 573 | entry = bits.GetValidityEntry(entry_idx); |
| 574 | idx_in_entry = 0; |
| 575 | } |
| 576 | |
| 577 | // Check the final entry |
| 578 | for (; j < n; ++idx_in_entry, ++j) { |
| 579 | if (bits.RowIsValid(entry, idx_in_entry)) { |
| 580 | return j; |
| 581 | } |
| 582 | } |
| 583 | |
| 584 | return j; |
| 585 | } |
| 586 | |
| 587 | idx_t IEJoinUnion::JoinComplexBlocks(SelectionVector &lsel, SelectionVector &rsel) { |
| 588 | // 8. initialize join result as an empty list for tuple pairs |
| 589 | idx_t result_count = 0; |
| 590 | |
| 591 | // 11. for(i←1 to n) do |
| 592 | while (i < n) { |
| 593 | // 13. for (j ← pos+eqOff to n) do |
| 594 | for (;;) { |
| 595 | // 14. if B[j] = 1 then |
| 596 | |
| 597 | // Use the Bloom filter to find candidate blocks |
| 598 | while (j < n) { |
| 599 | auto bloom_begin = NextValid(bits: bloom_filter, j: j / BLOOM_CHUNK_BITS, n: bloom_count) * BLOOM_CHUNK_BITS; |
| 600 | auto bloom_end = MinValue<idx_t>(a: n, b: bloom_begin + BLOOM_CHUNK_BITS); |
| 601 | |
| 602 | j = MaxValue<idx_t>(a: j, b: bloom_begin); |
| 603 | j = NextValid(bits: bit_mask, j, n: bloom_end); |
| 604 | if (j < bloom_end) { |
| 605 | break; |
| 606 | } |
| 607 | } |
| 608 | |
| 609 | if (j >= n) { |
| 610 | break; |
| 611 | } |
| 612 | |
| 613 | // Filter out tuples with the same sign (they come from the same table) |
| 614 | const auto rrid = li[j]; |
| 615 | ++j; |
| 616 | |
| 617 | // 15. add tuples w.r.t. (L1[j], L1[i]) to join result |
| 618 | if (lrid > 0 && rrid < 0) { |
| 619 | lsel.set_index(idx: result_count, loc: sel_t(+lrid - 1)); |
| 620 | rsel.set_index(idx: result_count, loc: sel_t(-rrid - 1)); |
| 621 | ++result_count; |
| 622 | if (result_count == STANDARD_VECTOR_SIZE) { |
| 623 | // out of space! |
| 624 | return result_count; |
| 625 | } |
| 626 | } |
| 627 | } |
| 628 | ++i; |
| 629 | |
| 630 | if (!NextRow()) { |
| 631 | break; |
| 632 | } |
| 633 | } |
| 634 | |
| 635 | return result_count; |
| 636 | } |
| 637 | |
| 638 | class IEJoinLocalSourceState : public LocalSourceState { |
| 639 | public: |
| 640 | explicit IEJoinLocalSourceState(ClientContext &context, const PhysicalIEJoin &op) |
| 641 | : op(op), true_sel(STANDARD_VECTOR_SIZE), left_executor(context), right_executor(context), |
| 642 | left_matches(nullptr), right_matches(nullptr) { |
| 643 | auto &allocator = Allocator::Get(context); |
| 644 | if (op.conditions.size() < 3) { |
| 645 | return; |
| 646 | } |
| 647 | |
| 648 | vector<LogicalType> left_types; |
| 649 | vector<LogicalType> right_types; |
| 650 | for (idx_t i = 2; i < op.conditions.size(); ++i) { |
| 651 | const auto &cond = op.conditions[i]; |
| 652 | |
| 653 | left_types.push_back(x: cond.left->return_type); |
| 654 | left_executor.AddExpression(expr: *cond.left); |
| 655 | |
| 656 | right_types.push_back(x: cond.left->return_type); |
| 657 | right_executor.AddExpression(expr: *cond.right); |
| 658 | } |
| 659 | |
| 660 | left_keys.Initialize(allocator, types: left_types); |
| 661 | right_keys.Initialize(allocator, types: right_types); |
| 662 | } |
| 663 | |
| 664 | idx_t SelectOuterRows(bool *matches) { |
| 665 | idx_t count = 0; |
| 666 | for (; outer_idx < outer_count; ++outer_idx) { |
| 667 | if (!matches[outer_idx]) { |
| 668 | true_sel.set_index(idx: count++, loc: outer_idx); |
| 669 | if (count >= STANDARD_VECTOR_SIZE) { |
| 670 | outer_idx++; |
| 671 | break; |
| 672 | } |
| 673 | } |
| 674 | } |
| 675 | |
| 676 | return count; |
| 677 | } |
| 678 | |
| 679 | const PhysicalIEJoin &op; |
| 680 | |
| 681 | // Joining |
| 682 | unique_ptr<IEJoinUnion> joiner; |
| 683 | |
| 684 | idx_t left_base; |
| 685 | idx_t left_block_index; |
| 686 | |
| 687 | idx_t right_base; |
| 688 | idx_t right_block_index; |
| 689 | |
| 690 | // Trailing predicates |
| 691 | SelectionVector true_sel; |
| 692 | |
| 693 | ExpressionExecutor left_executor; |
| 694 | DataChunk left_keys; |
| 695 | |
| 696 | ExpressionExecutor right_executor; |
| 697 | DataChunk right_keys; |
| 698 | |
| 699 | // Outer joins |
| 700 | idx_t outer_idx; |
| 701 | idx_t outer_count; |
| 702 | bool *left_matches; |
| 703 | bool *right_matches; |
| 704 | }; |
| 705 | |
| 706 | void PhysicalIEJoin::ResolveComplexJoin(ExecutionContext &context, DataChunk &chunk, LocalSourceState &state_p) const { |
| 707 | auto &state = state_p.Cast<IEJoinLocalSourceState>(); |
| 708 | auto &ie_sink = sink_state->Cast<IEJoinGlobalState>(); |
| 709 | auto &left_table = *ie_sink.tables[0]; |
| 710 | auto &right_table = *ie_sink.tables[1]; |
| 711 | |
| 712 | const auto left_cols = children[0]->GetTypes().size(); |
| 713 | do { |
| 714 | SelectionVector lsel(STANDARD_VECTOR_SIZE); |
| 715 | SelectionVector rsel(STANDARD_VECTOR_SIZE); |
| 716 | auto result_count = state.joiner->JoinComplexBlocks(lsel, rsel); |
| 717 | if (result_count == 0) { |
| 718 | // exhausted this pair |
| 719 | return; |
| 720 | } |
| 721 | |
| 722 | // found matches: extract them |
| 723 | chunk.Reset(); |
| 724 | SliceSortedPayload(payload&: chunk, state&: left_table.global_sort_state, block_idx: state.left_block_index, result: lsel, result_count, left_cols: 0); |
| 725 | SliceSortedPayload(payload&: chunk, state&: right_table.global_sort_state, block_idx: state.right_block_index, result: rsel, result_count, |
| 726 | left_cols); |
| 727 | chunk.SetCardinality(result_count); |
| 728 | |
| 729 | auto sel = FlatVector::IncrementalSelectionVector(); |
| 730 | if (conditions.size() > 2) { |
| 731 | // If there are more expressions to compute, |
| 732 | // split the result chunk into the left and right halves |
| 733 | // so we can compute the values for comparison. |
| 734 | const auto tail_cols = conditions.size() - 2; |
| 735 | |
| 736 | DataChunk right_chunk; |
| 737 | chunk.Split(other&: right_chunk, split_idx: left_cols); |
| 738 | state.left_executor.SetChunk(chunk); |
| 739 | state.right_executor.SetChunk(right_chunk); |
| 740 | |
| 741 | auto tail_count = result_count; |
| 742 | auto true_sel = &state.true_sel; |
| 743 | for (size_t cmp_idx = 0; cmp_idx < tail_cols; ++cmp_idx) { |
| 744 | auto &left = state.left_keys.data[cmp_idx]; |
| 745 | state.left_executor.ExecuteExpression(expr_idx: cmp_idx, result&: left); |
| 746 | |
| 747 | auto &right = state.right_keys.data[cmp_idx]; |
| 748 | state.right_executor.ExecuteExpression(expr_idx: cmp_idx, result&: right); |
| 749 | |
| 750 | if (tail_count < result_count) { |
| 751 | left.Slice(sel: *sel, count: tail_count); |
| 752 | right.Slice(sel: *sel, count: tail_count); |
| 753 | } |
| 754 | tail_count = SelectJoinTail(condition: conditions[cmp_idx + 2].comparison, left, right, sel, count: tail_count, true_sel); |
| 755 | sel = true_sel; |
| 756 | } |
| 757 | chunk.Fuse(other&: right_chunk); |
| 758 | |
| 759 | if (tail_count < result_count) { |
| 760 | result_count = tail_count; |
| 761 | chunk.Slice(sel_vector: *sel, count: result_count); |
| 762 | } |
| 763 | } |
| 764 | |
| 765 | // found matches: mark the found matches if required |
| 766 | if (left_table.found_match) { |
| 767 | for (idx_t i = 0; i < result_count; i++) { |
| 768 | left_table.found_match[state.left_base + lsel[sel->get_index(idx: i)]] = true; |
| 769 | } |
| 770 | } |
| 771 | if (right_table.found_match) { |
| 772 | for (idx_t i = 0; i < result_count; i++) { |
| 773 | right_table.found_match[state.right_base + rsel[sel->get_index(idx: i)]] = true; |
| 774 | } |
| 775 | } |
| 776 | chunk.Verify(); |
| 777 | } while (chunk.size() == 0); |
| 778 | } |
| 779 | |
| 780 | class IEJoinGlobalSourceState : public GlobalSourceState { |
| 781 | public: |
| 782 | explicit IEJoinGlobalSourceState(const PhysicalIEJoin &op) |
| 783 | : op(op), initialized(false), next_pair(0), completed(0), left_outers(0), next_left(0), right_outers(0), |
| 784 | next_right(0) { |
| 785 | } |
| 786 | |
| 787 | void Initialize(IEJoinGlobalState &sink_state) { |
| 788 | lock_guard<mutex> initializing(lock); |
| 789 | if (initialized) { |
| 790 | return; |
| 791 | } |
| 792 | |
| 793 | // Compute the starting row for reach block |
| 794 | // (In theory these are all the same size, but you never know...) |
| 795 | auto &left_table = *sink_state.tables[0]; |
| 796 | const auto left_blocks = left_table.BlockCount(); |
| 797 | idx_t left_base = 0; |
| 798 | |
| 799 | for (size_t lhs = 0; lhs < left_blocks; ++lhs) { |
| 800 | left_bases.emplace_back(args&: left_base); |
| 801 | left_base += left_table.BlockSize(i: lhs); |
| 802 | } |
| 803 | |
| 804 | auto &right_table = *sink_state.tables[1]; |
| 805 | const auto right_blocks = right_table.BlockCount(); |
| 806 | idx_t right_base = 0; |
| 807 | for (size_t rhs = 0; rhs < right_blocks; ++rhs) { |
| 808 | right_bases.emplace_back(args&: right_base); |
| 809 | right_base += right_table.BlockSize(i: rhs); |
| 810 | } |
| 811 | |
| 812 | // Outer join block counts |
| 813 | if (left_table.found_match) { |
| 814 | left_outers = left_blocks; |
| 815 | } |
| 816 | |
| 817 | if (right_table.found_match) { |
| 818 | right_outers = right_blocks; |
| 819 | } |
| 820 | |
| 821 | // Ready for action |
| 822 | initialized = true; |
| 823 | } |
| 824 | |
| 825 | public: |
| 826 | idx_t MaxThreads() override { |
| 827 | // We can't leverage any more threads than block pairs. |
| 828 | const auto &sink_state = (op.sink_state->Cast<IEJoinGlobalState>()); |
| 829 | return sink_state.tables[0]->BlockCount() * sink_state.tables[1]->BlockCount(); |
| 830 | } |
| 831 | |
| 832 | void GetNextPair(ClientContext &client, IEJoinGlobalState &gstate, IEJoinLocalSourceState &lstate) { |
| 833 | auto &left_table = *gstate.tables[0]; |
| 834 | auto &right_table = *gstate.tables[1]; |
| 835 | |
| 836 | const auto left_blocks = left_table.BlockCount(); |
| 837 | const auto right_blocks = right_table.BlockCount(); |
| 838 | const auto pair_count = left_blocks * right_blocks; |
| 839 | |
| 840 | // Regular block |
| 841 | const auto i = next_pair++; |
| 842 | if (i < pair_count) { |
| 843 | const auto b1 = i / right_blocks; |
| 844 | const auto b2 = i % right_blocks; |
| 845 | |
| 846 | lstate.left_block_index = b1; |
| 847 | lstate.left_base = left_bases[b1]; |
| 848 | |
| 849 | lstate.right_block_index = b2; |
| 850 | lstate.right_base = right_bases[b2]; |
| 851 | |
| 852 | lstate.joiner = make_uniq<IEJoinUnion>(args&: client, args: op, args&: left_table, args: b1, args&: right_table, args: b2); |
| 853 | return; |
| 854 | } |
| 855 | |
| 856 | // Outer joins |
| 857 | if (!left_outers && !right_outers) { |
| 858 | return; |
| 859 | } |
| 860 | |
| 861 | // Spin wait for regular blocks to finish(!) |
| 862 | while (completed < pair_count) { |
| 863 | std::this_thread::yield(); |
| 864 | } |
| 865 | |
| 866 | // Left outer blocks |
| 867 | const auto l = next_left++; |
| 868 | if (l < left_outers) { |
| 869 | lstate.joiner = nullptr; |
| 870 | lstate.left_block_index = l; |
| 871 | lstate.left_base = left_bases[l]; |
| 872 | |
| 873 | lstate.left_matches = left_table.found_match.get() + lstate.left_base; |
| 874 | lstate.outer_idx = 0; |
| 875 | lstate.outer_count = left_table.BlockSize(i: l); |
| 876 | return; |
| 877 | } else { |
| 878 | lstate.left_matches = nullptr; |
| 879 | } |
| 880 | |
| 881 | // Right outer block |
| 882 | const auto r = next_right++; |
| 883 | if (r < right_outers) { |
| 884 | lstate.joiner = nullptr; |
| 885 | lstate.right_block_index = r; |
| 886 | lstate.right_base = right_bases[r]; |
| 887 | |
| 888 | lstate.right_matches = right_table.found_match.get() + lstate.right_base; |
| 889 | lstate.outer_idx = 0; |
| 890 | lstate.outer_count = right_table.BlockSize(i: r); |
| 891 | return; |
| 892 | } else { |
| 893 | lstate.right_matches = nullptr; |
| 894 | } |
| 895 | } |
| 896 | |
| 897 | void PairCompleted(ClientContext &client, IEJoinGlobalState &gstate, IEJoinLocalSourceState &lstate) { |
| 898 | lstate.joiner.reset(); |
| 899 | ++completed; |
| 900 | GetNextPair(client, gstate, lstate); |
| 901 | } |
| 902 | |
| 903 | const PhysicalIEJoin &op; |
| 904 | |
| 905 | mutex lock; |
| 906 | bool initialized; |
| 907 | |
| 908 | // Join queue state |
| 909 | std::atomic<size_t> next_pair; |
| 910 | std::atomic<size_t> completed; |
| 911 | |
| 912 | // Block base row number |
| 913 | vector<idx_t> left_bases; |
| 914 | vector<idx_t> right_bases; |
| 915 | |
| 916 | // Outer joins |
| 917 | idx_t left_outers; |
| 918 | std::atomic<idx_t> next_left; |
| 919 | |
| 920 | idx_t right_outers; |
| 921 | std::atomic<idx_t> next_right; |
| 922 | }; |
| 923 | |
| 924 | unique_ptr<GlobalSourceState> PhysicalIEJoin::GetGlobalSourceState(ClientContext &context) const { |
| 925 | return make_uniq<IEJoinGlobalSourceState>(args: *this); |
| 926 | } |
| 927 | |
| 928 | unique_ptr<LocalSourceState> PhysicalIEJoin::GetLocalSourceState(ExecutionContext &context, |
| 929 | GlobalSourceState &gstate) const { |
| 930 | return make_uniq<IEJoinLocalSourceState>(args&: context.client, args: *this); |
| 931 | } |
| 932 | |
| 933 | SourceResultType PhysicalIEJoin::GetData(ExecutionContext &context, DataChunk &result, |
| 934 | OperatorSourceInput &input) const { |
| 935 | auto &ie_sink = sink_state->Cast<IEJoinGlobalState>(); |
| 936 | auto &ie_gstate = input.global_state.Cast<IEJoinGlobalSourceState>(); |
| 937 | auto &ie_lstate = input.local_state.Cast<IEJoinLocalSourceState>(); |
| 938 | |
| 939 | ie_gstate.Initialize(sink_state&: ie_sink); |
| 940 | |
| 941 | if (!ie_lstate.joiner && !ie_lstate.left_matches && !ie_lstate.right_matches) { |
| 942 | ie_gstate.GetNextPair(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate); |
| 943 | } |
| 944 | |
| 945 | // Process INNER results |
| 946 | while (ie_lstate.joiner) { |
| 947 | ResolveComplexJoin(context, chunk&: result, state_p&: ie_lstate); |
| 948 | |
| 949 | if (result.size()) { |
| 950 | return SourceResultType::HAVE_MORE_OUTPUT; |
| 951 | } |
| 952 | |
| 953 | ie_gstate.PairCompleted(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate); |
| 954 | } |
| 955 | |
| 956 | // Process LEFT OUTER results |
| 957 | const auto left_cols = children[0]->GetTypes().size(); |
| 958 | while (ie_lstate.left_matches) { |
| 959 | const idx_t count = ie_lstate.SelectOuterRows(matches: ie_lstate.left_matches); |
| 960 | if (!count) { |
| 961 | ie_gstate.GetNextPair(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate); |
| 962 | continue; |
| 963 | } |
| 964 | SliceSortedPayload(payload&: result, state&: ie_sink.tables[0]->global_sort_state, block_idx: ie_lstate.left_block_index, result: ie_lstate.true_sel, |
| 965 | result_count: count); |
| 966 | |
| 967 | // Fill in NULLs to the right |
| 968 | for (auto col_idx = left_cols; col_idx < result.ColumnCount(); ++col_idx) { |
| 969 | result.data[col_idx].SetVectorType(VectorType::CONSTANT_VECTOR); |
| 970 | ConstantVector::SetNull(vector&: result.data[col_idx], is_null: true); |
| 971 | } |
| 972 | |
| 973 | result.SetCardinality(count); |
| 974 | result.Verify(); |
| 975 | |
| 976 | return result.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; |
| 977 | } |
| 978 | |
| 979 | // Process RIGHT OUTER results |
| 980 | while (ie_lstate.right_matches) { |
| 981 | const idx_t count = ie_lstate.SelectOuterRows(matches: ie_lstate.right_matches); |
| 982 | if (!count) { |
| 983 | ie_gstate.GetNextPair(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate); |
| 984 | continue; |
| 985 | } |
| 986 | |
| 987 | SliceSortedPayload(payload&: result, state&: ie_sink.tables[1]->global_sort_state, block_idx: ie_lstate.right_block_index, |
| 988 | result: ie_lstate.true_sel, result_count: count, left_cols); |
| 989 | |
| 990 | // Fill in NULLs to the left |
| 991 | for (idx_t col_idx = 0; col_idx < left_cols; ++col_idx) { |
| 992 | result.data[col_idx].SetVectorType(VectorType::CONSTANT_VECTOR); |
| 993 | ConstantVector::SetNull(vector&: result.data[col_idx], is_null: true); |
| 994 | } |
| 995 | |
| 996 | result.SetCardinality(count); |
| 997 | result.Verify(); |
| 998 | |
| 999 | break; |
| 1000 | } |
| 1001 | |
| 1002 | return result.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; |
| 1003 | } |
| 1004 | |
| 1005 | //===--------------------------------------------------------------------===// |
| 1006 | // Pipeline Construction |
| 1007 | //===--------------------------------------------------------------------===// |
| 1008 | void PhysicalIEJoin::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) { |
| 1009 | D_ASSERT(children.size() == 2); |
| 1010 | if (meta_pipeline.HasRecursiveCTE()) { |
| 1011 | throw NotImplementedException("IEJoins are not supported in recursive CTEs yet" ); |
| 1012 | } |
| 1013 | |
| 1014 | // becomes a source after both children fully sink their data |
| 1015 | meta_pipeline.GetState().SetPipelineSource(pipeline&: current, op&: *this); |
| 1016 | |
| 1017 | // Create one child meta pipeline that will hold the LHS and RHS pipelines |
| 1018 | auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op&: *this); |
| 1019 | |
| 1020 | // Build out LHS |
| 1021 | auto lhs_pipeline = child_meta_pipeline.GetBasePipeline(); |
| 1022 | children[0]->BuildPipelines(current&: *lhs_pipeline, meta_pipeline&: child_meta_pipeline); |
| 1023 | |
| 1024 | // Build out RHS |
| 1025 | auto rhs_pipeline = child_meta_pipeline.CreatePipeline(); |
| 1026 | children[1]->BuildPipelines(current&: *rhs_pipeline, meta_pipeline&: child_meta_pipeline); |
| 1027 | |
| 1028 | // Despite having the same sink, RHS and everything created after it need their own (same) PipelineFinishEvent |
| 1029 | child_meta_pipeline.AddFinishEvent(pipeline: rhs_pipeline); |
| 1030 | } |
| 1031 | |
| 1032 | } // namespace duckdb |
| 1033 | |