#include "duckdb/execution/operator/join/physical_asof_join.hpp"

#include "duckdb/common/fast_mem.hpp"
#include "duckdb/common/operator/comparison_operators.hpp"
#include "duckdb/common/row_operations/row_operations.hpp"
#include "duckdb/common/sort/comparators.hpp"
#include "duckdb/common/sort/partition_state.hpp"
#include "duckdb/common/sort/sort.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/execution/operator/join/outer_join_marker.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/parallel/event.hpp"
#include "duckdb/parallel/thread_context.hpp"

namespace duckdb {

PhysicalAsOfJoin::PhysicalAsOfJoin(LogicalComparisonJoin &op, unique_ptr<PhysicalOperator> left,
                                   unique_ptr<PhysicalOperator> right)
    : PhysicalComparisonJoin(op, PhysicalOperatorType::ASOF_JOIN, std::move(op.conditions), op.join_type,
                             op.estimated_cardinality) {

	// Convert the conditions into partitions and sorts
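	// For example (illustrative query, not taken from the code), in
	//   SELECT * FROM lhs ASOF JOIN rhs ON lhs.key = rhs.key AND lhs.ts >= rhs.ts
	// the equality on "key" becomes a partition on both sides, while the inequality on "ts"
	// becomes the ASCENDING ordering condition that the search below relies on.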
	for (auto &cond : conditions) {
		D_ASSERT(cond.left->return_type == cond.right->return_type);
		join_key_types.push_back(cond.left->return_type);

		auto left = cond.left->Copy();
		auto right = cond.right->Copy();
		switch (cond.comparison) {
		case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
			null_sensitive.emplace_back(lhs_orders.size());
			lhs_orders.emplace_back(OrderType::ASCENDING, OrderByNullType::NULLS_LAST, std::move(left));
			rhs_orders.emplace_back(OrderType::ASCENDING, OrderByNullType::NULLS_LAST, std::move(right));
			break;
		case ExpressionType::COMPARE_EQUAL:
			null_sensitive.emplace_back(lhs_orders.size());
			// Fall through
		case ExpressionType::COMPARE_NOT_DISTINCT_FROM:
			lhs_partitions.emplace_back(std::move(left));
			rhs_partitions.emplace_back(std::move(right));
			break;
		default:
			throw NotImplementedException("Unsupported join condition for ASOF join");
		}
	}
	D_ASSERT(!lhs_orders.empty());
	D_ASSERT(!rhs_orders.empty());

	children.push_back(std::move(left));
	children.push_back(std::move(right));

	// Fill out the right projection map.
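	// An empty map means every RHS column is emitted, so default to the identity mapping.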
	right_projection_map = op.right_projection_map;
	if (right_projection_map.empty()) {
		const auto right_count = children[1]->types.size();
		right_projection_map.reserve(right_count);
		for (column_t i = 0; i < right_count; ++i) {
			right_projection_map.emplace_back(i);
		}
	}
}

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
class AsOfGlobalSinkState : public GlobalSinkState {
public:
	AsOfGlobalSinkState(ClientContext &context, const PhysicalAsOfJoin &op)
	    : global_partition(context, op.rhs_partitions, op.rhs_orders, op.children[1]->types, {},
	                       op.estimated_cardinality),
	      is_outer(IsRightOuterJoin(op.join_type)), has_null(false) {
	}

	idx_t Count() const {
		return global_partition.count;
	}

	PartitionGlobalSinkState global_partition;

	// One per partition
	const bool is_outer;
	vector<OuterJoinMarker> right_outers;
	bool has_null;
};

class AsOfLocalSinkState : public LocalSinkState {
public:
	explicit AsOfLocalSinkState(ClientContext &context, PartitionGlobalSinkState &gstate_p)
	    : local_partition(context, gstate_p) {
	}

	void Sink(DataChunk &input_chunk) {
		local_partition.Sink(input_chunk);
	}

	void Combine() {
		local_partition.Combine();
	}

	PartitionLocalSinkState local_partition;
};

unique_ptr<GlobalSinkState> PhysicalAsOfJoin::GetGlobalSinkState(ClientContext &context) const {
	return make_uniq<AsOfGlobalSinkState>(context, *this);
}

unique_ptr<LocalSinkState> PhysicalAsOfJoin::GetLocalSinkState(ExecutionContext &context) const {
	// We only sink the RHS
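	// The LHS is streamed through the operator interface below; the RHS is fully materialised here,
	// partitioned by the equality keys and sorted on the inequality key before probing starts.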
	auto &gsink = sink_state->Cast<AsOfGlobalSinkState>();
	return make_uniq<AsOfLocalSinkState>(context.client, gsink.global_partition);
}

SinkResultType PhysicalAsOfJoin::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
	auto &lstate = input.local_state.Cast<AsOfLocalSinkState>();

	lstate.Sink(chunk);

	return SinkResultType::NEED_MORE_INPUT;
}

void PhysicalAsOfJoin::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const {
	auto &lstate = lstate_p.Cast<AsOfLocalSinkState>();
	lstate.Combine();
}

//===--------------------------------------------------------------------===//
// Finalize
//===--------------------------------------------------------------------===//
SinkFinalizeType PhysicalAsOfJoin::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
                                            GlobalSinkState &gstate_p) const {
	auto &gstate = gstate_p.Cast<AsOfGlobalSinkState>();

	// Get the groups that still need to be sorted
	auto &groups = gstate.global_partition.grouping_data->GetPartitions();
	if (groups.empty() && EmptyResultIfRHSIsEmpty()) {
		// Empty input!
		return SinkFinalizeType::NO_OUTPUT_POSSIBLE;
	}

	// Schedule all the sorts for maximum thread utilisation
	auto new_event = make_shared<PartitionMergeEvent>(gstate.global_partition, pipeline);
	event.InsertEvent(std::move(new_event));

	return SinkFinalizeType::READY;
}

//===--------------------------------------------------------------------===//
// Operator
//===--------------------------------------------------------------------===//
class AsOfGlobalState : public GlobalOperatorState {
public:
	explicit AsOfGlobalState(AsOfGlobalSinkState &gsink) {
		// for FULL/RIGHT OUTER JOIN, initialize right_outers to false for every tuple
		auto &global_partition = gsink.global_partition;
		auto &right_outers = gsink.right_outers;
		right_outers.reserve(global_partition.hash_groups.size());
		for (const auto &hash_group : global_partition.hash_groups) {
			right_outers.emplace_back(OuterJoinMarker(gsink.is_outer));
			right_outers.back().Initialize(hash_group->count);
		}
	}
};

unique_ptr<GlobalOperatorState> PhysicalAsOfJoin::GetGlobalOperatorState(ClientContext &context) const {
	auto &gsink = sink_state->Cast<AsOfGlobalSinkState>();
	return make_uniq<AsOfGlobalState>(gsink);
}

class AsOfLocalState : public CachingOperatorState {
public:
	using Orders = vector<BoundOrderByNode>;
	using Match = std::pair<hash_t, idx_t>;

	AsOfLocalState(ClientContext &context, const PhysicalAsOfJoin &op, bool force_external);

public:
	void ResolveJoin(DataChunk &input, bool *found_matches, Match *matches = nullptr);

	void ResolveJoinKeys(DataChunk &input);

	ClientContext &context;
	Allocator &allocator;
	const PhysicalAsOfJoin &op;
	BufferManager &buffer_manager;
	const bool force_external;
	Orders lhs_orders;

	// LHS sorting
	ExpressionExecutor lhs_executor;
	DataChunk lhs_keys;
	ValidityMask lhs_valid_mask;
	SelectionVector lhs_sel;
	idx_t lhs_valid;
	RowLayout lhs_layout;
	unique_ptr<GlobalSortState> lhs_global_state;
	DataChunk lhs_sorted;

	// LHS binning
	Vector hash_vector;
	Vector bin_vector;

	// Output
	idx_t lhs_match_count;
	SelectionVector lhs_matched;
	OuterJoinMarker left_outer;
	bool fetch_next_left;
	DataChunk group_payload;
	DataChunk rhs_payload;
};

AsOfLocalState::AsOfLocalState(ClientContext &context, const PhysicalAsOfJoin &op, bool force_external)
    : context(context), allocator(Allocator::Get(context)), op(op),
      buffer_manager(BufferManager::GetBufferManager(context)), force_external(force_external), lhs_executor(context),
      hash_vector(LogicalType::HASH), bin_vector(LogicalType::HASH), left_outer(IsLeftOuterJoin(op.join_type)),
      fetch_next_left(true) {
	vector<unique_ptr<BaseStatistics>> partition_stats;
	Orders partitions; // Not used.
	PartitionGlobalSinkState::GenerateOrderings(partitions, lhs_orders, op.lhs_partitions, op.lhs_orders,
	                                            partition_stats);

	// We sort the row numbers of the incoming block, not the rows
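	// The sort payload is just the selection index of each probe row, so after sorting we can walk
	// the probe rows in partition/order-key order while still writing results back to their
	// original positions in the input chunk.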
	lhs_layout.Initialize({LogicalType::UINTEGER});
	lhs_sorted.Initialize(allocator, lhs_layout.GetTypes());

	lhs_keys.Initialize(allocator, op.join_key_types);
	for (const auto &cond : op.conditions) {
		lhs_executor.AddExpression(*cond.left);
	}

	group_payload.Initialize(allocator, op.children[1]->types);
	rhs_payload.Initialize(allocator, op.children[1]->types);

	lhs_matched.Initialize();
	lhs_sel.Initialize();
	left_outer.Initialize(STANDARD_VECTOR_SIZE);
}

void AsOfLocalState::ResolveJoinKeys(DataChunk &input) {
	// Compute the join keys
	lhs_keys.Reset();
	lhs_executor.Execute(input, lhs_keys);

	// Extract the NULLs
	const auto count = input.size();
	lhs_valid_mask.Reset();
	for (auto col_idx : op.null_sensitive) {
		auto &col = lhs_keys.data[col_idx];
		UnifiedVectorFormat unified;
		col.ToUnifiedFormat(count, unified);
		lhs_valid_mask.Combine(unified.validity, count);
	}

	// Convert the mask to a selection vector.
	// We need this anyway for sorting
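	// The loop below walks the combined mask one validity entry at a time, with fast paths for
	// entries that are entirely valid or entirely invalid.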
	lhs_valid = 0;
	const auto entry_count = lhs_valid_mask.EntryCount(count);
	idx_t base_idx = 0;
	for (idx_t entry_idx = 0; entry_idx < entry_count;) {
		const auto validity_entry = lhs_valid_mask.GetValidityEntry(entry_idx++);
		const auto next = MinValue<idx_t>(base_idx + ValidityMask::BITS_PER_VALUE, count);
		if (ValidityMask::AllValid(validity_entry)) {
			for (; base_idx < next; ++base_idx) {
				lhs_sel.set_index(lhs_valid++, base_idx);
			}
		} else if (ValidityMask::NoneValid(validity_entry)) {
			base_idx = next;
		} else {
			const auto start = base_idx;
			for (; base_idx < next; ++base_idx) {
				if (ValidityMask::RowIsValid(validity_entry, base_idx - start)) {
					lhs_sel.set_index(lhs_valid++, base_idx);
				}
			}
		}
	}

	// Slice the keys to the ones we can match
	if (lhs_valid < count) {
		lhs_keys.Slice(lhs_sel, lhs_valid);
	}

	// Hash to assign the partitions
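	// The bins are computed with the same radix bit count as the RHS partitioning (GetRadixBits),
	// so each probe row maps to the hash group that can hold its potential matches.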
	auto &global_partition = op.sink_state->Cast<AsOfGlobalSinkState>().global_partition;
	if (op.lhs_partitions.empty()) {
		// Only one hash group
		bin_vector.Reference(Value::HASH(0));
	} else {
		// Hash to determine the partitions.
		VectorOperations::Hash(lhs_keys.data[0], hash_vector, lhs_sel, lhs_valid);
		for (size_t prt_idx = 1; prt_idx < op.lhs_partitions.size(); ++prt_idx) {
			VectorOperations::CombineHash(hash_vector, lhs_keys.data[prt_idx], lhs_sel, lhs_valid);
		}

		// Convert hashes to hash groups
		const auto radix_bits = global_partition.grouping_data->GetRadixBits();
		RadixPartitioning::HashesToBins(hash_vector, radix_bits, bin_vector, count);
	}

	// Sort the selection vector on the valid keys
	lhs_global_state = make_uniq<GlobalSortState>(buffer_manager, lhs_orders, lhs_layout);
	auto &global_state = *lhs_global_state;
	LocalSortState local_sort;
	local_sort.Initialize(*lhs_global_state, buffer_manager);

	DataChunk payload_chunk;
	payload_chunk.InitializeEmpty({LogicalType::UINTEGER});
	FlatVector::SetData(payload_chunk.data[0], data_ptr_cast(lhs_sel.data()));
	payload_chunk.SetCardinality(lhs_valid);
	local_sort.SinkChunk(lhs_keys, payload_chunk);

	// Set external (can be forced with the PRAGMA)
	global_state.external = force_external;
	global_state.AddLocalState(local_sort);
	global_state.PrepareMergePhase();
	while (global_state.sorted_blocks.size() > 1) {
		MergeSorter merge_sorter(*lhs_global_state, buffer_manager);
		merge_sorter.PerformInMergeRound();
		global_state.CompleteMergeRound();
	}

	// Scan the sorted selection
	D_ASSERT(global_state.sorted_blocks.size() == 1);

	auto scanner = make_uniq<PayloadScanner>(*global_state.sorted_blocks[0]->payload_data, global_state, false);
	lhs_sorted.Reset();
	scanner->Scan(lhs_sorted);
}

void AsOfLocalState::ResolveJoin(DataChunk &input, bool *found_match, std::pair<hash_t, idx_t> *matches) {
	// Sort the input row numbers into lhs_sorted, with the sort keys in lhs_global_state
	// and the hash bins in bin_vector
	ResolveJoinKeys(input);

	auto &gsink = op.sink_state->Cast<AsOfGlobalSinkState>();
	auto &global_partition = gsink.global_partition;

	// The bins are contiguous from sorting, so load them one at a time
	// But they may be constant, so unify.
	UnifiedVectorFormat bin_unified;
	bin_vector.ToUnifiedFormat(lhs_valid, bin_unified);
	const auto bins = UnifiedVectorFormat::GetData<hash_t>(bin_unified);

	hash_t prev_bin = global_partition.bin_groups.size();
	optional_ptr<PartitionGlobalHashGroup> hash_group;
	optional_ptr<OuterJoinMarker> right_outer;
	// Searching for right <= left
	SBIterator left(*lhs_global_state, ExpressionType::COMPARE_LESSTHANOREQUALTO);
	unique_ptr<SBIterator> right;
	lhs_match_count = 0;
	const auto sorted_sel = FlatVector::GetData<sel_t>(lhs_sorted.data[0]);
	for (idx_t i = 0; i < lhs_valid; ++i) {
		// idx is the index in the input; i is the index in the sorted keys
		const auto idx = sorted_sel[i];
		const auto curr_bin = bins[bin_unified.sel->get_index(idx)];
		if (!hash_group || curr_bin != prev_bin) {
			// Grab the next group
			prev_bin = curr_bin;
			const auto group_idx = global_partition.bin_groups[curr_bin];
			if (group_idx >= global_partition.hash_groups.size()) {
				// No matching partition
				hash_group = nullptr;
				right_outer = nullptr;
				right.reset();
				continue;
			}
			hash_group = global_partition.hash_groups[group_idx].get();
			right_outer = gsink.right_outers.data() + group_idx;
			right = make_uniq<SBIterator>(*(hash_group->global_sort), ExpressionType::COMPARE_LESSTHANOREQUALTO);
		}
		left.SetIndex(i);

		// If right > left, then there is no match
		if (!right->Compare(left)) {
			continue;
		}

		// Exponential search forward for a non-matching value using radix iterators
		// (We use exponential search to avoid thrashing the block manager on large probes)
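		// Illustrative probe sequence: with the current RHS position at b, the search visits
		// b + 1, b + 2, b + 4, b + 8, ... until right > left or the end of the hash group is reached.
		// The binary search below then locates the exact boundary within the last doubled interval.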
		idx_t bound = 1;
		idx_t begin = right->GetIndex();
		right->SetIndex(begin + bound);
		while (right->GetIndex() < hash_group->count) {
			if (right->Compare(left)) {
				// If right <= left, jump ahead
				bound *= 2;
				right->SetIndex(begin + bound);
			} else {
				break;
			}
		}

		// Binary search for the first non-matching value using radix iterators
		// The previous value (which we know exists) is the match
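		// Invariant: positions before `first` are known (by the sort order) to satisfy right <= left,
		// and positions from `last` onwards do not; when the loop exits, `first` is the first position
		// with right > left, so `first - 1` is the greatest right <= left, i.e. the ASOF match.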
		auto first = begin + bound / 2;
		auto last = MinValue<idx_t>(begin + bound, hash_group->count);
		while (first < last) {
			const auto mid = first + (last - first) / 2;
			right->SetIndex(mid);
			if (right->Compare(left)) {
				// If right <= left, new lower bound
				first = mid + 1;
			} else {
				last = mid;
			}
		}
		right->SetIndex(--first);

		// Check partitions for strict equality
		if (!op.lhs_partitions.empty() && hash_group->ComparePartitions(left, *right)) {
			continue;
		}

		// Emit match data
		right_outer->SetMatch(first);
		left_outer.SetMatch(idx);
		if (found_match) {
			found_match[idx] = true;
		}
		if (matches) {
			matches[idx] = Match(curr_bin, first);
		}
		lhs_matched.set_index(lhs_match_count++, idx);
	}
}

unique_ptr<OperatorState> PhysicalAsOfJoin::GetOperatorState(ExecutionContext &context) const {
	auto &config = ClientConfig::GetConfig(context.client);
	return make_uniq<AsOfLocalState>(context.client, *this, config.force_external);
}

void PhysicalAsOfJoin::ResolveSimpleJoin(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
                                         OperatorState &lstate_p) const {
	auto &lstate = lstate_p.Cast<AsOfLocalState>();
	auto &gsink = sink_state->Cast<AsOfGlobalSinkState>();

	// perform the actual join
	bool found_match[STANDARD_VECTOR_SIZE] = {false};
	lstate.ResolveJoin(input, found_match);

	// now construct the result based on the join result
	switch (join_type) {
	case JoinType::MARK: {
		PhysicalJoin::ConstructMarkJoinResult(lstate.lhs_keys, input, chunk, found_match, gsink.has_null);
		break;
	}
	case JoinType::SEMI:
		PhysicalJoin::ConstructSemiJoinResult(input, chunk, found_match);
		break;
	case JoinType::ANTI:
		PhysicalJoin::ConstructAntiJoinResult(input, chunk, found_match);
		break;
	default:
		throw NotImplementedException("Unimplemented join type for AsOf join");
	}
}

OperatorResultType PhysicalAsOfJoin::ResolveComplexJoin(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
                                                        OperatorState &lstate_p) const {
	auto &lstate = lstate_p.Cast<AsOfLocalState>();
	auto &gsink = sink_state->Cast<AsOfGlobalSinkState>();

	if (!lstate.fetch_next_left) {
		lstate.fetch_next_left = true;
		if (lstate.left_outer.Enabled()) {
			// left join: before we move to the next chunk, emit any tuples from the previous chunk
			// that did not find a match
			lstate.left_outer.ConstructLeftJoinResult(input, chunk);
			lstate.left_outer.Reset();
		}
		return OperatorResultType::NEED_MORE_INPUT;
	}

	// perform the actual join
	AsOfLocalState::Match matches[STANDARD_VECTOR_SIZE];
	lstate.ResolveJoin(input, nullptr, matches);
	lstate.group_payload.Reset();
	lstate.rhs_payload.Reset();

	auto &global_partition = gsink.global_partition;
	hash_t scan_bin = global_partition.bin_groups.size();
	optional_ptr<PartitionGlobalHashGroup> hash_group;
	unique_ptr<PayloadScanner> scanner;
	for (idx_t i = 0; i < lstate.lhs_match_count; ++i) {
		const auto idx = lstate.lhs_matched[i];
		const auto match_bin = matches[idx].first;
		const auto match_pos = matches[idx].second;
		if (match_bin != scan_bin) {
			// Grab the next group
			const auto group_idx = global_partition.bin_groups[match_bin];
			hash_group = global_partition.hash_groups[group_idx].get();
			scan_bin = match_bin;
			scanner = make_uniq<PayloadScanner>(*hash_group->global_sort, false);
			lstate.group_payload.Reset();
		}
		// Skip to the range containing the match
		while (match_pos >= scanner->Scanned()) {
			lstate.group_payload.Reset();
			scanner->Scan(lstate.group_payload);
		}
		// Append the individual values
		// TODO: Batch the copies
		const auto source_offset = match_pos - (scanner->Scanned() - lstate.group_payload.size());
		for (idx_t col_idx = 0; col_idx < right_projection_map.size(); ++col_idx) {
			const auto rhs_idx = right_projection_map[col_idx];
			auto &source = lstate.group_payload.data[rhs_idx];
			auto &target = chunk.data[input.ColumnCount() + col_idx];
			VectorOperations::Copy(source, target, source_offset + 1, source_offset, i);
		}
	}

	// Slice the input into the left side
	chunk.Slice(input, lstate.lhs_matched, lstate.lhs_match_count);

	// If we are doing a left join, come back for the NULLs
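	// Returning HAVE_MORE_OUTPUT means this operator is called again with the same input chunk;
	// the fetch_next_left branch at the top of this function then emits the unmatched LHS rows
	// with NULLs for the RHS columns.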
	if (lstate.left_outer.Enabled()) {
		lstate.fetch_next_left = false;
		return OperatorResultType::HAVE_MORE_OUTPUT;
	}

	return OperatorResultType::NEED_MORE_INPUT;
}

OperatorResultType PhysicalAsOfJoin::ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
                                                     GlobalOperatorState &gstate, OperatorState &lstate) const {
	auto &gsink = sink_state->Cast<AsOfGlobalSinkState>();

	if (gsink.global_partition.count == 0) {
		// empty RHS
		if (!EmptyResultIfRHSIsEmpty()) {
			ConstructEmptyJoinResult(join_type, gsink.has_null, input, chunk);
			return OperatorResultType::NEED_MORE_INPUT;
		} else {
			return OperatorResultType::FINISHED;
		}
	}

	input.Verify();
	switch (join_type) {
	case JoinType::SEMI:
	case JoinType::ANTI:
	case JoinType::MARK:
		// simple joins can have max STANDARD_VECTOR_SIZE matches per chunk
		ResolveSimpleJoin(context, input, chunk, lstate);
		return OperatorResultType::NEED_MORE_INPUT;
	case JoinType::LEFT:
	case JoinType::INNER:
	case JoinType::RIGHT:
	case JoinType::OUTER:
		return ResolveComplexJoin(context, input, chunk, lstate);
	default:
		throw NotImplementedException("Unimplemented type for as-of join!");
	}
}

//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//
class AsOfGlobalSourceState : public GlobalSourceState {
public:
	explicit AsOfGlobalSourceState(PartitionGlobalSinkState &gsink_p) : gsink(gsink_p), next_bin(0) {
	}

	PartitionGlobalSinkState &gsink;
	//! The output read position.
	atomic<idx_t> next_bin;

public:
	idx_t MaxThreads() override {
		// If there is only one partition, we have to process it on one thread.
		if (!gsink.grouping_data) {
			return 1;
		}

		// If there is not a lot of data, process serially.
		if (gsink.count < STANDARD_ROW_GROUPS_SIZE) {
			return 1;
		}

		return gsink.hash_groups.size();
	}
};

unique_ptr<GlobalSourceState> PhysicalAsOfJoin::GetGlobalSourceState(ClientContext &context) const {
	auto &gsink = sink_state->Cast<AsOfGlobalSinkState>();
	return make_uniq<AsOfGlobalSourceState>(gsink.global_partition);
}

class AsOfLocalSourceState : public LocalSourceState {
public:
	using HashGroupPtr = unique_ptr<PartitionGlobalHashGroup>;

	explicit AsOfLocalSourceState(AsOfGlobalSinkState &gstate_p);

	idx_t GeneratePartition(const idx_t hash_bin);

	AsOfGlobalSinkState &gstate;

	//! The read partition
	idx_t hash_bin;
	HashGroupPtr hash_group;

	//! The read cursor
	unique_ptr<PayloadScanner> scanner;
	//! Buffer for the inputs
	DataChunk input_chunk;
	//! Pointer to the matches
	const bool *found_match;
};

AsOfLocalSourceState::AsOfLocalSourceState(AsOfGlobalSinkState &gstate_p) : gstate(gstate_p) {
	input_chunk.Initialize(gstate.global_partition.allocator, gstate.global_partition.payload_types);
}

idx_t AsOfLocalSourceState::GeneratePartition(const idx_t hash_bin_p) {
	// Get rid of any stale data
	hash_bin = hash_bin_p;

	hash_group = std::move(gstate.global_partition.hash_groups[hash_bin]);
	scanner = make_uniq<PayloadScanner>(*hash_group->global_sort);
	found_match = gstate.right_outers[hash_bin].GetMatches();

	return scanner->Remaining();
}

unique_ptr<LocalSourceState> PhysicalAsOfJoin::GetLocalSourceState(ExecutionContext &context,
                                                                   GlobalSourceState &gstate) const {
	auto &gsink = sink_state->Cast<AsOfGlobalSinkState>();
	return make_uniq<AsOfLocalSourceState>(gsink);
}

SourceResultType PhysicalAsOfJoin::GetData(ExecutionContext &context, DataChunk &chunk,
                                           OperatorSourceInput &input) const {
	D_ASSERT(IsRightOuterJoin(join_type));

	auto &gsource = input.global_state.Cast<AsOfGlobalSourceState>();
	auto &lsource = input.local_state.Cast<AsOfLocalSourceState>();
	auto &gsink = gsource.gsink;

	auto &hash_groups = gsink.hash_groups;
	const auto bin_count = hash_groups.size();

	DataChunk rhs_chunk;
	rhs_chunk.Initialize(Allocator::Get(context.client), gsink.payload_types);
	SelectionVector rsel(STANDARD_VECTOR_SIZE);

	while (chunk.size() == 0) {
		// Move to the next bin if we are done.
		while (!lsource.scanner || !lsource.scanner->Remaining()) {
			lsource.scanner.reset();
			lsource.hash_group.reset();
			auto hash_bin = gsource.next_bin++;
			if (hash_bin >= bin_count) {
				return SourceResultType::FINISHED;
			}

			for (; hash_bin < hash_groups.size(); hash_bin = gsource.next_bin++) {
				if (hash_groups[hash_bin]) {
					break;
				}
			}
			lsource.GeneratePartition(hash_bin);
		}
		const auto rhs_position = lsource.scanner->Scanned();
		lsource.scanner->Scan(rhs_chunk);

		const auto count = rhs_chunk.size();
		if (count == 0) {
			return SourceResultType::FINISHED;
		}

		// figure out which tuples didn't find a match in the RHS
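		// found_match is indexed by a row's position within the sorted hash group, so the marker
		// for row i of this scan is at rhs_position (rows already scanned) + i.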
		auto found_match = lsource.found_match;
		idx_t result_count = 0;
		for (idx_t i = 0; i < count; i++) {
			if (!found_match[rhs_position + i]) {
				rsel.set_index(result_count++, i);
			}
		}

		if (result_count > 0) {
			// if there were any tuples that didn't find a match, output them
			const idx_t left_column_count = children[0]->types.size();
			for (idx_t col_idx = 0; col_idx < left_column_count; ++col_idx) {
				chunk.data[col_idx].SetVectorType(VectorType::CONSTANT_VECTOR);
				ConstantVector::SetNull(chunk.data[col_idx], true);
			}
			for (idx_t col_idx = 0; col_idx < right_projection_map.size(); ++col_idx) {
				const auto rhs_idx = right_projection_map[col_idx];
				chunk.data[left_column_count + col_idx].Slice(rhs_chunk.data[rhs_idx], rsel, result_count);
			}
			chunk.SetCardinality(result_count);
			break;
		}
	}

	return chunk.size() > 0 ? SourceResultType::HAVE_MORE_OUTPUT : SourceResultType::FINISHED;
}

} // namespace duckdb