1#include "duckdb/execution/join_hashtable.hpp"
2
3#include "duckdb/storage/buffer_manager.hpp"
4
5#include "duckdb/common/exception.hpp"
6#include "duckdb/common/types/null_value.hpp"
7#include "duckdb/common/vector_operations/vector_operations.hpp"
8#include "duckdb/common/vector_operations/unary_executor.hpp"
9#include "duckdb/common/operator/comparison_operators.hpp"
10
11using namespace std;
12
13namespace duckdb {
14
15using ScanStructure = JoinHashTable::ScanStructure;
16
17JoinHashTable::JoinHashTable(BufferManager &buffer_manager, vector<JoinCondition> &conditions, vector<TypeId> btypes,
18 JoinType type)
19 : buffer_manager(buffer_manager), build_types(move(btypes)), equality_size(0), condition_size(0), build_size(0),
20 entry_size(0), tuple_size(0), join_type(type), finalized(false), has_null(false), count(0) {
21 for (auto &condition : conditions) {
22 assert(condition.left->return_type == condition.right->return_type);
23 auto type = condition.left->return_type;
24 auto type_size = GetTypeIdSize(type);
25 if (condition.comparison == ExpressionType::COMPARE_EQUAL) {
26 // all equality conditions should be at the front
27 // all other conditions at the back
28 // this assert checks that
29 assert(equality_types.size() == condition_types.size());
30 equality_types.push_back(type);
31 equality_size += type_size;
32 }
33 predicates.push_back(condition.comparison);
34 null_values_are_equal.push_back(condition.null_values_are_equal);
35 assert(!condition.null_values_are_equal ||
36 (condition.null_values_are_equal && condition.comparison == ExpressionType::COMPARE_EQUAL));
37
38 condition_types.push_back(type);
39 condition_size += type_size;
40 }
41 // at least one equality is necessary
42 assert(equality_types.size() > 0);
43
44 if (type == JoinType::ANTI || type == JoinType::SEMI || type == JoinType::MARK) {
45 // for ANTI, SEMI and MARK join, we only need to store the keys
46 build_size = 0;
47 build_types.clear();
48 } else {
49 // otherwise we need to store the entire build side for reconstruction
50 // purposes
51 for (idx_t i = 0; i < build_types.size(); i++) {
52 build_size += GetTypeIdSize(build_types[i]);
53 }
54 }
55 tuple_size = condition_size + build_size;
56 // entry size is the tuple size and the size of the hash/next pointer
57 entry_size = tuple_size + std::max(sizeof(hash_t), sizeof(uintptr_t));
58 // compute the per-block capacity of this HT
59 block_capacity = std::max((idx_t)STANDARD_VECTOR_SIZE, (Storage::BLOCK_ALLOC_SIZE / entry_size) + 1);
60}
61
62JoinHashTable::~JoinHashTable() {
63 if (hash_map) {
64 auto hash_id = hash_map->block_id;
65 hash_map.reset();
66 buffer_manager.DestroyBuffer(hash_id);
67 }
68 pinned_handles.clear();
69 for (auto &block : blocks) {
70 buffer_manager.DestroyBuffer(block.block_id);
71 }
72}
73
74void JoinHashTable::ApplyBitmask(Vector &hashes, idx_t count) {
75 if (hashes.vector_type == VectorType::CONSTANT_VECTOR) {
76 assert(!ConstantVector::IsNull(hashes));
77 auto indices = ConstantVector::GetData<hash_t>(hashes);
78 *indices = *indices & bitmask;
79 } else {
80 hashes.Normalify(count);
81 auto indices = FlatVector::GetData<hash_t>(hashes);
82 for (idx_t i = 0; i < count; i++) {
83 indices[i] &= bitmask;
84 }
85 }
86}
87
88void JoinHashTable::ApplyBitmask(Vector &hashes, const SelectionVector &sel, idx_t count, Vector &pointers) {
89 VectorData hdata;
90 hashes.Orrify(count, hdata);
91
92 auto hash_data = (hash_t *)hdata.data;
93 auto result_data = FlatVector::GetData<data_ptr_t *>(pointers);
94 auto main_ht = (data_ptr_t *)hash_map->node->buffer;
95 for (idx_t i = 0; i < count; i++) {
96 auto rindex = sel.get_index(i);
97 auto hindex = hdata.sel->get_index(rindex);
98 auto hash = hash_data[hindex];
99 result_data[rindex] = main_ht + (hash & bitmask);
100 }
101}
102
103void JoinHashTable::Hash(DataChunk &keys, const SelectionVector &sel, idx_t count, Vector &hashes) {
104 if (count == keys.size()) {
105 // no null values are filtered: use regular hash functions
106 VectorOperations::Hash(keys.data[0], hashes, keys.size());
107 for (idx_t i = 1; i < equality_types.size(); i++) {
108 VectorOperations::CombineHash(hashes, keys.data[i], keys.size());
109 }
110 } else {
111 // null values were filtered: use selection vector
112 VectorOperations::Hash(keys.data[0], hashes, sel, count);
113 for (idx_t i = 1; i < equality_types.size(); i++) {
114 VectorOperations::CombineHash(hashes, keys.data[i], sel, count);
115 }
116 }
117}
118template <class T>
119static void templated_serialize_vdata(VectorData &vdata, const SelectionVector &sel, idx_t count,
120 data_ptr_t key_locations[]) {
121 auto source = (T *)vdata.data;
122 if (vdata.nullmask->any()) {
123 for (idx_t i = 0; i < count; i++) {
124 auto idx = sel.get_index(i);
125 auto source_idx = vdata.sel->get_index(idx);
126
127 auto target = (T *)key_locations[i];
128 if ((*vdata.nullmask)[source_idx]) {
129 *target = NullValue<T>();
130 } else {
131 *target = source[source_idx];
132 }
133 key_locations[i] += sizeof(T);
134 }
135 } else {
136 for (idx_t i = 0; i < count; i++) {
137 auto idx = sel.get_index(i);
138 auto source_idx = vdata.sel->get_index(idx);
139
140 auto target = (T *)key_locations[i];
141 *target = source[source_idx];
142 key_locations[i] += sizeof(T);
143 }
144 }
145}
146
147void JoinHashTable::SerializeVectorData(VectorData &vdata, TypeId type, const SelectionVector &sel, idx_t count,
148 data_ptr_t key_locations[]) {
149 switch (type) {
150 case TypeId::BOOL:
151 case TypeId::INT8:
152 templated_serialize_vdata<int8_t>(vdata, sel, count, key_locations);
153 break;
154 case TypeId::INT16:
155 templated_serialize_vdata<int16_t>(vdata, sel, count, key_locations);
156 break;
157 case TypeId::INT32:
158 templated_serialize_vdata<int32_t>(vdata, sel, count, key_locations);
159 break;
160 case TypeId::INT64:
161 templated_serialize_vdata<int64_t>(vdata, sel, count, key_locations);
162 break;
163 case TypeId::FLOAT:
164 templated_serialize_vdata<float>(vdata, sel, count, key_locations);
165 break;
166 case TypeId::DOUBLE:
167 templated_serialize_vdata<double>(vdata, sel, count, key_locations);
168 break;
169 case TypeId::HASH:
170 templated_serialize_vdata<hash_t>(vdata, sel, count, key_locations);
171 break;
172 case TypeId::VARCHAR: {
173 auto source = (string_t *)vdata.data;
174 for (idx_t i = 0; i < count; i++) {
175 auto idx = sel.get_index(i);
176 auto source_idx = vdata.sel->get_index(idx);
177
178 auto target = (string_t *)key_locations[i];
179 if ((*vdata.nullmask)[source_idx]) {
180 *target = NullValue<string_t>();
181 } else if (source[source_idx].IsInlined()) {
182 *target = source[source_idx];
183 } else {
184 *target = string_heap.AddString(source[source_idx]);
185 }
186 key_locations[i] += sizeof(string_t);
187 }
188 break;
189 }
190 default:
191 throw NotImplementedException("FIXME: unimplemented serialize");
192 }
193}
194
195void JoinHashTable::SerializeVector(Vector &v, idx_t vcount, const SelectionVector &sel, idx_t count,
196 data_ptr_t key_locations[]) {
197 VectorData vdata;
198 v.Orrify(vcount, vdata);
199
200 SerializeVectorData(vdata, v.type, sel, count, key_locations);
201}
202
203idx_t JoinHashTable::AppendToBlock(HTDataBlock &block, BufferHandle &handle, idx_t count, data_ptr_t key_locations[],
204 idx_t remaining) {
205 idx_t append_count = std::min(remaining, block.capacity - block.count);
206 auto dataptr = handle.node->buffer + block.count * entry_size;
207 idx_t offset = count - remaining;
208 for (idx_t i = 0; i < append_count; i++) {
209 key_locations[offset + i] = dataptr;
210 dataptr += entry_size;
211 }
212 block.count += append_count;
213 return append_count;
214}
215
216static idx_t FilterNullValues(VectorData &vdata, const SelectionVector &sel, idx_t count, SelectionVector &result) {
217 auto &nullmask = *vdata.nullmask;
218 idx_t result_count = 0;
219 for (idx_t i = 0; i < count; i++) {
220 auto idx = sel.get_index(i);
221 auto key_idx = vdata.sel->get_index(idx);
222 if (!nullmask[key_idx]) {
223 result.set_index(result_count++, idx);
224 }
225 }
226 return result_count;
227}
228
229idx_t JoinHashTable::PrepareKeys(DataChunk &keys, unique_ptr<VectorData[]> &key_data,
230 const SelectionVector *&current_sel, SelectionVector &sel) {
231 key_data = keys.Orrify();
232
233 // figure out which keys are NULL, and create a selection vector out of them
234 current_sel = &FlatVector::IncrementalSelectionVector;
235 idx_t added_count = keys.size();
236 for (idx_t i = 0; i < keys.column_count(); i++) {
237 if (!null_values_are_equal[i]) {
238 if (!key_data[i].nullmask->any()) {
239 continue;
240 }
241 added_count = FilterNullValues(key_data[i], *current_sel, added_count, sel);
242 // null values are NOT equal for this column, filter them out
243 current_sel = &sel;
244 }
245 }
246 return added_count;
247}
248
// Appends one chunk of build-side rows to the (not yet finalized) hash table.
// `keys` holds one column per join condition; `payload` holds the build-side columns. Both have the same cardinality.
// Rows with a NULL key in a column whose NULLs do not compare equal are not inserted; their presence is recorded
// in has_null. Every inserted row is serialized into a block entry laid out as
// [condition keys | payload columns (omitted for SEMI/ANTI/MARK) | hash]. Finalize() later reads the hash slot,
// and InsertHashes() overwrites it with the bucket chain pointer.
void JoinHashTable::Build(DataChunk &keys, DataChunk &payload) {
	assert(!finalized);
	assert(keys.size() == payload.size());
	if (keys.size() == 0) {
		return;
	}
	// special case: correlated mark join
	if (join_type == JoinType::MARK && correlated_mark_join_info.correlated_types.size() > 0) {
		auto &info = correlated_mark_join_info;
		// Correlated MARK join
		// for the correlated mark join we need to keep track of COUNT(*) and COUNT(COLUMN) for each of the correlated
		// columns push into the aggregate hash table
		assert(info.correlated_counts);
		// the leading key columns are the correlated columns and form the groups
		info.group_chunk.SetCardinality(keys);
		for (idx_t i = 0; i < info.correlated_types.size(); i++) {
			info.group_chunk.data[i].Reference(keys.data[i]);
		}
		// both aggregate inputs reference the key column that follows the correlated columns
		info.payload_chunk.SetCardinality(keys);
		for (idx_t i = 0; i < 2; i++) {
			info.payload_chunk.data[i].Reference(keys.data[info.correlated_types.size()]);
		}
		info.correlated_counts->AddChunk(info.group_chunk, info.payload_chunk);
	}

	// prepare the keys for processing
	unique_ptr<VectorData[]> key_data;
	const SelectionVector *current_sel;
	SelectionVector sel(STANDARD_VECTOR_SIZE);
	// added_count = rows that survive NULL filtering; current_sel selects exactly those rows
	idx_t added_count = PrepareKeys(keys, key_data, current_sel, sel);
	if (added_count < keys.size()) {
		has_null = true;
	}
	if (added_count == 0) {
		return;
	}
	count += added_count;

	// buffer handles of every block written below; they are released when this function returns
	vector<unique_ptr<BufferHandle>> handles;
	data_ptr_t key_locations[STANDARD_VECTOR_SIZE];
	// first allocate space of where to serialize the keys and payload columns
	idx_t remaining = added_count;
	// first append to the last block (if any)
	if (blocks.size() != 0) {
		auto &last_block = blocks.back();
		if (last_block.count < last_block.capacity) {
			// last block has space: pin the buffer of this block
			auto handle = buffer_manager.Pin(last_block.block_id);
			// now append to the block
			idx_t append_count = AppendToBlock(last_block, *handle, added_count, key_locations, remaining);
			remaining -= append_count;
			handles.push_back(move(handle));
		}
	}
	while (remaining > 0) {
		// now for the remaining data, allocate new buffers to store the data and append there
		auto handle = buffer_manager.Allocate(block_capacity * entry_size);

		HTDataBlock new_block;
		new_block.count = 0;
		new_block.capacity = block_capacity;
		new_block.block_id = handle->block_id;

		idx_t append_count = AppendToBlock(new_block, *handle, added_count, key_locations, remaining);
		remaining -= append_count;
		handles.push_back(move(handle));
		blocks.push_back(new_block);
	}

	// hash the keys and obtain an entry in the list
	// note that we only hash the keys used in the equality comparison
	Vector hash_values(TypeId::HASH);
	Hash(keys, *current_sel, added_count, hash_values);

	// serialize the keys to the key locations
	// (each Serialize* call advances key_locations[i] past the value it wrote, so the order of the calls below
	// defines the entry layout)
	for (idx_t i = 0; i < keys.column_count(); i++) {
		SerializeVectorData(key_data[i], keys.data[i].type, *current_sel, added_count, key_locations);
	}
	// now serialize the payload
	if (build_types.size() > 0) {
		for (idx_t i = 0; i < payload.column_count(); i++) {
			SerializeVector(payload.data[i], payload.size(), *current_sel, added_count, key_locations);
		}
	}
	// the hash goes last, directly behind the tuple (at offset tuple_size of the entry)
	SerializeVector(hash_values, payload.size(), *current_sel, added_count, key_locations);
}
334
335void JoinHashTable::InsertHashes(Vector &hashes, idx_t count, data_ptr_t key_locations[]) {
336 assert(hashes.type == TypeId::HASH);
337
338 // use bitmask to get position in array
339 ApplyBitmask(hashes, count);
340
341 hashes.Normalify(count);
342
343 assert(hashes.vector_type == VectorType::FLAT_VECTOR);
344 auto pointers = (data_ptr_t *)hash_map->node->buffer;
345 auto indices = FlatVector::GetData<hash_t>(hashes);
346 for (idx_t i = 0; i < count; i++) {
347 auto index = indices[i];
348 // set prev in current key to the value (NOTE: this will be nullptr if
349 // there is none)
350 auto prev_pointer = (data_ptr_t *)(key_locations[i] + tuple_size);
351 *prev_pointer = pointers[index];
352
353 // set pointer to current tuple
354 pointers[index] = key_locations[i];
355 }
356}
357
358void JoinHashTable::Finalize() {
359 // the build has finished, now iterate over all the nodes and construct the final hash table
360 // select a HT that has at least 50% empty space
361 idx_t capacity = NextPowerOfTwo(std::max(count * 2, (idx_t)(Storage::BLOCK_ALLOC_SIZE / sizeof(data_ptr_t)) + 1));
362 // size needs to be a power of 2
363 assert((capacity & (capacity - 1)) == 0);
364 bitmask = capacity - 1;
365
366 // allocate the HT and initialize it with all-zero entries
367 hash_map = buffer_manager.Allocate(capacity * sizeof(data_ptr_t));
368 memset(hash_map->node->buffer, 0, capacity * sizeof(data_ptr_t));
369
370 Vector hashes(TypeId::HASH);
371 auto hash_data = FlatVector::GetData<hash_t>(hashes);
372 data_ptr_t key_locations[STANDARD_VECTOR_SIZE];
373 // now construct the actual hash table; scan the nodes
374 // as we can the nodes we pin all the blocks of the HT and keep them pinned until the HT is destroyed
375 // this is so that we can keep pointers around to the blocks
376 // FIXME: if we cannot keep everything pinned in memory, we could switch to an out-of-memory merge join or so
377 for (auto &block : blocks) {
378 auto handle = buffer_manager.Pin(block.block_id);
379 data_ptr_t dataptr = handle->node->buffer;
380 idx_t entry = 0;
381 while (entry < block.count) {
382 // fetch the next vector of entries from the blocks
383 idx_t next = std::min((idx_t)STANDARD_VECTOR_SIZE, block.count - entry);
384 for (idx_t i = 0; i < next; i++) {
385 hash_data[i] = *((hash_t *)(dataptr + tuple_size));
386 key_locations[i] = dataptr;
387 dataptr += entry_size;
388 }
389 // now insert into the hash table
390 InsertHashes(hashes, next, key_locations);
391
392 entry += next;
393 }
394 pinned_handles.push_back(move(handle));
395 }
396 finalized = true;
397}
398
399unique_ptr<ScanStructure> JoinHashTable::Probe(DataChunk &keys) {
400 assert(count > 0); // should be handled before
401 assert(finalized);
402
403 // set up the scan structure
404 auto ss = make_unique<ScanStructure>(*this);
405
406 if (join_type != JoinType::INNER) {
407 ss->found_match = unique_ptr<bool[]>(new bool[STANDARD_VECTOR_SIZE]);
408 memset(ss->found_match.get(), 0, sizeof(bool) * STANDARD_VECTOR_SIZE);
409 }
410
411 // first prepare the keys for probing
412 const SelectionVector *current_sel;
413 ss->count = PrepareKeys(keys, ss->key_data, current_sel, ss->sel_vector);
414 if (ss->count == 0) {
415 return ss;
416 }
417
418 // hash all the keys
419 Vector hashes(TypeId::HASH);
420 Hash(keys, *current_sel, ss->count, hashes);
421
422 // now initialize the pointers of the scan structure based on the hashes
423 ApplyBitmask(hashes, *current_sel, ss->count, ss->pointers);
424
425 // create the selection vector linking to only non-empty entries
426 idx_t count = 0;
427 auto pointers = FlatVector::GetData<data_ptr_t>(ss->pointers);
428 for (idx_t i = 0; i < ss->count; i++) {
429 auto idx = current_sel->get_index(i);
430 auto chain_pointer = (data_ptr_t *)(pointers[idx]);
431 pointers[idx] = *chain_pointer;
432 if (pointers[idx]) {
433 ss->sel_vector.set_index(count++, idx);
434 }
435 }
436 ss->count = count;
437 return ss;
438}
439
440ScanStructure::ScanStructure(JoinHashTable &ht) : sel_vector(STANDARD_VECTOR_SIZE), ht(ht), finished(false) {
441 pointers.Initialize(TypeId::POINTER);
442}
443
444void ScanStructure::Next(DataChunk &keys, DataChunk &left, DataChunk &result) {
445 if (finished) {
446 return;
447 }
448
449 switch (ht.join_type) {
450 case JoinType::INNER:
451 NextInnerJoin(keys, left, result);
452 break;
453 case JoinType::SEMI:
454 NextSemiJoin(keys, left, result);
455 break;
456 case JoinType::MARK:
457 NextMarkJoin(keys, left, result);
458 break;
459 case JoinType::ANTI:
460 NextAntiJoin(keys, left, result);
461 break;
462 case JoinType::LEFT:
463 NextLeftJoin(keys, left, result);
464 break;
465 case JoinType::SINGLE:
466 NextSingleJoin(keys, left, result);
467 break;
468 default:
469 throw Exception("Unhandled join type in JoinHashTable");
470 }
471}
472
473template <bool NO_MATCH_SEL, class T, class OP>
474static idx_t TemplatedGather(VectorData &vdata, Vector &pointers, const SelectionVector &current_sel, idx_t count,
475 idx_t offset, SelectionVector *match_sel, SelectionVector *no_match_sel,
476 idx_t &no_match_count) {
477 idx_t result_count = 0;
478 auto data = (T *)vdata.data;
479 auto ptrs = FlatVector::GetData<uintptr_t>(pointers);
480 for (idx_t i = 0; i < count; i++) {
481 auto idx = current_sel.get_index(i);
482 auto kidx = vdata.sel->get_index(idx);
483 auto gdata = (T *)(ptrs[idx] + offset);
484 if ((*vdata.nullmask)[kidx]) {
485 if (IsNullValue<T>(*gdata)) {
486 match_sel->set_index(result_count++, idx);
487 } else {
488 if (NO_MATCH_SEL) {
489 no_match_sel->set_index(no_match_count++, idx);
490 }
491 }
492 } else {
493 if (OP::template Operation<T>(data[kidx], *gdata)) {
494 match_sel->set_index(result_count++, idx);
495 } else {
496 if (NO_MATCH_SEL) {
497 no_match_sel->set_index(no_match_count++, idx);
498 }
499 }
500 }
501 }
502 return result_count;
503}
504
505template <bool NO_MATCH_SEL, class OP>
506static idx_t GatherSwitch(VectorData &data, TypeId type, Vector &pointers, const SelectionVector &current_sel,
507 idx_t count, idx_t offset, SelectionVector *match_sel, SelectionVector *no_match_sel,
508 idx_t &no_match_count) {
509 switch (type) {
510 case TypeId::BOOL:
511 case TypeId::INT8:
512 return TemplatedGather<NO_MATCH_SEL, int8_t, OP>(data, pointers, current_sel, count, offset, match_sel,
513 no_match_sel, no_match_count);
514 case TypeId::INT16:
515 return TemplatedGather<NO_MATCH_SEL, int16_t, OP>(data, pointers, current_sel, count, offset, match_sel,
516 no_match_sel, no_match_count);
517 case TypeId::INT32:
518 return TemplatedGather<NO_MATCH_SEL, int32_t, OP>(data, pointers, current_sel, count, offset, match_sel,
519 no_match_sel, no_match_count);
520 case TypeId::INT64:
521 return TemplatedGather<NO_MATCH_SEL, int64_t, OP>(data, pointers, current_sel, count, offset, match_sel,
522 no_match_sel, no_match_count);
523 case TypeId::FLOAT:
524 return TemplatedGather<NO_MATCH_SEL, float, OP>(data, pointers, current_sel, count, offset, match_sel,
525 no_match_sel, no_match_count);
526 case TypeId::DOUBLE:
527 return TemplatedGather<NO_MATCH_SEL, double, OP>(data, pointers, current_sel, count, offset, match_sel,
528 no_match_sel, no_match_count);
529 case TypeId::VARCHAR:
530 return TemplatedGather<NO_MATCH_SEL, string_t, OP>(data, pointers, current_sel, count, offset, match_sel,
531 no_match_sel, no_match_count);
532 default:
533 throw NotImplementedException("Unimplemented type for GatherSwitch");
534 }
535}
536
537template <bool NO_MATCH_SEL>
538idx_t ScanStructure::ResolvePredicates(DataChunk &keys, SelectionVector *match_sel, SelectionVector *no_match_sel) {
539 SelectionVector *current_sel = &this->sel_vector;
540 idx_t remaining_count = this->count;
541 idx_t offset = 0;
542 idx_t no_match_count = 0;
543 for (idx_t i = 0; i < ht.predicates.size(); i++) {
544 switch (ht.predicates[i]) {
545 case ExpressionType::COMPARE_EQUAL:
546 remaining_count =
547 GatherSwitch<NO_MATCH_SEL, Equals>(key_data[i], keys.data[i].type, this->pointers, *current_sel,
548 remaining_count, offset, match_sel, no_match_sel, no_match_count);
549 break;
550 case ExpressionType::COMPARE_NOTEQUAL:
551 remaining_count =
552 GatherSwitch<NO_MATCH_SEL, NotEquals>(key_data[i], keys.data[i].type, this->pointers, *current_sel,
553 remaining_count, offset, match_sel, no_match_sel, no_match_count);
554 break;
555 case ExpressionType::COMPARE_GREATERTHAN:
556 remaining_count = GatherSwitch<NO_MATCH_SEL, GreaterThan>(key_data[i], keys.data[i].type, this->pointers,
557 *current_sel, remaining_count, offset, match_sel,
558 no_match_sel, no_match_count);
559 break;
560 case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
561 remaining_count = GatherSwitch<NO_MATCH_SEL, GreaterThanEquals>(
562 key_data[i], keys.data[i].type, this->pointers, *current_sel, remaining_count, offset, match_sel,
563 no_match_sel, no_match_count);
564 break;
565 case ExpressionType::COMPARE_LESSTHAN:
566 remaining_count =
567 GatherSwitch<NO_MATCH_SEL, LessThan>(key_data[i], keys.data[i].type, this->pointers, *current_sel,
568 remaining_count, offset, match_sel, no_match_sel, no_match_count);
569 break;
570 case ExpressionType::COMPARE_LESSTHANOREQUALTO:
571 remaining_count = GatherSwitch<NO_MATCH_SEL, LessThanEquals>(key_data[i], keys.data[i].type, this->pointers,
572 *current_sel, remaining_count, offset,
573 match_sel, no_match_sel, no_match_count);
574 break;
575 default:
576 throw NotImplementedException("Unimplemented comparison type for join");
577 }
578 if (remaining_count == 0) {
579 break;
580 }
581 current_sel = match_sel;
582 offset += GetTypeIdSize(keys.data[i].type);
583 }
584 return remaining_count;
585}
586
587idx_t ScanStructure::ResolvePredicates(DataChunk &keys, SelectionVector &match_sel, SelectionVector &no_match_sel) {
588 return ResolvePredicates<true>(keys, &match_sel, &no_match_sel);
589}
590
591idx_t ScanStructure::ResolvePredicates(DataChunk &keys, SelectionVector &match_sel) {
592 return ResolvePredicates<false>(keys, &match_sel, nullptr);
593}
594
595idx_t ScanStructure::ScanInnerJoin(DataChunk &keys, SelectionVector &result_vector) {
596 while (true) {
597 // resolve the predicates for this set of keys
598 idx_t result_count = ResolvePredicates(keys, result_vector);
599
600 // after doing all the comparisons set the found_match vector
601 if (found_match) {
602 for (idx_t i = 0; i < result_count; i++) {
603 auto idx = result_vector.get_index(i);
604 found_match[idx] = true;
605 }
606 }
607 if (result_count > 0) {
608 return result_count;
609 }
610 // no matches found: check the next set of pointers
611 AdvancePointers();
612 if (this->count == 0) {
613 return 0;
614 }
615 }
616}
617
618void ScanStructure::AdvancePointers(const SelectionVector &sel, idx_t sel_count) {
619 // now for all the pointers, we move on to the next set of pointers
620 idx_t new_count = 0;
621 auto ptrs = FlatVector::GetData<data_ptr_t>(this->pointers);
622 for (idx_t i = 0; i < sel_count; i++) {
623 auto idx = sel.get_index(i);
624 auto chain_pointer = (data_ptr_t *)(ptrs[idx] + ht.tuple_size);
625 ptrs[idx] = *chain_pointer;
626 if (ptrs[idx]) {
627 this->sel_vector.set_index(new_count++, idx);
628 }
629 }
630 this->count = new_count;
631}
632
633void ScanStructure::AdvancePointers() {
634 AdvancePointers(this->sel_vector, this->count);
635}
636
637template <class T>
638static void TemplatedGatherResult(Vector &result, uintptr_t *pointers, const SelectionVector &result_vector,
639 const SelectionVector &sel_vector, idx_t count, idx_t offset) {
640 auto rdata = FlatVector::GetData<T>(result);
641 auto &nullmask = FlatVector::Nullmask(result);
642 for (idx_t i = 0; i < count; i++) {
643 auto ridx = result_vector.get_index(i);
644 auto pidx = sel_vector.get_index(i);
645 auto hdata = (T *)(pointers[pidx] + offset);
646 if (IsNullValue<T>(*hdata)) {
647 nullmask[ridx] = true;
648 } else {
649 rdata[ridx] = *hdata;
650 }
651 }
652}
653
654void ScanStructure::GatherResult(Vector &result, const SelectionVector &result_vector,
655 const SelectionVector &sel_vector, idx_t count, idx_t &offset) {
656 result.vector_type = VectorType::FLAT_VECTOR;
657 auto ptrs = FlatVector::GetData<uintptr_t>(pointers);
658 switch (result.type) {
659 case TypeId::BOOL:
660 case TypeId::INT8:
661 TemplatedGatherResult<int8_t>(result, ptrs, result_vector, sel_vector, count, offset);
662 break;
663 case TypeId::INT16:
664 TemplatedGatherResult<int16_t>(result, ptrs, result_vector, sel_vector, count, offset);
665 break;
666 case TypeId::INT32:
667 TemplatedGatherResult<int32_t>(result, ptrs, result_vector, sel_vector, count, offset);
668 break;
669 case TypeId::INT64:
670 TemplatedGatherResult<int64_t>(result, ptrs, result_vector, sel_vector, count, offset);
671 break;
672 case TypeId::FLOAT:
673 TemplatedGatherResult<float>(result, ptrs, result_vector, sel_vector, count, offset);
674 break;
675 case TypeId::DOUBLE:
676 TemplatedGatherResult<double>(result, ptrs, result_vector, sel_vector, count, offset);
677 break;
678 case TypeId::VARCHAR:
679 TemplatedGatherResult<string_t>(result, ptrs, result_vector, sel_vector, count, offset);
680 break;
681 default:
682 throw NotImplementedException("Unimplemented type for ScanStructure::GatherResult");
683 }
684 offset += GetTypeIdSize(result.type);
685}
686
687void ScanStructure::GatherResult(Vector &result, const SelectionVector &sel_vector, idx_t count, idx_t &offset) {
688 GatherResult(result, FlatVector::IncrementalSelectionVector, sel_vector, count, offset);
689}
690
691void ScanStructure::NextInnerJoin(DataChunk &keys, DataChunk &left, DataChunk &result) {
692 assert(result.column_count() == left.column_count() + ht.build_types.size());
693 if (this->count == 0) {
694 // no pointers left to chase
695 return;
696 }
697
698 SelectionVector result_vector(STANDARD_VECTOR_SIZE);
699
700 idx_t result_count = ScanInnerJoin(keys, result_vector);
701 if (result_count > 0) {
702 // matches were found
703 // construct the result
704 // on the LHS, we create a slice using the result vector
705 result.Slice(left, result_vector, result_count);
706
707 // on the RHS, we need to fetch the data from the hash table
708 idx_t offset = ht.condition_size;
709 for (idx_t i = 0; i < ht.build_types.size(); i++) {
710 auto &vector = result.data[left.column_count() + i];
711 assert(vector.type == ht.build_types[i]);
712 GatherResult(vector, result_vector, result_count, offset);
713 }
714 AdvancePointers();
715 }
716}
717
718void ScanStructure::ScanKeyMatches(DataChunk &keys) {
719 // the semi-join, anti-join and mark-join we handle a differently from the inner join
720 // since there can be at most STANDARD_VECTOR_SIZE results
721 // we handle the entire chunk in one call to Next().
722 // for every pointer, we keep chasing pointers and doing comparisons.
723 // this results in a boolean array indicating whether or not the tuple has a match
724 SelectionVector match_sel(STANDARD_VECTOR_SIZE), no_match_sel(STANDARD_VECTOR_SIZE);
725 while (this->count > 0) {
726 // resolve the predicates for the current set of pointers
727 idx_t match_count = ResolvePredicates(keys, match_sel, no_match_sel);
728 idx_t no_match_count = this->count - match_count;
729
730 // mark each of the matches as found
731 for (idx_t i = 0; i < match_count; i++) {
732 found_match[match_sel.get_index(i)] = true;
733 }
734 // continue searching for the ones where we did not find a match yet
735 AdvancePointers(no_match_sel, no_match_count);
736 }
737}
738
739template <bool MATCH> void ScanStructure::NextSemiOrAntiJoin(DataChunk &keys, DataChunk &left, DataChunk &result) {
740 assert(left.column_count() == result.column_count());
741 assert(keys.size() == left.size());
742 // create the selection vector from the matches that were found
743 SelectionVector sel(STANDARD_VECTOR_SIZE);
744 idx_t result_count = 0;
745 for (idx_t i = 0; i < keys.size(); i++) {
746 if (found_match[i] == MATCH) {
747 // part of the result
748 sel.set_index(result_count++, i);
749 }
750 }
751 // construct the final result
752 if (result_count > 0) {
753 // we only return the columns on the left side
754 // reference the columns of the left side from the result
755 result.Slice(left, sel, result_count);
756 } else {
757 assert(result.size() == 0);
758 }
759}
760
761void ScanStructure::NextSemiJoin(DataChunk &keys, DataChunk &left, DataChunk &result) {
762 // first scan for key matches
763 ScanKeyMatches(keys);
764 // then construct the result from all tuples with a match
765 NextSemiOrAntiJoin<true>(keys, left, result);
766
767 finished = true;
768}
769
770void ScanStructure::NextAntiJoin(DataChunk &keys, DataChunk &left, DataChunk &result) {
771 // first scan for key matches
772 ScanKeyMatches(keys);
773 // then construct the result from all tuples that did not find a match
774 NextSemiOrAntiJoin<false>(keys, left, result);
775
776 finished = true;
777}
778
779void ScanStructure::ConstructMarkJoinResult(DataChunk &join_keys, DataChunk &child, DataChunk &result) {
780 // for the initial set of columns we just reference the left side
781 result.SetCardinality(child);
782 for (idx_t i = 0; i < child.column_count(); i++) {
783 result.data[i].Reference(child.data[i]);
784 }
785 auto &mark_vector = result.data.back();
786 mark_vector.vector_type = VectorType::FLAT_VECTOR;
787 // first we set the NULL values from the join keys
788 // if there is any NULL in the keys, the result is NULL
789 auto bool_result = FlatVector::GetData<bool>(mark_vector);
790 auto &nullmask = FlatVector::Nullmask(mark_vector);
791 for (idx_t col_idx = 0; col_idx < join_keys.column_count(); col_idx++) {
792 if (ht.null_values_are_equal[col_idx]) {
793 continue;
794 }
795 VectorData jdata;
796 join_keys.data[col_idx].Orrify(join_keys.size(), jdata);
797 if (jdata.nullmask->any()) {
798 for (idx_t i = 0; i < join_keys.size(); i++) {
799 auto jidx = jdata.sel->get_index(i);
800 nullmask[i] = (*jdata.nullmask)[jidx];
801 }
802 }
803 }
804 // now set the remaining entries to either true or false based on whether a match was found
805 if (found_match) {
806 for (idx_t i = 0; i < child.size(); i++) {
807 bool_result[i] = found_match[i];
808 }
809 } else {
810 memset(bool_result, 0, sizeof(bool) * child.size());
811 }
812 // if the right side contains NULL values, the result of any FALSE becomes NULL
813 if (ht.has_null) {
814 for (idx_t i = 0; i < child.size(); i++) {
815 if (!bool_result[i]) {
816 nullmask[i] = true;
817 }
818 }
819 }
820}
821
// MARK join: emits all input columns plus a trailing BOOL column telling whether each row found a match.
// Requires a non-empty hash table and resolves the whole chunk in one call (sets `finished`).
// Without correlated columns, the result is built by ConstructMarkJoinResult(). With correlated columns, the
// NULL semantics come from per-group COUNT(*) / COUNT(column) values fetched from the correlated aggregate table.
void ScanStructure::NextMarkJoin(DataChunk &keys, DataChunk &input, DataChunk &result) {
	assert(result.column_count() == input.column_count() + 1);
	assert(result.data.back().type == TypeId::BOOL);
	// this method should only be called for a non-empty HT
	assert(ht.count > 0);

	ScanKeyMatches(keys);
	if (ht.correlated_mark_join_info.correlated_types.size() == 0) {
		ConstructMarkJoinResult(keys, input, result);
	} else {
		auto &info = ht.correlated_mark_join_info;
		// there are correlated columns
		// first we fetch the counts from the aggregate hashtable corresponding to these entries
		// (the groups are all key columns except the last one)
		assert(keys.column_count() == info.group_chunk.column_count() + 1);
		info.group_chunk.SetCardinality(keys);
		for (idx_t i = 0; i < info.group_chunk.column_count(); i++) {
			info.group_chunk.data[i].Reference(keys.data[i]);
		}
		info.correlated_counts->FetchAggregates(info.group_chunk, info.result_chunk);

		// for the initial set of columns we just reference the left side
		result.SetCardinality(input);
		for (idx_t i = 0; i < input.column_count(); i++) {
			result.data[i].Reference(input.data[i]);
		}
		// create the result matching vector
		auto &last_key = keys.data.back();
		auto &result_vector = result.data.back();
		// first set the nullmask based on whether or not there were NULL values in the join key
		// (only the last, non-correlated key column is inspected here)
		result_vector.vector_type = VectorType::FLAT_VECTOR;
		auto bool_result = FlatVector::GetData<bool>(result_vector);
		auto &nullmask = FlatVector::Nullmask(result_vector);
		switch (last_key.vector_type) {
		case VectorType::CONSTANT_VECTOR:
			if (ConstantVector::IsNull(last_key)) {
				nullmask.set();
			}
			break;
		case VectorType::FLAT_VECTOR:
			nullmask = FlatVector::Nullmask(last_key);
			break;
		default: {
			VectorData kdata;
			last_key.Orrify(keys.size(), kdata);
			for (idx_t i = 0; i < input.size(); i++) {
				auto kidx = kdata.sel->get_index(i);
				;
				nullmask[i] = (*kdata.nullmask)[kidx];
			}
			break;
		}
		}

		// result_chunk column 0 holds COUNT(*), column 1 holds COUNT(column) per group
		auto count_star = FlatVector::GetData<int64_t>(info.result_chunk.data[0]);
		auto count = FlatVector::GetData<int64_t>(info.result_chunk.data[1]);
		// set the entries to either true or false based on whether a match was found
		for (idx_t i = 0; i < input.size(); i++) {
			assert(count_star[i] >= count[i]);
			bool_result[i] = found_match ? found_match[i] : false;
			if (!bool_result[i] && count_star[i] > count[i]) {
				// RHS has NULL value and result is false: set to null
				nullmask[i] = true;
			}
			if (count_star[i] == 0) {
				// count == 0, set nullmask to false (we know the result is false now)
				nullmask[i] = false;
			}
		}
	}
	finished = true;
}
893
894void ScanStructure::NextLeftJoin(DataChunk &keys, DataChunk &left, DataChunk &result) {
895 // a LEFT OUTER JOIN is identical to an INNER JOIN except all tuples that do
896 // not have a match must return at least one tuple (with the right side set
897 // to NULL in every column)
898 NextInnerJoin(keys, left, result);
899 if (result.size() == 0) {
900 // no entries left from the normal join
901 // fill in the result of the remaining left tuples
902 // together with NULL values on the right-hand side
903 idx_t remaining_count = 0;
904 SelectionVector sel(STANDARD_VECTOR_SIZE);
905 for (idx_t i = 0; i < left.size(); i++) {
906 if (!found_match[i]) {
907 sel.set_index(remaining_count++, i);
908 }
909 }
910 if (remaining_count > 0) {
911 // have remaining tuples
912 // slice the left side with tuples that did not find a match
913 result.Slice(left, sel, remaining_count);
914
915 // now set the right side to NULL
916 for (idx_t i = left.column_count(); i < result.column_count(); i++) {
917 result.data[i].vector_type = VectorType::CONSTANT_VECTOR;
918 ConstantVector::SetNull(result.data[i], true);
919 }
920 }
921 finished = true;
922 }
923}
924
925void ScanStructure::NextSingleJoin(DataChunk &keys, DataChunk &input, DataChunk &result) {
926 // single join
927 // this join is similar to the semi join except that
928 // (1) we actually return data from the RHS and
929 // (2) we return NULL for that data if there is no match
930 idx_t result_count = 0;
931 SelectionVector result_sel(STANDARD_VECTOR_SIZE);
932 SelectionVector match_sel(STANDARD_VECTOR_SIZE), no_match_sel(STANDARD_VECTOR_SIZE);
933 while (this->count > 0) {
934 // resolve the predicates for the current set of pointers
935 idx_t match_count = ResolvePredicates(keys, match_sel, no_match_sel);
936 idx_t no_match_count = this->count - match_count;
937
938 // mark each of the matches as found
939 for (idx_t i = 0; i < match_count; i++) {
940 // found a match for this index
941 auto index = match_sel.get_index(i);
942 found_match[index] = true;
943 result_sel.set_index(result_count++, index);
944 }
945 // continue searching for the ones where we did not find a match yet
946 AdvancePointers(no_match_sel, no_match_count);
947 }
948 // reference the columns of the left side from the result
949 assert(input.column_count() > 0);
950 for (idx_t i = 0; i < input.column_count(); i++) {
951 result.data[i].Reference(input.data[i]);
952 }
953 // now fetch the data from the RHS
954 idx_t offset = ht.condition_size;
955 for (idx_t i = 0; i < ht.build_types.size(); i++) {
956 auto &vector = result.data[input.column_count() + i];
957 // set NULL entries for every entry that was not found
958 auto &nullmask = FlatVector::Nullmask(vector);
959 nullmask.set();
960 for (idx_t j = 0; j < result_count; j++) {
961 nullmask[result_sel.get_index(j)] = false;
962 }
963 // for the remaining values we fetch the values
964 GatherResult(vector, result_sel, result_sel, result_count, offset);
965 }
966 result.SetCardinality(input.size());
967
968 // like the SEMI, ANTI and MARK join types, the SINGLE join only ever does one pass over the HT per input chunk
969 finished = true;
970}
971
972} // namespace duckdb
973