1#include "duckdb/execution/operator/join/physical_iejoin.hpp"
2
3#include "duckdb/common/operator/comparison_operators.hpp"
4#include "duckdb/common/row_operations/row_operations.hpp"
5#include "duckdb/common/sort/sort.hpp"
6#include "duckdb/common/sort/sorted_block.hpp"
7#include "duckdb/common/vector_operations/vector_operations.hpp"
8#include "duckdb/execution/expression_executor.hpp"
9#include "duckdb/main/client_context.hpp"
10#include "duckdb/parallel/event.hpp"
11#include "duckdb/parallel/meta_pipeline.hpp"
12#include "duckdb/parallel/thread_context.hpp"
13#include "duckdb/planner/expression/bound_reference_expression.hpp"
14
15#include <thread>
16
17namespace duckdb {
18
19PhysicalIEJoin::PhysicalIEJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left,
20 unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, JoinType join_type,
21 idx_t estimated_cardinality)
22 : PhysicalRangeJoin(op, PhysicalOperatorType::IE_JOIN, std::move(left), std::move(right), std::move(cond),
23 join_type, estimated_cardinality) {
24
25 // 1. let L1 (resp. L2) be the array of column X (resp. Y)
26 D_ASSERT(conditions.size() >= 2);
27 lhs_orders.resize(new_size: 2);
28 rhs_orders.resize(new_size: 2);
29 for (idx_t i = 0; i < 2; ++i) {
30 auto &cond = conditions[i];
31 D_ASSERT(cond.left->return_type == cond.right->return_type);
32 join_key_types.push_back(x: cond.left->return_type);
33
34 // Convert the conditions to sort orders
35 auto left = cond.left->Copy();
36 auto right = cond.right->Copy();
37 auto sense = OrderType::INVALID;
38
39 // 2. if (op1 ∈ {>, ≥}) sort L1 in descending order
40 // 3. else if (op1 ∈ {<, ≤}) sort L1 in ascending order
41 // 4. if (op2 ∈ {>, ≥}) sort L2 in ascending order
42 // 5. else if (op2 ∈ {<, ≤}) sort L2 in descending order
43 switch (cond.comparison) {
44 case ExpressionType::COMPARE_GREATERTHAN:
45 case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
46 sense = i ? OrderType::ASCENDING : OrderType::DESCENDING;
47 break;
48 case ExpressionType::COMPARE_LESSTHAN:
49 case ExpressionType::COMPARE_LESSTHANOREQUALTO:
50 sense = i ? OrderType::DESCENDING : OrderType::ASCENDING;
51 break;
52 default:
53 throw NotImplementedException("Unimplemented join type for IEJoin");
54 }
55 lhs_orders[i].emplace_back(args: BoundOrderByNode(sense, OrderByNullType::NULLS_LAST, std::move(left)));
56 rhs_orders[i].emplace_back(args: BoundOrderByNode(sense, OrderByNullType::NULLS_LAST, std::move(right)));
57 }
58
59 for (idx_t i = 2; i < conditions.size(); ++i) {
60 auto &cond = conditions[i];
61 D_ASSERT(cond.left->return_type == cond.right->return_type);
62 join_key_types.push_back(x: cond.left->return_type);
63 }
64}
65
66//===--------------------------------------------------------------------===//
67// Sink
68//===--------------------------------------------------------------------===//
69class IEJoinLocalState : public LocalSinkState {
70public:
71 using LocalSortedTable = PhysicalRangeJoin::LocalSortedTable;
72
73 IEJoinLocalState(ClientContext &context, const PhysicalRangeJoin &op, const idx_t child)
74 : table(context, op, child) {
75 }
76
77 //! The local sort state
78 LocalSortedTable table;
79};
80
81class IEJoinGlobalState : public GlobalSinkState {
82public:
83 using GlobalSortedTable = PhysicalRangeJoin::GlobalSortedTable;
84
85public:
86 IEJoinGlobalState(ClientContext &context, const PhysicalIEJoin &op) : child(0) {
87 tables.resize(new_size: 2);
88 RowLayout lhs_layout;
89 lhs_layout.Initialize(types: op.children[0]->types);
90 vector<BoundOrderByNode> lhs_order;
91 lhs_order.emplace_back(args: op.lhs_orders[0][0].Copy());
92 tables[0] = make_uniq<GlobalSortedTable>(args&: context, args&: lhs_order, args&: lhs_layout);
93
94 RowLayout rhs_layout;
95 rhs_layout.Initialize(types: op.children[1]->types);
96 vector<BoundOrderByNode> rhs_order;
97 rhs_order.emplace_back(args: op.rhs_orders[0][0].Copy());
98 tables[1] = make_uniq<GlobalSortedTable>(args&: context, args&: rhs_order, args&: rhs_layout);
99 }
100
101 IEJoinGlobalState(IEJoinGlobalState &prev)
102 : GlobalSinkState(prev), tables(std::move(prev.tables)), child(prev.child + 1) {
103 }
104
105 void Sink(DataChunk &input, IEJoinLocalState &lstate) {
106 auto &table = *tables[child];
107 auto &global_sort_state = table.global_sort_state;
108 auto &local_sort_state = lstate.table.local_sort_state;
109
110 // Sink the data into the local sort state
111 lstate.table.Sink(input, global_sort_state);
112
113 // When sorting data reaches a certain size, we sort it
114 if (local_sort_state.SizeInBytes() >= table.memory_per_thread) {
115 local_sort_state.Sort(global_sort_state, reorder_heap: true);
116 }
117 }
118
119 vector<unique_ptr<GlobalSortedTable>> tables;
120 size_t child;
121};
122
123unique_ptr<GlobalSinkState> PhysicalIEJoin::GetGlobalSinkState(ClientContext &context) const {
124 D_ASSERT(!sink_state);
125 return make_uniq<IEJoinGlobalState>(args&: context, args: *this);
126}
127
128unique_ptr<LocalSinkState> PhysicalIEJoin::GetLocalSinkState(ExecutionContext &context) const {
129 idx_t sink_child = 0;
130 if (sink_state) {
131 const auto &ie_sink = sink_state->Cast<IEJoinGlobalState>();
132 sink_child = ie_sink.child;
133 }
134 return make_uniq<IEJoinLocalState>(args&: context.client, args: *this, args&: sink_child);
135}
136
137SinkResultType PhysicalIEJoin::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
138 auto &gstate = input.global_state.Cast<IEJoinGlobalState>();
139 auto &lstate = input.local_state.Cast<IEJoinLocalState>();
140
141 gstate.Sink(input&: chunk, lstate);
142
143 return SinkResultType::NEED_MORE_INPUT;
144}
145
146void PhysicalIEJoin::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const {
147 auto &gstate = gstate_p.Cast<IEJoinGlobalState>();
148 auto &lstate = lstate_p.Cast<IEJoinLocalState>();
149 gstate.tables[gstate.child]->Combine(ltable&: lstate.table);
150 auto &client_profiler = QueryProfiler::Get(context&: context.client);
151
152 context.thread.profiler.Flush(phys_op: *this, expression_executor&: lstate.table.executor, name: gstate.child ? "rhs_executor" : "lhs_executor", id: 1);
153 client_profiler.Flush(profiler&: context.thread.profiler);
154}
155
156//===--------------------------------------------------------------------===//
157// Finalize
158//===--------------------------------------------------------------------===//
159SinkFinalizeType PhysicalIEJoin::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
160 GlobalSinkState &gstate_p) const {
161 auto &gstate = gstate_p.Cast<IEJoinGlobalState>();
162 auto &table = *gstate.tables[gstate.child];
163 auto &global_sort_state = table.global_sort_state;
164
165 if ((gstate.child == 1 && IsRightOuterJoin(type: join_type)) || (gstate.child == 0 && IsLeftOuterJoin(type: join_type))) {
166 // for FULL/LEFT/RIGHT OUTER JOIN, initialize found_match to false for every tuple
167 table.IntializeMatches();
168 }
169 if (gstate.child == 1 && global_sort_state.sorted_blocks.empty() && EmptyResultIfRHSIsEmpty()) {
170 // Empty input!
171 return SinkFinalizeType::NO_OUTPUT_POSSIBLE;
172 }
173
174 // Sort the current input child
175 table.Finalize(pipeline, event);
176
177 // Move to the next input child
178 ++gstate.child;
179
180 return SinkFinalizeType::READY;
181}
182
183//===--------------------------------------------------------------------===//
184// Operator
185//===--------------------------------------------------------------------===//
186OperatorResultType PhysicalIEJoin::ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
187 GlobalOperatorState &gstate, OperatorState &state) const {
188 return OperatorResultType::FINISHED;
189}
190
191//===--------------------------------------------------------------------===//
192// Source
193//===--------------------------------------------------------------------===//
194struct IEJoinUnion {
195 using SortedTable = PhysicalRangeJoin::GlobalSortedTable;
196
197 static idx_t AppendKey(SortedTable &table, ExpressionExecutor &executor, SortedTable &marked, int64_t increment,
198 int64_t base, const idx_t block_idx);
199
200 static void Sort(SortedTable &table) {
201 auto &global_sort_state = table.global_sort_state;
202 global_sort_state.PrepareMergePhase();
203 while (global_sort_state.sorted_blocks.size() > 1) {
204 global_sort_state.InitializeMergeRound();
205 MergeSorter merge_sorter(global_sort_state, global_sort_state.buffer_manager);
206 merge_sorter.PerformInMergeRound();
207 global_sort_state.CompleteMergeRound(keep_radix_data: true);
208 }
209 }
210
211 template <typename T>
212 static vector<T> ExtractColumn(SortedTable &table, idx_t col_idx) {
213 vector<T> result;
214 result.reserve(table.count);
215
216 auto &gstate = table.global_sort_state;
217 auto &blocks = *gstate.sorted_blocks[0]->payload_data;
218 PayloadScanner scanner(blocks, gstate, false);
219
220 DataChunk payload;
221 payload.Initialize(allocator&: Allocator::DefaultAllocator(), types: gstate.payload_layout.GetTypes());
222 for (;;) {
223 scanner.Scan(chunk&: payload);
224 const auto count = payload.size();
225 if (!count) {
226 break;
227 }
228
229 const auto data_ptr = FlatVector::GetData<T>(payload.data[col_idx]);
230 result.insert(result.end(), data_ptr, data_ptr + count);
231 }
232
233 return result;
234 }
235
236 IEJoinUnion(ClientContext &context, const PhysicalIEJoin &op, SortedTable &t1, const idx_t b1, SortedTable &t2,
237 const idx_t b2);
238
239 idx_t SearchL1(idx_t pos);
240 bool NextRow();
241
242 //! Inverted loop
243 idx_t JoinComplexBlocks(SelectionVector &lsel, SelectionVector &rsel);
244
245 //! L1
246 unique_ptr<SortedTable> l1;
247 //! L2
248 unique_ptr<SortedTable> l2;
249
250 //! Li
251 vector<int64_t> li;
252 //! P
253 vector<idx_t> p;
254
255 //! B
256 vector<validity_t> bit_array;
257 ValidityMask bit_mask;
258
259 //! Bloom Filter
260 static constexpr idx_t BLOOM_CHUNK_BITS = 1024;
261 idx_t bloom_count;
262 vector<validity_t> bloom_array;
263 ValidityMask bloom_filter;
264
265 //! Iteration state
266 idx_t n;
267 idx_t i;
268 idx_t j;
269 unique_ptr<SBIterator> op1;
270 unique_ptr<SBIterator> off1;
271 unique_ptr<SBIterator> op2;
272 unique_ptr<SBIterator> off2;
273 int64_t lrid;
274};
275
276idx_t IEJoinUnion::AppendKey(SortedTable &table, ExpressionExecutor &executor, SortedTable &marked, int64_t increment,
277 int64_t base, const idx_t block_idx) {
278 LocalSortState local_sort_state;
279 local_sort_state.Initialize(global_sort_state&: marked.global_sort_state, buffer_manager_p&: marked.global_sort_state.buffer_manager);
280
281 // Reading
282 const auto valid = table.count - table.has_null;
283 auto &gstate = table.global_sort_state;
284 PayloadScanner scanner(gstate, block_idx);
285 auto table_idx = block_idx * gstate.block_capacity;
286
287 DataChunk scanned;
288 scanned.Initialize(allocator&: Allocator::DefaultAllocator(), types: scanner.GetPayloadTypes());
289
290 // Writing
291 auto types = local_sort_state.sort_layout->logical_types;
292 const idx_t payload_idx = types.size();
293
294 const auto &payload_types = local_sort_state.payload_layout->GetTypes();
295 types.insert(position: types.end(), first: payload_types.begin(), last: payload_types.end());
296 const idx_t rid_idx = types.size() - 1;
297
298 DataChunk keys;
299 DataChunk payload;
300 keys.Initialize(allocator&: Allocator::DefaultAllocator(), types);
301
302 idx_t inserted = 0;
303 for (auto rid = base; table_idx < valid;) {
304 scanner.Scan(chunk&: scanned);
305
306 // NULLs are at the end, so stop when we reach them
307 auto scan_count = scanned.size();
308 if (table_idx + scan_count > valid) {
309 scan_count = valid - table_idx;
310 scanned.SetCardinality(scan_count);
311 }
312 if (scan_count == 0) {
313 break;
314 }
315 table_idx += scan_count;
316
317 // Compute the input columns from the payload
318 keys.Reset();
319 keys.Split(other&: payload, split_idx: rid_idx);
320 executor.Execute(input&: scanned, result&: keys);
321
322 // Mark the rid column
323 payload.data[0].Sequence(start: rid, increment, count: scan_count);
324 payload.SetCardinality(scan_count);
325 keys.Fuse(other&: payload);
326 rid += increment * scan_count;
327
328 // Sort on the sort columns (which will no longer be needed)
329 keys.Split(other&: payload, split_idx: payload_idx);
330 local_sort_state.SinkChunk(sort&: keys, payload);
331 inserted += scan_count;
332 keys.Fuse(other&: payload);
333
334 // Flush when we have enough data
335 if (local_sort_state.SizeInBytes() >= marked.memory_per_thread) {
336 local_sort_state.Sort(global_sort_state&: marked.global_sort_state, reorder_heap: true);
337 }
338 }
339 marked.global_sort_state.AddLocalState(local_sort_state);
340 marked.count += inserted;
341
342 return inserted;
343}
344
345IEJoinUnion::IEJoinUnion(ClientContext &context, const PhysicalIEJoin &op, SortedTable &t1, const idx_t b1,
346 SortedTable &t2, const idx_t b2)
347 : n(0), i(0) {
348 // input : query Q with 2 join predicates t1.X op1 t2.X' and t1.Y op2 t2.Y', tables T, T' of sizes m and n resp.
349 // output: a list of tuple pairs (ti , tj)
350 // Note that T/T' are already sorted on X/X' and contain the payload data
351 // We only join the two block numbers and use the sizes of the blocks as the counts
352
353 // 0. Filter out tables with no overlap
354 if (!t1.BlockSize(i: b1) || !t2.BlockSize(i: b2)) {
355 return;
356 }
357
358 const auto &cmp1 = op.conditions[0].comparison;
359 SBIterator bounds1(t1.global_sort_state, cmp1);
360 SBIterator bounds2(t2.global_sort_state, cmp1);
361
362 // t1.X[0] op1 t2.X'[-1]
363 bounds1.SetIndex(bounds1.block_capacity * b1);
364 bounds2.SetIndex(bounds2.block_capacity * b2 + t2.BlockSize(i: b2) - 1);
365 if (!bounds1.Compare(other: bounds2)) {
366 return;
367 }
368
369 // 1. let L1 (resp. L2) be the array of column X (resp. Y )
370 const auto &order1 = op.lhs_orders[0][0];
371 const auto &order2 = op.lhs_orders[1][0];
372
373 // 2. if (op1 ∈ {>, ≥}) sort L1 in descending order
374 // 3. else if (op1 ∈ {<, ≤}) sort L1 in ascending order
375
376 // For the union algorithm, we make a unified table with the keys and the rids as the payload:
377 // X/X', Y/Y', R/R'/Li
378 // The first position is the sort key.
379 vector<LogicalType> types;
380 types.emplace_back(args&: order2.expression->return_type);
381 types.emplace_back(args: LogicalType::BIGINT);
382 RowLayout payload_layout;
383 payload_layout.Initialize(types);
384
385 // Sort on the first expression
386 auto ref = make_uniq<BoundReferenceExpression>(args&: order1.expression->return_type, args: 0);
387 vector<BoundOrderByNode> orders;
388 orders.emplace_back(args: order1.type, args: order1.null_order, args: std::move(ref));
389
390 l1 = make_uniq<SortedTable>(args&: context, args&: orders, args&: payload_layout);
391
392 // LHS has positive rids
393 ExpressionExecutor l_executor(context);
394 l_executor.AddExpression(expr: *order1.expression);
395 l_executor.AddExpression(expr: *order2.expression);
396 AppendKey(table&: t1, executor&: l_executor, marked&: *l1, increment: 1, base: 1, block_idx: b1);
397
398 // RHS has negative rids
399 ExpressionExecutor r_executor(context);
400 r_executor.AddExpression(expr: *op.rhs_orders[0][0].expression);
401 r_executor.AddExpression(expr: *op.rhs_orders[1][0].expression);
402 AppendKey(table&: t2, executor&: r_executor, marked&: *l1, increment: -1, base: -1, block_idx: b2);
403
404 if (l1->global_sort_state.sorted_blocks.empty()) {
405 return;
406 }
407
408 Sort(table&: *l1);
409
410 op1 = make_uniq<SBIterator>(args&: l1->global_sort_state, args: cmp1);
411 off1 = make_uniq<SBIterator>(args&: l1->global_sort_state, args: cmp1);
412
413 // We don't actually need the L1 column, just its sort key, which is in the sort blocks
414 li = ExtractColumn<int64_t>(table&: *l1, col_idx: types.size() - 1);
415
416 // 4. if (op2 ∈ {>, ≥}) sort L2 in ascending order
417 // 5. else if (op2 ∈ {<, ≤}) sort L2 in descending order
418
419 // We sort on Y/Y' to obtain the sort keys and the permutation array.
420 // For this we just need a two-column table of Y, P
421 types.clear();
422 types.emplace_back(args: LogicalType::BIGINT);
423 payload_layout.Initialize(types);
424
425 // Sort on the first expression
426 orders.clear();
427 ref = make_uniq<BoundReferenceExpression>(args&: order2.expression->return_type, args: 0);
428 orders.emplace_back(args: order2.type, args: order2.null_order, args: std::move(ref));
429
430 ExpressionExecutor executor(context);
431 executor.AddExpression(expr: *orders[0].expression);
432
433 l2 = make_uniq<SortedTable>(args&: context, args&: orders, args&: payload_layout);
434 for (idx_t base = 0, block_idx = 0; block_idx < l1->BlockCount(); ++block_idx) {
435 base += AppendKey(table&: *l1, executor, marked&: *l2, increment: 1, base, block_idx);
436 }
437
438 Sort(table&: *l2);
439
440 // We don't actually need the L2 column, just its sort key, which is in the sort blocks
441
442 // 6. compute the permutation array P of L2 w.r.t. L1
443 p = ExtractColumn<idx_t>(table&: *l2, col_idx: types.size() - 1);
444
445 // 7. initialize bit-array B (|B| = n), and set all bits to 0
446 n = l2->count.load();
447 bit_array.resize(new_size: ValidityMask::EntryCount(count: n), x: 0);
448 bit_mask.Initialize(validity: bit_array.data());
449
450 // Bloom filter
451 bloom_count = (n + (BLOOM_CHUNK_BITS - 1)) / BLOOM_CHUNK_BITS;
452 bloom_array.resize(new_size: ValidityMask::EntryCount(count: bloom_count), x: 0);
453 bloom_filter.Initialize(validity: bloom_array.data());
454
455 // 11. for(i←1 to n) do
456 const auto &cmp2 = op.conditions[1].comparison;
457 op2 = make_uniq<SBIterator>(args&: l2->global_sort_state, args: cmp2);
458 off2 = make_uniq<SBIterator>(args&: l2->global_sort_state, args: cmp2);
459 i = 0;
460 j = 0;
461 (void)NextRow();
462}
463
464idx_t IEJoinUnion::SearchL1(idx_t pos) {
465 // Perform an exponential search in the appropriate direction
466 op1->SetIndex(pos);
467
468 idx_t step = 1;
469 auto hi = pos;
470 auto lo = pos;
471 if (!op1->cmp) {
472 // Scan left for loose inequality
473 lo -= MinValue(a: step, b: lo);
474 step *= 2;
475 off1->SetIndex(lo);
476 while (lo > 0 && op1->Compare(other: *off1)) {
477 hi = lo;
478 lo -= MinValue(a: step, b: lo);
479 step *= 2;
480 off1->SetIndex(lo);
481 }
482 } else {
483 // Scan right for strict inequality
484 hi += MinValue(a: step, b: n - hi);
485 step *= 2;
486 off1->SetIndex(hi);
487 while (hi < n && !op1->Compare(other: *off1)) {
488 lo = hi;
489 hi += MinValue(a: step, b: n - hi);
490 step *= 2;
491 off1->SetIndex(hi);
492 }
493 }
494
495 // Binary search the target area
496 while (lo < hi) {
497 const auto mid = lo + (hi - lo) / 2;
498 off1->SetIndex(mid);
499 if (op1->Compare(other: *off1)) {
500 hi = mid;
501 } else {
502 lo = mid + 1;
503 }
504 }
505
506 off1->SetIndex(lo);
507
508 return lo;
509}
510
511bool IEJoinUnion::NextRow() {
512 for (; i < n; ++i) {
513 // 12. pos ← P[i]
514 auto pos = p[i];
515 lrid = li[pos];
516 if (lrid < 0) {
517 continue;
518 }
519
520 // 16. B[pos] ← 1
521 op2->SetIndex(i);
522 for (; off2->GetIndex() < n; ++(*off2)) {
523 if (!off2->Compare(other: *op2)) {
524 break;
525 }
526 const auto p2 = p[off2->GetIndex()];
527 if (li[p2] < 0) {
528 // Only mark rhs matches.
529 bit_mask.SetValid(p2);
530 bloom_filter.SetValid(p2 / BLOOM_CHUNK_BITS);
531 }
532 }
533
534 // 9. if (op1 ∈ {≤,≥} and op2 ∈ {≤,≥}) eqOff = 0
535 // 10. else eqOff = 1
536 // No, because there could be more than one equal value.
537 // Find the leftmost off1 where L1[pos] op1 L1[off1..n]
538 // These are the rows that satisfy the op1 condition
539 // and that is where we should start scanning B from
540 j = SearchL1(pos);
541
542 return true;
543 }
544 return false;
545}
546
547static idx_t NextValid(const ValidityMask &bits, idx_t j, const idx_t n) {
548 if (j >= n) {
549 return n;
550 }
551
552 // We can do a first approximation by checking entries one at a time
553 // which gives 64:1.
554 idx_t entry_idx, idx_in_entry;
555 bits.GetEntryIndex(row_idx: j, entry_idx, idx_in_entry);
556 auto entry = bits.GetValidityEntry(entry_idx: entry_idx++);
557
558 // Trim the bits before the start position
559 entry &= (ValidityMask::ValidityBuffer::MAX_ENTRY << idx_in_entry);
560
561 // Check the non-ragged entries
562 for (const auto entry_count = bits.EntryCount(count: n); entry_idx < entry_count; ++entry_idx) {
563 if (entry) {
564 for (; idx_in_entry < bits.BITS_PER_VALUE; ++idx_in_entry, ++j) {
565 if (bits.RowIsValid(entry, idx_in_entry)) {
566 return j;
567 }
568 }
569 } else {
570 j += bits.BITS_PER_VALUE - idx_in_entry;
571 }
572
573 entry = bits.GetValidityEntry(entry_idx);
574 idx_in_entry = 0;
575 }
576
577 // Check the final entry
578 for (; j < n; ++idx_in_entry, ++j) {
579 if (bits.RowIsValid(entry, idx_in_entry)) {
580 return j;
581 }
582 }
583
584 return j;
585}
586
587idx_t IEJoinUnion::JoinComplexBlocks(SelectionVector &lsel, SelectionVector &rsel) {
588 // 8. initialize join result as an empty list for tuple pairs
589 idx_t result_count = 0;
590
591 // 11. for(i←1 to n) do
592 while (i < n) {
593 // 13. for (j ← pos+eqOff to n) do
594 for (;;) {
595 // 14. if B[j] = 1 then
596
597 // Use the Bloom filter to find candidate blocks
598 while (j < n) {
599 auto bloom_begin = NextValid(bits: bloom_filter, j: j / BLOOM_CHUNK_BITS, n: bloom_count) * BLOOM_CHUNK_BITS;
600 auto bloom_end = MinValue<idx_t>(a: n, b: bloom_begin + BLOOM_CHUNK_BITS);
601
602 j = MaxValue<idx_t>(a: j, b: bloom_begin);
603 j = NextValid(bits: bit_mask, j, n: bloom_end);
604 if (j < bloom_end) {
605 break;
606 }
607 }
608
609 if (j >= n) {
610 break;
611 }
612
613 // Filter out tuples with the same sign (they come from the same table)
614 const auto rrid = li[j];
615 ++j;
616
617 // 15. add tuples w.r.t. (L1[j], L1[i]) to join result
618 if (lrid > 0 && rrid < 0) {
619 lsel.set_index(idx: result_count, loc: sel_t(+lrid - 1));
620 rsel.set_index(idx: result_count, loc: sel_t(-rrid - 1));
621 ++result_count;
622 if (result_count == STANDARD_VECTOR_SIZE) {
623 // out of space!
624 return result_count;
625 }
626 }
627 }
628 ++i;
629
630 if (!NextRow()) {
631 break;
632 }
633 }
634
635 return result_count;
636}
637
638class IEJoinLocalSourceState : public LocalSourceState {
639public:
640 explicit IEJoinLocalSourceState(ClientContext &context, const PhysicalIEJoin &op)
641 : op(op), true_sel(STANDARD_VECTOR_SIZE), left_executor(context), right_executor(context),
642 left_matches(nullptr), right_matches(nullptr) {
643 auto &allocator = Allocator::Get(context);
644 if (op.conditions.size() < 3) {
645 return;
646 }
647
648 vector<LogicalType> left_types;
649 vector<LogicalType> right_types;
650 for (idx_t i = 2; i < op.conditions.size(); ++i) {
651 const auto &cond = op.conditions[i];
652
653 left_types.push_back(x: cond.left->return_type);
654 left_executor.AddExpression(expr: *cond.left);
655
656 right_types.push_back(x: cond.left->return_type);
657 right_executor.AddExpression(expr: *cond.right);
658 }
659
660 left_keys.Initialize(allocator, types: left_types);
661 right_keys.Initialize(allocator, types: right_types);
662 }
663
664 idx_t SelectOuterRows(bool *matches) {
665 idx_t count = 0;
666 for (; outer_idx < outer_count; ++outer_idx) {
667 if (!matches[outer_idx]) {
668 true_sel.set_index(idx: count++, loc: outer_idx);
669 if (count >= STANDARD_VECTOR_SIZE) {
670 outer_idx++;
671 break;
672 }
673 }
674 }
675
676 return count;
677 }
678
679 const PhysicalIEJoin &op;
680
681 // Joining
682 unique_ptr<IEJoinUnion> joiner;
683
684 idx_t left_base;
685 idx_t left_block_index;
686
687 idx_t right_base;
688 idx_t right_block_index;
689
690 // Trailing predicates
691 SelectionVector true_sel;
692
693 ExpressionExecutor left_executor;
694 DataChunk left_keys;
695
696 ExpressionExecutor right_executor;
697 DataChunk right_keys;
698
699 // Outer joins
700 idx_t outer_idx;
701 idx_t outer_count;
702 bool *left_matches;
703 bool *right_matches;
704};
705
706void PhysicalIEJoin::ResolveComplexJoin(ExecutionContext &context, DataChunk &chunk, LocalSourceState &state_p) const {
707 auto &state = state_p.Cast<IEJoinLocalSourceState>();
708 auto &ie_sink = sink_state->Cast<IEJoinGlobalState>();
709 auto &left_table = *ie_sink.tables[0];
710 auto &right_table = *ie_sink.tables[1];
711
712 const auto left_cols = children[0]->GetTypes().size();
713 do {
714 SelectionVector lsel(STANDARD_VECTOR_SIZE);
715 SelectionVector rsel(STANDARD_VECTOR_SIZE);
716 auto result_count = state.joiner->JoinComplexBlocks(lsel, rsel);
717 if (result_count == 0) {
718 // exhausted this pair
719 return;
720 }
721
722 // found matches: extract them
723 chunk.Reset();
724 SliceSortedPayload(payload&: chunk, state&: left_table.global_sort_state, block_idx: state.left_block_index, result: lsel, result_count, left_cols: 0);
725 SliceSortedPayload(payload&: chunk, state&: right_table.global_sort_state, block_idx: state.right_block_index, result: rsel, result_count,
726 left_cols);
727 chunk.SetCardinality(result_count);
728
729 auto sel = FlatVector::IncrementalSelectionVector();
730 if (conditions.size() > 2) {
731 // If there are more expressions to compute,
732 // split the result chunk into the left and right halves
733 // so we can compute the values for comparison.
734 const auto tail_cols = conditions.size() - 2;
735
736 DataChunk right_chunk;
737 chunk.Split(other&: right_chunk, split_idx: left_cols);
738 state.left_executor.SetChunk(chunk);
739 state.right_executor.SetChunk(right_chunk);
740
741 auto tail_count = result_count;
742 auto true_sel = &state.true_sel;
743 for (size_t cmp_idx = 0; cmp_idx < tail_cols; ++cmp_idx) {
744 auto &left = state.left_keys.data[cmp_idx];
745 state.left_executor.ExecuteExpression(expr_idx: cmp_idx, result&: left);
746
747 auto &right = state.right_keys.data[cmp_idx];
748 state.right_executor.ExecuteExpression(expr_idx: cmp_idx, result&: right);
749
750 if (tail_count < result_count) {
751 left.Slice(sel: *sel, count: tail_count);
752 right.Slice(sel: *sel, count: tail_count);
753 }
754 tail_count = SelectJoinTail(condition: conditions[cmp_idx + 2].comparison, left, right, sel, count: tail_count, true_sel);
755 sel = true_sel;
756 }
757 chunk.Fuse(other&: right_chunk);
758
759 if (tail_count < result_count) {
760 result_count = tail_count;
761 chunk.Slice(sel_vector: *sel, count: result_count);
762 }
763 }
764
765 // found matches: mark the found matches if required
766 if (left_table.found_match) {
767 for (idx_t i = 0; i < result_count; i++) {
768 left_table.found_match[state.left_base + lsel[sel->get_index(idx: i)]] = true;
769 }
770 }
771 if (right_table.found_match) {
772 for (idx_t i = 0; i < result_count; i++) {
773 right_table.found_match[state.right_base + rsel[sel->get_index(idx: i)]] = true;
774 }
775 }
776 chunk.Verify();
777 } while (chunk.size() == 0);
778}
779
780class IEJoinGlobalSourceState : public GlobalSourceState {
781public:
782 explicit IEJoinGlobalSourceState(const PhysicalIEJoin &op)
783 : op(op), initialized(false), next_pair(0), completed(0), left_outers(0), next_left(0), right_outers(0),
784 next_right(0) {
785 }
786
787 void Initialize(IEJoinGlobalState &sink_state) {
788 lock_guard<mutex> initializing(lock);
789 if (initialized) {
790 return;
791 }
792
793 // Compute the starting row for reach block
794 // (In theory these are all the same size, but you never know...)
795 auto &left_table = *sink_state.tables[0];
796 const auto left_blocks = left_table.BlockCount();
797 idx_t left_base = 0;
798
799 for (size_t lhs = 0; lhs < left_blocks; ++lhs) {
800 left_bases.emplace_back(args&: left_base);
801 left_base += left_table.BlockSize(i: lhs);
802 }
803
804 auto &right_table = *sink_state.tables[1];
805 const auto right_blocks = right_table.BlockCount();
806 idx_t right_base = 0;
807 for (size_t rhs = 0; rhs < right_blocks; ++rhs) {
808 right_bases.emplace_back(args&: right_base);
809 right_base += right_table.BlockSize(i: rhs);
810 }
811
812 // Outer join block counts
813 if (left_table.found_match) {
814 left_outers = left_blocks;
815 }
816
817 if (right_table.found_match) {
818 right_outers = right_blocks;
819 }
820
821 // Ready for action
822 initialized = true;
823 }
824
825public:
826 idx_t MaxThreads() override {
827 // We can't leverage any more threads than block pairs.
828 const auto &sink_state = (op.sink_state->Cast<IEJoinGlobalState>());
829 return sink_state.tables[0]->BlockCount() * sink_state.tables[1]->BlockCount();
830 }
831
832 void GetNextPair(ClientContext &client, IEJoinGlobalState &gstate, IEJoinLocalSourceState &lstate) {
833 auto &left_table = *gstate.tables[0];
834 auto &right_table = *gstate.tables[1];
835
836 const auto left_blocks = left_table.BlockCount();
837 const auto right_blocks = right_table.BlockCount();
838 const auto pair_count = left_blocks * right_blocks;
839
840 // Regular block
841 const auto i = next_pair++;
842 if (i < pair_count) {
843 const auto b1 = i / right_blocks;
844 const auto b2 = i % right_blocks;
845
846 lstate.left_block_index = b1;
847 lstate.left_base = left_bases[b1];
848
849 lstate.right_block_index = b2;
850 lstate.right_base = right_bases[b2];
851
852 lstate.joiner = make_uniq<IEJoinUnion>(args&: client, args: op, args&: left_table, args: b1, args&: right_table, args: b2);
853 return;
854 }
855
856 // Outer joins
857 if (!left_outers && !right_outers) {
858 return;
859 }
860
861 // Spin wait for regular blocks to finish(!)
862 while (completed < pair_count) {
863 std::this_thread::yield();
864 }
865
866 // Left outer blocks
867 const auto l = next_left++;
868 if (l < left_outers) {
869 lstate.joiner = nullptr;
870 lstate.left_block_index = l;
871 lstate.left_base = left_bases[l];
872
873 lstate.left_matches = left_table.found_match.get() + lstate.left_base;
874 lstate.outer_idx = 0;
875 lstate.outer_count = left_table.BlockSize(i: l);
876 return;
877 } else {
878 lstate.left_matches = nullptr;
879 }
880
881 // Right outer block
882 const auto r = next_right++;
883 if (r < right_outers) {
884 lstate.joiner = nullptr;
885 lstate.right_block_index = r;
886 lstate.right_base = right_bases[r];
887
888 lstate.right_matches = right_table.found_match.get() + lstate.right_base;
889 lstate.outer_idx = 0;
890 lstate.outer_count = right_table.BlockSize(i: r);
891 return;
892 } else {
893 lstate.right_matches = nullptr;
894 }
895 }
896
897 void PairCompleted(ClientContext &client, IEJoinGlobalState &gstate, IEJoinLocalSourceState &lstate) {
898 lstate.joiner.reset();
899 ++completed;
900 GetNextPair(client, gstate, lstate);
901 }
902
903 const PhysicalIEJoin &op;
904
905 mutex lock;
906 bool initialized;
907
908 // Join queue state
909 std::atomic<size_t> next_pair;
910 std::atomic<size_t> completed;
911
912 // Block base row number
913 vector<idx_t> left_bases;
914 vector<idx_t> right_bases;
915
916 // Outer joins
917 idx_t left_outers;
918 std::atomic<idx_t> next_left;
919
920 idx_t right_outers;
921 std::atomic<idx_t> next_right;
922};
923
924unique_ptr<GlobalSourceState> PhysicalIEJoin::GetGlobalSourceState(ClientContext &context) const {
925 return make_uniq<IEJoinGlobalSourceState>(args: *this);
926}
927
928unique_ptr<LocalSourceState> PhysicalIEJoin::GetLocalSourceState(ExecutionContext &context,
929 GlobalSourceState &gstate) const {
930 return make_uniq<IEJoinLocalSourceState>(args&: context.client, args: *this);
931}
932
933SourceResultType PhysicalIEJoin::GetData(ExecutionContext &context, DataChunk &result,
934 OperatorSourceInput &input) const {
935 auto &ie_sink = sink_state->Cast<IEJoinGlobalState>();
936 auto &ie_gstate = input.global_state.Cast<IEJoinGlobalSourceState>();
937 auto &ie_lstate = input.local_state.Cast<IEJoinLocalSourceState>();
938
939 ie_gstate.Initialize(sink_state&: ie_sink);
940
941 if (!ie_lstate.joiner && !ie_lstate.left_matches && !ie_lstate.right_matches) {
942 ie_gstate.GetNextPair(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate);
943 }
944
945 // Process INNER results
946 while (ie_lstate.joiner) {
947 ResolveComplexJoin(context, chunk&: result, state_p&: ie_lstate);
948
949 if (result.size()) {
950 return SourceResultType::HAVE_MORE_OUTPUT;
951 }
952
953 ie_gstate.PairCompleted(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate);
954 }
955
956 // Process LEFT OUTER results
957 const auto left_cols = children[0]->GetTypes().size();
958 while (ie_lstate.left_matches) {
959 const idx_t count = ie_lstate.SelectOuterRows(matches: ie_lstate.left_matches);
960 if (!count) {
961 ie_gstate.GetNextPair(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate);
962 continue;
963 }
964 SliceSortedPayload(payload&: result, state&: ie_sink.tables[0]->global_sort_state, block_idx: ie_lstate.left_block_index, result: ie_lstate.true_sel,
965 result_count: count);
966
967 // Fill in NULLs to the right
968 for (auto col_idx = left_cols; col_idx < result.ColumnCount(); ++col_idx) {
969 result.data[col_idx].SetVectorType(VectorType::CONSTANT_VECTOR);
970 ConstantVector::SetNull(vector&: result.data[col_idx], is_null: true);
971 }
972
973 result.SetCardinality(count);
974 result.Verify();
975
976 return result.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
977 }
978
979 // Process RIGHT OUTER results
980 while (ie_lstate.right_matches) {
981 const idx_t count = ie_lstate.SelectOuterRows(matches: ie_lstate.right_matches);
982 if (!count) {
983 ie_gstate.GetNextPair(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate);
984 continue;
985 }
986
987 SliceSortedPayload(payload&: result, state&: ie_sink.tables[1]->global_sort_state, block_idx: ie_lstate.right_block_index,
988 result: ie_lstate.true_sel, result_count: count, left_cols);
989
990 // Fill in NULLs to the left
991 for (idx_t col_idx = 0; col_idx < left_cols; ++col_idx) {
992 result.data[col_idx].SetVectorType(VectorType::CONSTANT_VECTOR);
993 ConstantVector::SetNull(vector&: result.data[col_idx], is_null: true);
994 }
995
996 result.SetCardinality(count);
997 result.Verify();
998
999 break;
1000 }
1001
1002 return result.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
1003}
1004
1005//===--------------------------------------------------------------------===//
1006// Pipeline Construction
1007//===--------------------------------------------------------------------===//
1008void PhysicalIEJoin::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
1009 D_ASSERT(children.size() == 2);
1010 if (meta_pipeline.HasRecursiveCTE()) {
1011 throw NotImplementedException("IEJoins are not supported in recursive CTEs yet");
1012 }
1013
1014 // becomes a source after both children fully sink their data
1015 meta_pipeline.GetState().SetPipelineSource(pipeline&: current, op&: *this);
1016
1017 // Create one child meta pipeline that will hold the LHS and RHS pipelines
1018 auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op&: *this);
1019
1020 // Build out LHS
1021 auto lhs_pipeline = child_meta_pipeline.GetBasePipeline();
1022 children[0]->BuildPipelines(current&: *lhs_pipeline, meta_pipeline&: child_meta_pipeline);
1023
1024 // Build out RHS
1025 auto rhs_pipeline = child_meta_pipeline.CreatePipeline();
1026 children[1]->BuildPipelines(current&: *rhs_pipeline, meta_pipeline&: child_meta_pipeline);
1027
1028 // Despite having the same sink, RHS and everything created after it need their own (same) PipelineFinishEvent
1029 child_meta_pipeline.AddFinishEvent(pipeline: rhs_pipeline);
1030}
1031
1032} // namespace duckdb
1033