1 | #include "duckdb/execution/operator/join/physical_iejoin.hpp" |
2 | |
3 | #include "duckdb/common/operator/comparison_operators.hpp" |
4 | #include "duckdb/common/row_operations/row_operations.hpp" |
5 | #include "duckdb/common/sort/sort.hpp" |
6 | #include "duckdb/common/sort/sorted_block.hpp" |
7 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
8 | #include "duckdb/execution/expression_executor.hpp" |
9 | #include "duckdb/main/client_context.hpp" |
10 | #include "duckdb/parallel/event.hpp" |
11 | #include "duckdb/parallel/meta_pipeline.hpp" |
12 | #include "duckdb/parallel/thread_context.hpp" |
13 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
14 | |
15 | #include <thread> |
16 | |
17 | namespace duckdb { |
18 | |
19 | PhysicalIEJoin::PhysicalIEJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left, |
20 | unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, JoinType join_type, |
21 | idx_t estimated_cardinality) |
22 | : PhysicalRangeJoin(op, PhysicalOperatorType::IE_JOIN, std::move(left), std::move(right), std::move(cond), |
23 | join_type, estimated_cardinality) { |
24 | |
25 | // 1. let L1 (resp. L2) be the array of column X (resp. Y) |
26 | D_ASSERT(conditions.size() >= 2); |
27 | lhs_orders.resize(new_size: 2); |
28 | rhs_orders.resize(new_size: 2); |
29 | for (idx_t i = 0; i < 2; ++i) { |
30 | auto &cond = conditions[i]; |
31 | D_ASSERT(cond.left->return_type == cond.right->return_type); |
32 | join_key_types.push_back(x: cond.left->return_type); |
33 | |
34 | // Convert the conditions to sort orders |
35 | auto left = cond.left->Copy(); |
36 | auto right = cond.right->Copy(); |
37 | auto sense = OrderType::INVALID; |
38 | |
39 | // 2. if (op1 ∈ {>, ≥}) sort L1 in descending order |
40 | // 3. else if (op1 ∈ {<, ≤}) sort L1 in ascending order |
41 | // 4. if (op2 ∈ {>, ≥}) sort L2 in ascending order |
42 | // 5. else if (op2 ∈ {<, ≤}) sort L2 in descending order |
43 | switch (cond.comparison) { |
44 | case ExpressionType::COMPARE_GREATERTHAN: |
45 | case ExpressionType::COMPARE_GREATERTHANOREQUALTO: |
46 | sense = i ? OrderType::ASCENDING : OrderType::DESCENDING; |
47 | break; |
48 | case ExpressionType::COMPARE_LESSTHAN: |
49 | case ExpressionType::COMPARE_LESSTHANOREQUALTO: |
50 | sense = i ? OrderType::DESCENDING : OrderType::ASCENDING; |
51 | break; |
52 | default: |
53 | throw NotImplementedException("Unimplemented join type for IEJoin" ); |
54 | } |
55 | lhs_orders[i].emplace_back(args: BoundOrderByNode(sense, OrderByNullType::NULLS_LAST, std::move(left))); |
56 | rhs_orders[i].emplace_back(args: BoundOrderByNode(sense, OrderByNullType::NULLS_LAST, std::move(right))); |
57 | } |
58 | |
59 | for (idx_t i = 2; i < conditions.size(); ++i) { |
60 | auto &cond = conditions[i]; |
61 | D_ASSERT(cond.left->return_type == cond.right->return_type); |
62 | join_key_types.push_back(x: cond.left->return_type); |
63 | } |
64 | } |
65 | |
66 | //===--------------------------------------------------------------------===// |
67 | // Sink |
68 | //===--------------------------------------------------------------------===// |
69 | class IEJoinLocalState : public LocalSinkState { |
70 | public: |
71 | using LocalSortedTable = PhysicalRangeJoin::LocalSortedTable; |
72 | |
73 | IEJoinLocalState(ClientContext &context, const PhysicalRangeJoin &op, const idx_t child) |
74 | : table(context, op, child) { |
75 | } |
76 | |
77 | //! The local sort state |
78 | LocalSortedTable table; |
79 | }; |
80 | |
81 | class IEJoinGlobalState : public GlobalSinkState { |
82 | public: |
83 | using GlobalSortedTable = PhysicalRangeJoin::GlobalSortedTable; |
84 | |
85 | public: |
86 | IEJoinGlobalState(ClientContext &context, const PhysicalIEJoin &op) : child(0) { |
87 | tables.resize(new_size: 2); |
88 | RowLayout lhs_layout; |
89 | lhs_layout.Initialize(types: op.children[0]->types); |
90 | vector<BoundOrderByNode> lhs_order; |
91 | lhs_order.emplace_back(args: op.lhs_orders[0][0].Copy()); |
92 | tables[0] = make_uniq<GlobalSortedTable>(args&: context, args&: lhs_order, args&: lhs_layout); |
93 | |
94 | RowLayout rhs_layout; |
95 | rhs_layout.Initialize(types: op.children[1]->types); |
96 | vector<BoundOrderByNode> rhs_order; |
97 | rhs_order.emplace_back(args: op.rhs_orders[0][0].Copy()); |
98 | tables[1] = make_uniq<GlobalSortedTable>(args&: context, args&: rhs_order, args&: rhs_layout); |
99 | } |
100 | |
101 | IEJoinGlobalState(IEJoinGlobalState &prev) |
102 | : GlobalSinkState(prev), tables(std::move(prev.tables)), child(prev.child + 1) { |
103 | } |
104 | |
105 | void Sink(DataChunk &input, IEJoinLocalState &lstate) { |
106 | auto &table = *tables[child]; |
107 | auto &global_sort_state = table.global_sort_state; |
108 | auto &local_sort_state = lstate.table.local_sort_state; |
109 | |
110 | // Sink the data into the local sort state |
111 | lstate.table.Sink(input, global_sort_state); |
112 | |
113 | // When sorting data reaches a certain size, we sort it |
114 | if (local_sort_state.SizeInBytes() >= table.memory_per_thread) { |
115 | local_sort_state.Sort(global_sort_state, reorder_heap: true); |
116 | } |
117 | } |
118 | |
119 | vector<unique_ptr<GlobalSortedTable>> tables; |
120 | size_t child; |
121 | }; |
122 | |
123 | unique_ptr<GlobalSinkState> PhysicalIEJoin::GetGlobalSinkState(ClientContext &context) const { |
124 | D_ASSERT(!sink_state); |
125 | return make_uniq<IEJoinGlobalState>(args&: context, args: *this); |
126 | } |
127 | |
128 | unique_ptr<LocalSinkState> PhysicalIEJoin::GetLocalSinkState(ExecutionContext &context) const { |
129 | idx_t sink_child = 0; |
130 | if (sink_state) { |
131 | const auto &ie_sink = sink_state->Cast<IEJoinGlobalState>(); |
132 | sink_child = ie_sink.child; |
133 | } |
134 | return make_uniq<IEJoinLocalState>(args&: context.client, args: *this, args&: sink_child); |
135 | } |
136 | |
137 | SinkResultType PhysicalIEJoin::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
138 | auto &gstate = input.global_state.Cast<IEJoinGlobalState>(); |
139 | auto &lstate = input.local_state.Cast<IEJoinLocalState>(); |
140 | |
141 | gstate.Sink(input&: chunk, lstate); |
142 | |
143 | return SinkResultType::NEED_MORE_INPUT; |
144 | } |
145 | |
146 | void PhysicalIEJoin::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const { |
147 | auto &gstate = gstate_p.Cast<IEJoinGlobalState>(); |
148 | auto &lstate = lstate_p.Cast<IEJoinLocalState>(); |
149 | gstate.tables[gstate.child]->Combine(ltable&: lstate.table); |
150 | auto &client_profiler = QueryProfiler::Get(context&: context.client); |
151 | |
152 | context.thread.profiler.Flush(phys_op: *this, expression_executor&: lstate.table.executor, name: gstate.child ? "rhs_executor" : "lhs_executor" , id: 1); |
153 | client_profiler.Flush(profiler&: context.thread.profiler); |
154 | } |
155 | |
156 | //===--------------------------------------------------------------------===// |
157 | // Finalize |
158 | //===--------------------------------------------------------------------===// |
159 | SinkFinalizeType PhysicalIEJoin::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
160 | GlobalSinkState &gstate_p) const { |
161 | auto &gstate = gstate_p.Cast<IEJoinGlobalState>(); |
162 | auto &table = *gstate.tables[gstate.child]; |
163 | auto &global_sort_state = table.global_sort_state; |
164 | |
165 | if ((gstate.child == 1 && IsRightOuterJoin(type: join_type)) || (gstate.child == 0 && IsLeftOuterJoin(type: join_type))) { |
166 | // for FULL/LEFT/RIGHT OUTER JOIN, initialize found_match to false for every tuple |
167 | table.IntializeMatches(); |
168 | } |
169 | if (gstate.child == 1 && global_sort_state.sorted_blocks.empty() && EmptyResultIfRHSIsEmpty()) { |
170 | // Empty input! |
171 | return SinkFinalizeType::NO_OUTPUT_POSSIBLE; |
172 | } |
173 | |
174 | // Sort the current input child |
175 | table.Finalize(pipeline, event); |
176 | |
177 | // Move to the next input child |
178 | ++gstate.child; |
179 | |
180 | return SinkFinalizeType::READY; |
181 | } |
182 | |
183 | //===--------------------------------------------------------------------===// |
184 | // Operator |
185 | //===--------------------------------------------------------------------===// |
186 | OperatorResultType PhysicalIEJoin::ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
187 | GlobalOperatorState &gstate, OperatorState &state) const { |
188 | return OperatorResultType::FINISHED; |
189 | } |
190 | |
191 | //===--------------------------------------------------------------------===// |
192 | // Source |
193 | //===--------------------------------------------------------------------===// |
194 | struct IEJoinUnion { |
195 | using SortedTable = PhysicalRangeJoin::GlobalSortedTable; |
196 | |
197 | static idx_t AppendKey(SortedTable &table, ExpressionExecutor &executor, SortedTable &marked, int64_t increment, |
198 | int64_t base, const idx_t block_idx); |
199 | |
200 | static void Sort(SortedTable &table) { |
201 | auto &global_sort_state = table.global_sort_state; |
202 | global_sort_state.PrepareMergePhase(); |
203 | while (global_sort_state.sorted_blocks.size() > 1) { |
204 | global_sort_state.InitializeMergeRound(); |
205 | MergeSorter merge_sorter(global_sort_state, global_sort_state.buffer_manager); |
206 | merge_sorter.PerformInMergeRound(); |
207 | global_sort_state.CompleteMergeRound(keep_radix_data: true); |
208 | } |
209 | } |
210 | |
211 | template <typename T> |
212 | static vector<T> ExtractColumn(SortedTable &table, idx_t col_idx) { |
213 | vector<T> result; |
214 | result.reserve(table.count); |
215 | |
216 | auto &gstate = table.global_sort_state; |
217 | auto &blocks = *gstate.sorted_blocks[0]->payload_data; |
218 | PayloadScanner scanner(blocks, gstate, false); |
219 | |
220 | DataChunk payload; |
221 | payload.Initialize(allocator&: Allocator::DefaultAllocator(), types: gstate.payload_layout.GetTypes()); |
222 | for (;;) { |
223 | scanner.Scan(chunk&: payload); |
224 | const auto count = payload.size(); |
225 | if (!count) { |
226 | break; |
227 | } |
228 | |
229 | const auto data_ptr = FlatVector::GetData<T>(payload.data[col_idx]); |
230 | result.insert(result.end(), data_ptr, data_ptr + count); |
231 | } |
232 | |
233 | return result; |
234 | } |
235 | |
236 | IEJoinUnion(ClientContext &context, const PhysicalIEJoin &op, SortedTable &t1, const idx_t b1, SortedTable &t2, |
237 | const idx_t b2); |
238 | |
239 | idx_t SearchL1(idx_t pos); |
240 | bool NextRow(); |
241 | |
242 | //! Inverted loop |
243 | idx_t JoinComplexBlocks(SelectionVector &lsel, SelectionVector &rsel); |
244 | |
245 | //! L1 |
246 | unique_ptr<SortedTable> l1; |
247 | //! L2 |
248 | unique_ptr<SortedTable> l2; |
249 | |
250 | //! Li |
251 | vector<int64_t> li; |
252 | //! P |
253 | vector<idx_t> p; |
254 | |
255 | //! B |
256 | vector<validity_t> bit_array; |
257 | ValidityMask bit_mask; |
258 | |
259 | //! Bloom Filter |
260 | static constexpr idx_t BLOOM_CHUNK_BITS = 1024; |
261 | idx_t bloom_count; |
262 | vector<validity_t> bloom_array; |
263 | ValidityMask bloom_filter; |
264 | |
265 | //! Iteration state |
266 | idx_t n; |
267 | idx_t i; |
268 | idx_t j; |
269 | unique_ptr<SBIterator> op1; |
270 | unique_ptr<SBIterator> off1; |
271 | unique_ptr<SBIterator> op2; |
272 | unique_ptr<SBIterator> off2; |
273 | int64_t lrid; |
274 | }; |
275 | |
276 | idx_t IEJoinUnion::AppendKey(SortedTable &table, ExpressionExecutor &executor, SortedTable &marked, int64_t increment, |
277 | int64_t base, const idx_t block_idx) { |
278 | LocalSortState local_sort_state; |
279 | local_sort_state.Initialize(global_sort_state&: marked.global_sort_state, buffer_manager_p&: marked.global_sort_state.buffer_manager); |
280 | |
281 | // Reading |
282 | const auto valid = table.count - table.has_null; |
283 | auto &gstate = table.global_sort_state; |
284 | PayloadScanner scanner(gstate, block_idx); |
285 | auto table_idx = block_idx * gstate.block_capacity; |
286 | |
287 | DataChunk scanned; |
288 | scanned.Initialize(allocator&: Allocator::DefaultAllocator(), types: scanner.GetPayloadTypes()); |
289 | |
290 | // Writing |
291 | auto types = local_sort_state.sort_layout->logical_types; |
292 | const idx_t payload_idx = types.size(); |
293 | |
294 | const auto &payload_types = local_sort_state.payload_layout->GetTypes(); |
295 | types.insert(position: types.end(), first: payload_types.begin(), last: payload_types.end()); |
296 | const idx_t rid_idx = types.size() - 1; |
297 | |
298 | DataChunk keys; |
299 | DataChunk payload; |
300 | keys.Initialize(allocator&: Allocator::DefaultAllocator(), types); |
301 | |
302 | idx_t inserted = 0; |
303 | for (auto rid = base; table_idx < valid;) { |
304 | scanner.Scan(chunk&: scanned); |
305 | |
306 | // NULLs are at the end, so stop when we reach them |
307 | auto scan_count = scanned.size(); |
308 | if (table_idx + scan_count > valid) { |
309 | scan_count = valid - table_idx; |
310 | scanned.SetCardinality(scan_count); |
311 | } |
312 | if (scan_count == 0) { |
313 | break; |
314 | } |
315 | table_idx += scan_count; |
316 | |
317 | // Compute the input columns from the payload |
318 | keys.Reset(); |
319 | keys.Split(other&: payload, split_idx: rid_idx); |
320 | executor.Execute(input&: scanned, result&: keys); |
321 | |
322 | // Mark the rid column |
323 | payload.data[0].Sequence(start: rid, increment, count: scan_count); |
324 | payload.SetCardinality(scan_count); |
325 | keys.Fuse(other&: payload); |
326 | rid += increment * scan_count; |
327 | |
328 | // Sort on the sort columns (which will no longer be needed) |
329 | keys.Split(other&: payload, split_idx: payload_idx); |
330 | local_sort_state.SinkChunk(sort&: keys, payload); |
331 | inserted += scan_count; |
332 | keys.Fuse(other&: payload); |
333 | |
334 | // Flush when we have enough data |
335 | if (local_sort_state.SizeInBytes() >= marked.memory_per_thread) { |
336 | local_sort_state.Sort(global_sort_state&: marked.global_sort_state, reorder_heap: true); |
337 | } |
338 | } |
339 | marked.global_sort_state.AddLocalState(local_sort_state); |
340 | marked.count += inserted; |
341 | |
342 | return inserted; |
343 | } |
344 | |
345 | IEJoinUnion::IEJoinUnion(ClientContext &context, const PhysicalIEJoin &op, SortedTable &t1, const idx_t b1, |
346 | SortedTable &t2, const idx_t b2) |
347 | : n(0), i(0) { |
348 | // input : query Q with 2 join predicates t1.X op1 t2.X' and t1.Y op2 t2.Y', tables T, T' of sizes m and n resp. |
349 | // output: a list of tuple pairs (ti , tj) |
350 | // Note that T/T' are already sorted on X/X' and contain the payload data |
351 | // We only join the two block numbers and use the sizes of the blocks as the counts |
352 | |
353 | // 0. Filter out tables with no overlap |
354 | if (!t1.BlockSize(i: b1) || !t2.BlockSize(i: b2)) { |
355 | return; |
356 | } |
357 | |
358 | const auto &cmp1 = op.conditions[0].comparison; |
359 | SBIterator bounds1(t1.global_sort_state, cmp1); |
360 | SBIterator bounds2(t2.global_sort_state, cmp1); |
361 | |
362 | // t1.X[0] op1 t2.X'[-1] |
363 | bounds1.SetIndex(bounds1.block_capacity * b1); |
364 | bounds2.SetIndex(bounds2.block_capacity * b2 + t2.BlockSize(i: b2) - 1); |
365 | if (!bounds1.Compare(other: bounds2)) { |
366 | return; |
367 | } |
368 | |
369 | // 1. let L1 (resp. L2) be the array of column X (resp. Y ) |
370 | const auto &order1 = op.lhs_orders[0][0]; |
371 | const auto &order2 = op.lhs_orders[1][0]; |
372 | |
373 | // 2. if (op1 ∈ {>, ≥}) sort L1 in descending order |
374 | // 3. else if (op1 ∈ {<, ≤}) sort L1 in ascending order |
375 | |
376 | // For the union algorithm, we make a unified table with the keys and the rids as the payload: |
377 | // X/X', Y/Y', R/R'/Li |
378 | // The first position is the sort key. |
379 | vector<LogicalType> types; |
380 | types.emplace_back(args&: order2.expression->return_type); |
381 | types.emplace_back(args: LogicalType::BIGINT); |
382 | RowLayout payload_layout; |
383 | payload_layout.Initialize(types); |
384 | |
385 | // Sort on the first expression |
386 | auto ref = make_uniq<BoundReferenceExpression>(args&: order1.expression->return_type, args: 0); |
387 | vector<BoundOrderByNode> orders; |
388 | orders.emplace_back(args: order1.type, args: order1.null_order, args: std::move(ref)); |
389 | |
390 | l1 = make_uniq<SortedTable>(args&: context, args&: orders, args&: payload_layout); |
391 | |
392 | // LHS has positive rids |
393 | ExpressionExecutor l_executor(context); |
394 | l_executor.AddExpression(expr: *order1.expression); |
395 | l_executor.AddExpression(expr: *order2.expression); |
396 | AppendKey(table&: t1, executor&: l_executor, marked&: *l1, increment: 1, base: 1, block_idx: b1); |
397 | |
398 | // RHS has negative rids |
399 | ExpressionExecutor r_executor(context); |
400 | r_executor.AddExpression(expr: *op.rhs_orders[0][0].expression); |
401 | r_executor.AddExpression(expr: *op.rhs_orders[1][0].expression); |
402 | AppendKey(table&: t2, executor&: r_executor, marked&: *l1, increment: -1, base: -1, block_idx: b2); |
403 | |
404 | if (l1->global_sort_state.sorted_blocks.empty()) { |
405 | return; |
406 | } |
407 | |
408 | Sort(table&: *l1); |
409 | |
410 | op1 = make_uniq<SBIterator>(args&: l1->global_sort_state, args: cmp1); |
411 | off1 = make_uniq<SBIterator>(args&: l1->global_sort_state, args: cmp1); |
412 | |
413 | // We don't actually need the L1 column, just its sort key, which is in the sort blocks |
414 | li = ExtractColumn<int64_t>(table&: *l1, col_idx: types.size() - 1); |
415 | |
416 | // 4. if (op2 ∈ {>, ≥}) sort L2 in ascending order |
417 | // 5. else if (op2 ∈ {<, ≤}) sort L2 in descending order |
418 | |
419 | // We sort on Y/Y' to obtain the sort keys and the permutation array. |
420 | // For this we just need a two-column table of Y, P |
421 | types.clear(); |
422 | types.emplace_back(args: LogicalType::BIGINT); |
423 | payload_layout.Initialize(types); |
424 | |
425 | // Sort on the first expression |
426 | orders.clear(); |
427 | ref = make_uniq<BoundReferenceExpression>(args&: order2.expression->return_type, args: 0); |
428 | orders.emplace_back(args: order2.type, args: order2.null_order, args: std::move(ref)); |
429 | |
430 | ExpressionExecutor executor(context); |
431 | executor.AddExpression(expr: *orders[0].expression); |
432 | |
433 | l2 = make_uniq<SortedTable>(args&: context, args&: orders, args&: payload_layout); |
434 | for (idx_t base = 0, block_idx = 0; block_idx < l1->BlockCount(); ++block_idx) { |
435 | base += AppendKey(table&: *l1, executor, marked&: *l2, increment: 1, base, block_idx); |
436 | } |
437 | |
438 | Sort(table&: *l2); |
439 | |
440 | // We don't actually need the L2 column, just its sort key, which is in the sort blocks |
441 | |
442 | // 6. compute the permutation array P of L2 w.r.t. L1 |
443 | p = ExtractColumn<idx_t>(table&: *l2, col_idx: types.size() - 1); |
444 | |
445 | // 7. initialize bit-array B (|B| = n), and set all bits to 0 |
446 | n = l2->count.load(); |
447 | bit_array.resize(new_size: ValidityMask::EntryCount(count: n), x: 0); |
448 | bit_mask.Initialize(validity: bit_array.data()); |
449 | |
450 | // Bloom filter |
451 | bloom_count = (n + (BLOOM_CHUNK_BITS - 1)) / BLOOM_CHUNK_BITS; |
452 | bloom_array.resize(new_size: ValidityMask::EntryCount(count: bloom_count), x: 0); |
453 | bloom_filter.Initialize(validity: bloom_array.data()); |
454 | |
455 | // 11. for(i←1 to n) do |
456 | const auto &cmp2 = op.conditions[1].comparison; |
457 | op2 = make_uniq<SBIterator>(args&: l2->global_sort_state, args: cmp2); |
458 | off2 = make_uniq<SBIterator>(args&: l2->global_sort_state, args: cmp2); |
459 | i = 0; |
460 | j = 0; |
461 | (void)NextRow(); |
462 | } |
463 | |
464 | idx_t IEJoinUnion::SearchL1(idx_t pos) { |
465 | // Perform an exponential search in the appropriate direction |
466 | op1->SetIndex(pos); |
467 | |
468 | idx_t step = 1; |
469 | auto hi = pos; |
470 | auto lo = pos; |
471 | if (!op1->cmp) { |
472 | // Scan left for loose inequality |
473 | lo -= MinValue(a: step, b: lo); |
474 | step *= 2; |
475 | off1->SetIndex(lo); |
476 | while (lo > 0 && op1->Compare(other: *off1)) { |
477 | hi = lo; |
478 | lo -= MinValue(a: step, b: lo); |
479 | step *= 2; |
480 | off1->SetIndex(lo); |
481 | } |
482 | } else { |
483 | // Scan right for strict inequality |
484 | hi += MinValue(a: step, b: n - hi); |
485 | step *= 2; |
486 | off1->SetIndex(hi); |
487 | while (hi < n && !op1->Compare(other: *off1)) { |
488 | lo = hi; |
489 | hi += MinValue(a: step, b: n - hi); |
490 | step *= 2; |
491 | off1->SetIndex(hi); |
492 | } |
493 | } |
494 | |
495 | // Binary search the target area |
496 | while (lo < hi) { |
497 | const auto mid = lo + (hi - lo) / 2; |
498 | off1->SetIndex(mid); |
499 | if (op1->Compare(other: *off1)) { |
500 | hi = mid; |
501 | } else { |
502 | lo = mid + 1; |
503 | } |
504 | } |
505 | |
506 | off1->SetIndex(lo); |
507 | |
508 | return lo; |
509 | } |
510 | |
511 | bool IEJoinUnion::NextRow() { |
512 | for (; i < n; ++i) { |
513 | // 12. pos ← P[i] |
514 | auto pos = p[i]; |
515 | lrid = li[pos]; |
516 | if (lrid < 0) { |
517 | continue; |
518 | } |
519 | |
520 | // 16. B[pos] ← 1 |
521 | op2->SetIndex(i); |
522 | for (; off2->GetIndex() < n; ++(*off2)) { |
523 | if (!off2->Compare(other: *op2)) { |
524 | break; |
525 | } |
526 | const auto p2 = p[off2->GetIndex()]; |
527 | if (li[p2] < 0) { |
528 | // Only mark rhs matches. |
529 | bit_mask.SetValid(p2); |
530 | bloom_filter.SetValid(p2 / BLOOM_CHUNK_BITS); |
531 | } |
532 | } |
533 | |
534 | // 9. if (op1 ∈ {≤,≥} and op2 ∈ {≤,≥}) eqOff = 0 |
535 | // 10. else eqOff = 1 |
536 | // No, because there could be more than one equal value. |
537 | // Find the leftmost off1 where L1[pos] op1 L1[off1..n] |
538 | // These are the rows that satisfy the op1 condition |
539 | // and that is where we should start scanning B from |
540 | j = SearchL1(pos); |
541 | |
542 | return true; |
543 | } |
544 | return false; |
545 | } |
546 | |
547 | static idx_t NextValid(const ValidityMask &bits, idx_t j, const idx_t n) { |
548 | if (j >= n) { |
549 | return n; |
550 | } |
551 | |
552 | // We can do a first approximation by checking entries one at a time |
553 | // which gives 64:1. |
554 | idx_t entry_idx, idx_in_entry; |
555 | bits.GetEntryIndex(row_idx: j, entry_idx, idx_in_entry); |
556 | auto entry = bits.GetValidityEntry(entry_idx: entry_idx++); |
557 | |
558 | // Trim the bits before the start position |
559 | entry &= (ValidityMask::ValidityBuffer::MAX_ENTRY << idx_in_entry); |
560 | |
561 | // Check the non-ragged entries |
562 | for (const auto entry_count = bits.EntryCount(count: n); entry_idx < entry_count; ++entry_idx) { |
563 | if (entry) { |
564 | for (; idx_in_entry < bits.BITS_PER_VALUE; ++idx_in_entry, ++j) { |
565 | if (bits.RowIsValid(entry, idx_in_entry)) { |
566 | return j; |
567 | } |
568 | } |
569 | } else { |
570 | j += bits.BITS_PER_VALUE - idx_in_entry; |
571 | } |
572 | |
573 | entry = bits.GetValidityEntry(entry_idx); |
574 | idx_in_entry = 0; |
575 | } |
576 | |
577 | // Check the final entry |
578 | for (; j < n; ++idx_in_entry, ++j) { |
579 | if (bits.RowIsValid(entry, idx_in_entry)) { |
580 | return j; |
581 | } |
582 | } |
583 | |
584 | return j; |
585 | } |
586 | |
587 | idx_t IEJoinUnion::JoinComplexBlocks(SelectionVector &lsel, SelectionVector &rsel) { |
588 | // 8. initialize join result as an empty list for tuple pairs |
589 | idx_t result_count = 0; |
590 | |
591 | // 11. for(i←1 to n) do |
592 | while (i < n) { |
593 | // 13. for (j ← pos+eqOff to n) do |
594 | for (;;) { |
595 | // 14. if B[j] = 1 then |
596 | |
597 | // Use the Bloom filter to find candidate blocks |
598 | while (j < n) { |
599 | auto bloom_begin = NextValid(bits: bloom_filter, j: j / BLOOM_CHUNK_BITS, n: bloom_count) * BLOOM_CHUNK_BITS; |
600 | auto bloom_end = MinValue<idx_t>(a: n, b: bloom_begin + BLOOM_CHUNK_BITS); |
601 | |
602 | j = MaxValue<idx_t>(a: j, b: bloom_begin); |
603 | j = NextValid(bits: bit_mask, j, n: bloom_end); |
604 | if (j < bloom_end) { |
605 | break; |
606 | } |
607 | } |
608 | |
609 | if (j >= n) { |
610 | break; |
611 | } |
612 | |
613 | // Filter out tuples with the same sign (they come from the same table) |
614 | const auto rrid = li[j]; |
615 | ++j; |
616 | |
617 | // 15. add tuples w.r.t. (L1[j], L1[i]) to join result |
618 | if (lrid > 0 && rrid < 0) { |
619 | lsel.set_index(idx: result_count, loc: sel_t(+lrid - 1)); |
620 | rsel.set_index(idx: result_count, loc: sel_t(-rrid - 1)); |
621 | ++result_count; |
622 | if (result_count == STANDARD_VECTOR_SIZE) { |
623 | // out of space! |
624 | return result_count; |
625 | } |
626 | } |
627 | } |
628 | ++i; |
629 | |
630 | if (!NextRow()) { |
631 | break; |
632 | } |
633 | } |
634 | |
635 | return result_count; |
636 | } |
637 | |
638 | class IEJoinLocalSourceState : public LocalSourceState { |
639 | public: |
640 | explicit IEJoinLocalSourceState(ClientContext &context, const PhysicalIEJoin &op) |
641 | : op(op), true_sel(STANDARD_VECTOR_SIZE), left_executor(context), right_executor(context), |
642 | left_matches(nullptr), right_matches(nullptr) { |
643 | auto &allocator = Allocator::Get(context); |
644 | if (op.conditions.size() < 3) { |
645 | return; |
646 | } |
647 | |
648 | vector<LogicalType> left_types; |
649 | vector<LogicalType> right_types; |
650 | for (idx_t i = 2; i < op.conditions.size(); ++i) { |
651 | const auto &cond = op.conditions[i]; |
652 | |
653 | left_types.push_back(x: cond.left->return_type); |
654 | left_executor.AddExpression(expr: *cond.left); |
655 | |
656 | right_types.push_back(x: cond.left->return_type); |
657 | right_executor.AddExpression(expr: *cond.right); |
658 | } |
659 | |
660 | left_keys.Initialize(allocator, types: left_types); |
661 | right_keys.Initialize(allocator, types: right_types); |
662 | } |
663 | |
664 | idx_t SelectOuterRows(bool *matches) { |
665 | idx_t count = 0; |
666 | for (; outer_idx < outer_count; ++outer_idx) { |
667 | if (!matches[outer_idx]) { |
668 | true_sel.set_index(idx: count++, loc: outer_idx); |
669 | if (count >= STANDARD_VECTOR_SIZE) { |
670 | outer_idx++; |
671 | break; |
672 | } |
673 | } |
674 | } |
675 | |
676 | return count; |
677 | } |
678 | |
679 | const PhysicalIEJoin &op; |
680 | |
681 | // Joining |
682 | unique_ptr<IEJoinUnion> joiner; |
683 | |
684 | idx_t left_base; |
685 | idx_t left_block_index; |
686 | |
687 | idx_t right_base; |
688 | idx_t right_block_index; |
689 | |
690 | // Trailing predicates |
691 | SelectionVector true_sel; |
692 | |
693 | ExpressionExecutor left_executor; |
694 | DataChunk left_keys; |
695 | |
696 | ExpressionExecutor right_executor; |
697 | DataChunk right_keys; |
698 | |
699 | // Outer joins |
700 | idx_t outer_idx; |
701 | idx_t outer_count; |
702 | bool *left_matches; |
703 | bool *right_matches; |
704 | }; |
705 | |
706 | void PhysicalIEJoin::ResolveComplexJoin(ExecutionContext &context, DataChunk &chunk, LocalSourceState &state_p) const { |
707 | auto &state = state_p.Cast<IEJoinLocalSourceState>(); |
708 | auto &ie_sink = sink_state->Cast<IEJoinGlobalState>(); |
709 | auto &left_table = *ie_sink.tables[0]; |
710 | auto &right_table = *ie_sink.tables[1]; |
711 | |
712 | const auto left_cols = children[0]->GetTypes().size(); |
713 | do { |
714 | SelectionVector lsel(STANDARD_VECTOR_SIZE); |
715 | SelectionVector rsel(STANDARD_VECTOR_SIZE); |
716 | auto result_count = state.joiner->JoinComplexBlocks(lsel, rsel); |
717 | if (result_count == 0) { |
718 | // exhausted this pair |
719 | return; |
720 | } |
721 | |
722 | // found matches: extract them |
723 | chunk.Reset(); |
724 | SliceSortedPayload(payload&: chunk, state&: left_table.global_sort_state, block_idx: state.left_block_index, result: lsel, result_count, left_cols: 0); |
725 | SliceSortedPayload(payload&: chunk, state&: right_table.global_sort_state, block_idx: state.right_block_index, result: rsel, result_count, |
726 | left_cols); |
727 | chunk.SetCardinality(result_count); |
728 | |
729 | auto sel = FlatVector::IncrementalSelectionVector(); |
730 | if (conditions.size() > 2) { |
731 | // If there are more expressions to compute, |
732 | // split the result chunk into the left and right halves |
733 | // so we can compute the values for comparison. |
734 | const auto tail_cols = conditions.size() - 2; |
735 | |
736 | DataChunk right_chunk; |
737 | chunk.Split(other&: right_chunk, split_idx: left_cols); |
738 | state.left_executor.SetChunk(chunk); |
739 | state.right_executor.SetChunk(right_chunk); |
740 | |
741 | auto tail_count = result_count; |
742 | auto true_sel = &state.true_sel; |
743 | for (size_t cmp_idx = 0; cmp_idx < tail_cols; ++cmp_idx) { |
744 | auto &left = state.left_keys.data[cmp_idx]; |
745 | state.left_executor.ExecuteExpression(expr_idx: cmp_idx, result&: left); |
746 | |
747 | auto &right = state.right_keys.data[cmp_idx]; |
748 | state.right_executor.ExecuteExpression(expr_idx: cmp_idx, result&: right); |
749 | |
750 | if (tail_count < result_count) { |
751 | left.Slice(sel: *sel, count: tail_count); |
752 | right.Slice(sel: *sel, count: tail_count); |
753 | } |
754 | tail_count = SelectJoinTail(condition: conditions[cmp_idx + 2].comparison, left, right, sel, count: tail_count, true_sel); |
755 | sel = true_sel; |
756 | } |
757 | chunk.Fuse(other&: right_chunk); |
758 | |
759 | if (tail_count < result_count) { |
760 | result_count = tail_count; |
761 | chunk.Slice(sel_vector: *sel, count: result_count); |
762 | } |
763 | } |
764 | |
765 | // found matches: mark the found matches if required |
766 | if (left_table.found_match) { |
767 | for (idx_t i = 0; i < result_count; i++) { |
768 | left_table.found_match[state.left_base + lsel[sel->get_index(idx: i)]] = true; |
769 | } |
770 | } |
771 | if (right_table.found_match) { |
772 | for (idx_t i = 0; i < result_count; i++) { |
773 | right_table.found_match[state.right_base + rsel[sel->get_index(idx: i)]] = true; |
774 | } |
775 | } |
776 | chunk.Verify(); |
777 | } while (chunk.size() == 0); |
778 | } |
779 | |
780 | class IEJoinGlobalSourceState : public GlobalSourceState { |
781 | public: |
782 | explicit IEJoinGlobalSourceState(const PhysicalIEJoin &op) |
783 | : op(op), initialized(false), next_pair(0), completed(0), left_outers(0), next_left(0), right_outers(0), |
784 | next_right(0) { |
785 | } |
786 | |
787 | void Initialize(IEJoinGlobalState &sink_state) { |
788 | lock_guard<mutex> initializing(lock); |
789 | if (initialized) { |
790 | return; |
791 | } |
792 | |
793 | // Compute the starting row for reach block |
794 | // (In theory these are all the same size, but you never know...) |
795 | auto &left_table = *sink_state.tables[0]; |
796 | const auto left_blocks = left_table.BlockCount(); |
797 | idx_t left_base = 0; |
798 | |
799 | for (size_t lhs = 0; lhs < left_blocks; ++lhs) { |
800 | left_bases.emplace_back(args&: left_base); |
801 | left_base += left_table.BlockSize(i: lhs); |
802 | } |
803 | |
804 | auto &right_table = *sink_state.tables[1]; |
805 | const auto right_blocks = right_table.BlockCount(); |
806 | idx_t right_base = 0; |
807 | for (size_t rhs = 0; rhs < right_blocks; ++rhs) { |
808 | right_bases.emplace_back(args&: right_base); |
809 | right_base += right_table.BlockSize(i: rhs); |
810 | } |
811 | |
812 | // Outer join block counts |
813 | if (left_table.found_match) { |
814 | left_outers = left_blocks; |
815 | } |
816 | |
817 | if (right_table.found_match) { |
818 | right_outers = right_blocks; |
819 | } |
820 | |
821 | // Ready for action |
822 | initialized = true; |
823 | } |
824 | |
825 | public: |
826 | idx_t MaxThreads() override { |
827 | // We can't leverage any more threads than block pairs. |
828 | const auto &sink_state = (op.sink_state->Cast<IEJoinGlobalState>()); |
829 | return sink_state.tables[0]->BlockCount() * sink_state.tables[1]->BlockCount(); |
830 | } |
831 | |
832 | void GetNextPair(ClientContext &client, IEJoinGlobalState &gstate, IEJoinLocalSourceState &lstate) { |
833 | auto &left_table = *gstate.tables[0]; |
834 | auto &right_table = *gstate.tables[1]; |
835 | |
836 | const auto left_blocks = left_table.BlockCount(); |
837 | const auto right_blocks = right_table.BlockCount(); |
838 | const auto pair_count = left_blocks * right_blocks; |
839 | |
840 | // Regular block |
841 | const auto i = next_pair++; |
842 | if (i < pair_count) { |
843 | const auto b1 = i / right_blocks; |
844 | const auto b2 = i % right_blocks; |
845 | |
846 | lstate.left_block_index = b1; |
847 | lstate.left_base = left_bases[b1]; |
848 | |
849 | lstate.right_block_index = b2; |
850 | lstate.right_base = right_bases[b2]; |
851 | |
852 | lstate.joiner = make_uniq<IEJoinUnion>(args&: client, args: op, args&: left_table, args: b1, args&: right_table, args: b2); |
853 | return; |
854 | } |
855 | |
856 | // Outer joins |
857 | if (!left_outers && !right_outers) { |
858 | return; |
859 | } |
860 | |
861 | // Spin wait for regular blocks to finish(!) |
862 | while (completed < pair_count) { |
863 | std::this_thread::yield(); |
864 | } |
865 | |
866 | // Left outer blocks |
867 | const auto l = next_left++; |
868 | if (l < left_outers) { |
869 | lstate.joiner = nullptr; |
870 | lstate.left_block_index = l; |
871 | lstate.left_base = left_bases[l]; |
872 | |
873 | lstate.left_matches = left_table.found_match.get() + lstate.left_base; |
874 | lstate.outer_idx = 0; |
875 | lstate.outer_count = left_table.BlockSize(i: l); |
876 | return; |
877 | } else { |
878 | lstate.left_matches = nullptr; |
879 | } |
880 | |
881 | // Right outer block |
882 | const auto r = next_right++; |
883 | if (r < right_outers) { |
884 | lstate.joiner = nullptr; |
885 | lstate.right_block_index = r; |
886 | lstate.right_base = right_bases[r]; |
887 | |
888 | lstate.right_matches = right_table.found_match.get() + lstate.right_base; |
889 | lstate.outer_idx = 0; |
890 | lstate.outer_count = right_table.BlockSize(i: r); |
891 | return; |
892 | } else { |
893 | lstate.right_matches = nullptr; |
894 | } |
895 | } |
896 | |
897 | void PairCompleted(ClientContext &client, IEJoinGlobalState &gstate, IEJoinLocalSourceState &lstate) { |
898 | lstate.joiner.reset(); |
899 | ++completed; |
900 | GetNextPair(client, gstate, lstate); |
901 | } |
902 | |
903 | const PhysicalIEJoin &op; |
904 | |
905 | mutex lock; |
906 | bool initialized; |
907 | |
908 | // Join queue state |
909 | std::atomic<size_t> next_pair; |
910 | std::atomic<size_t> completed; |
911 | |
912 | // Block base row number |
913 | vector<idx_t> left_bases; |
914 | vector<idx_t> right_bases; |
915 | |
916 | // Outer joins |
917 | idx_t left_outers; |
918 | std::atomic<idx_t> next_left; |
919 | |
920 | idx_t right_outers; |
921 | std::atomic<idx_t> next_right; |
922 | }; |
923 | |
924 | unique_ptr<GlobalSourceState> PhysicalIEJoin::GetGlobalSourceState(ClientContext &context) const { |
925 | return make_uniq<IEJoinGlobalSourceState>(args: *this); |
926 | } |
927 | |
928 | unique_ptr<LocalSourceState> PhysicalIEJoin::GetLocalSourceState(ExecutionContext &context, |
929 | GlobalSourceState &gstate) const { |
930 | return make_uniq<IEJoinLocalSourceState>(args&: context.client, args: *this); |
931 | } |
932 | |
933 | SourceResultType PhysicalIEJoin::GetData(ExecutionContext &context, DataChunk &result, |
934 | OperatorSourceInput &input) const { |
935 | auto &ie_sink = sink_state->Cast<IEJoinGlobalState>(); |
936 | auto &ie_gstate = input.global_state.Cast<IEJoinGlobalSourceState>(); |
937 | auto &ie_lstate = input.local_state.Cast<IEJoinLocalSourceState>(); |
938 | |
939 | ie_gstate.Initialize(sink_state&: ie_sink); |
940 | |
941 | if (!ie_lstate.joiner && !ie_lstate.left_matches && !ie_lstate.right_matches) { |
942 | ie_gstate.GetNextPair(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate); |
943 | } |
944 | |
945 | // Process INNER results |
946 | while (ie_lstate.joiner) { |
947 | ResolveComplexJoin(context, chunk&: result, state_p&: ie_lstate); |
948 | |
949 | if (result.size()) { |
950 | return SourceResultType::HAVE_MORE_OUTPUT; |
951 | } |
952 | |
953 | ie_gstate.PairCompleted(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate); |
954 | } |
955 | |
956 | // Process LEFT OUTER results |
957 | const auto left_cols = children[0]->GetTypes().size(); |
958 | while (ie_lstate.left_matches) { |
959 | const idx_t count = ie_lstate.SelectOuterRows(matches: ie_lstate.left_matches); |
960 | if (!count) { |
961 | ie_gstate.GetNextPair(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate); |
962 | continue; |
963 | } |
964 | SliceSortedPayload(payload&: result, state&: ie_sink.tables[0]->global_sort_state, block_idx: ie_lstate.left_block_index, result: ie_lstate.true_sel, |
965 | result_count: count); |
966 | |
967 | // Fill in NULLs to the right |
968 | for (auto col_idx = left_cols; col_idx < result.ColumnCount(); ++col_idx) { |
969 | result.data[col_idx].SetVectorType(VectorType::CONSTANT_VECTOR); |
970 | ConstantVector::SetNull(vector&: result.data[col_idx], is_null: true); |
971 | } |
972 | |
973 | result.SetCardinality(count); |
974 | result.Verify(); |
975 | |
976 | return result.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; |
977 | } |
978 | |
979 | // Process RIGHT OUTER results |
980 | while (ie_lstate.right_matches) { |
981 | const idx_t count = ie_lstate.SelectOuterRows(matches: ie_lstate.right_matches); |
982 | if (!count) { |
983 | ie_gstate.GetNextPair(client&: context.client, gstate&: ie_sink, lstate&: ie_lstate); |
984 | continue; |
985 | } |
986 | |
987 | SliceSortedPayload(payload&: result, state&: ie_sink.tables[1]->global_sort_state, block_idx: ie_lstate.right_block_index, |
988 | result: ie_lstate.true_sel, result_count: count, left_cols); |
989 | |
990 | // Fill in NULLs to the left |
991 | for (idx_t col_idx = 0; col_idx < left_cols; ++col_idx) { |
992 | result.data[col_idx].SetVectorType(VectorType::CONSTANT_VECTOR); |
993 | ConstantVector::SetNull(vector&: result.data[col_idx], is_null: true); |
994 | } |
995 | |
996 | result.SetCardinality(count); |
997 | result.Verify(); |
998 | |
999 | break; |
1000 | } |
1001 | |
1002 | return result.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; |
1003 | } |
1004 | |
1005 | //===--------------------------------------------------------------------===// |
1006 | // Pipeline Construction |
1007 | //===--------------------------------------------------------------------===// |
1008 | void PhysicalIEJoin::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) { |
1009 | D_ASSERT(children.size() == 2); |
1010 | if (meta_pipeline.HasRecursiveCTE()) { |
1011 | throw NotImplementedException("IEJoins are not supported in recursive CTEs yet" ); |
1012 | } |
1013 | |
1014 | // becomes a source after both children fully sink their data |
1015 | meta_pipeline.GetState().SetPipelineSource(pipeline&: current, op&: *this); |
1016 | |
1017 | // Create one child meta pipeline that will hold the LHS and RHS pipelines |
1018 | auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op&: *this); |
1019 | |
1020 | // Build out LHS |
1021 | auto lhs_pipeline = child_meta_pipeline.GetBasePipeline(); |
1022 | children[0]->BuildPipelines(current&: *lhs_pipeline, meta_pipeline&: child_meta_pipeline); |
1023 | |
1024 | // Build out RHS |
1025 | auto rhs_pipeline = child_meta_pipeline.CreatePipeline(); |
1026 | children[1]->BuildPipelines(current&: *rhs_pipeline, meta_pipeline&: child_meta_pipeline); |
1027 | |
1028 | // Despite having the same sink, RHS and everything created after it need their own (same) PipelineFinishEvent |
1029 | child_meta_pipeline.AddFinishEvent(pipeline: rhs_pipeline); |
1030 | } |
1031 | |
1032 | } // namespace duckdb |
1033 | |