1 | #include <any> |
2 | |
3 | #include <common/logger_useful.h> |
4 | |
5 | #include <Columns/ColumnConst.h> |
6 | #include <Columns/ColumnString.h> |
7 | #include <Columns/ColumnFixedString.h> |
8 | #include <Columns/ColumnNullable.h> |
9 | |
10 | #include <DataTypes/DataTypeNullable.h> |
11 | |
12 | #include <Interpreters/Join.h> |
13 | #include <Interpreters/join_common.h> |
14 | #include <Interpreters/AnalyzedJoin.h> |
15 | #include <Interpreters/joinDispatch.h> |
16 | #include <Interpreters/NullableUtils.h> |
17 | |
18 | #include <DataStreams/IBlockInputStream.h> |
19 | #include <DataStreams/materializeBlock.h> |
20 | |
21 | #include <Core/ColumnNumbers.h> |
22 | #include <Common/typeid_cast.h> |
23 | #include <Common/assert_cast.h> |
24 | #include <DataTypes/DataTypeLowCardinality.h> |
25 | |
26 | |
27 | namespace DB |
28 | { |
29 | |
namespace ErrorCodes
{
    extern const int UNSUPPORTED_JOIN_KEYS;
    extern const int LOGICAL_ERROR;
    extern const int SET_SIZE_LIMIT_EXCEEDED;
    extern const int TYPE_MISMATCH;
    extern const int ILLEGAL_COLUMN;
    /// Used by Join::setSampleBlock() to reject unsupported ASOF JOIN setups. Declared here so that
    /// this translation unit does not depend on some other header having declared them already
    /// (redeclaring an extern constant is harmless).
    extern const int NOT_IMPLEMENTED;
    extern const int BAD_TYPE_OF_FIELD;
}
38 | |
39 | |
40 | static ColumnPtr filterWithBlanks(ColumnPtr src_column, const IColumn::Filter & filter, bool inverse_filter = false) |
41 | { |
42 | ColumnPtr column = src_column->convertToFullColumnIfConst(); |
43 | MutableColumnPtr mut_column = column->cloneEmpty(); |
44 | mut_column->reserve(column->size()); |
45 | |
46 | if (inverse_filter) |
47 | { |
48 | for (size_t row = 0; row < filter.size(); ++row) |
49 | { |
50 | if (filter[row]) |
51 | mut_column->insertDefault(); |
52 | else |
53 | mut_column->insertFrom(*column, row); |
54 | } |
55 | } |
56 | else |
57 | { |
58 | for (size_t row = 0; row < filter.size(); ++row) |
59 | { |
60 | if (filter[row]) |
61 | mut_column->insertFrom(*column, row); |
62 | else |
63 | mut_column->insertDefault(); |
64 | } |
65 | } |
66 | |
67 | return mut_column; |
68 | } |
69 | |
70 | static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, bool nullable) |
71 | { |
72 | if (nullable) |
73 | { |
74 | JoinCommon::convertColumnToNullable(column); |
75 | } |
76 | else |
77 | { |
78 | /// We have to replace values masked by NULLs with defaults. |
79 | if (column.column) |
80 | if (auto * nullable_column = checkAndGetColumn<ColumnNullable>(*column.column)) |
81 | column.column = filterWithBlanks(column.column, nullable_column->getNullMapColumn().getData(), true); |
82 | |
83 | JoinCommon::removeColumnNullability(column); |
84 | } |
85 | |
86 | return std::move(column); |
87 | } |
88 | |
89 | static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, bool nullable, const ColumnUInt8 & negative_null_map) |
90 | { |
91 | if (nullable) |
92 | { |
93 | JoinCommon::convertColumnToNullable(column); |
94 | if (column.type->isNullable() && negative_null_map.size()) |
95 | { |
96 | MutableColumnPtr mutable_column = (*std::move(column.column)).mutate(); |
97 | assert_cast<ColumnNullable &>(*mutable_column).applyNegatedNullMap(negative_null_map); |
98 | column.column = std::move(mutable_column); |
99 | } |
100 | } |
101 | else |
102 | JoinCommon::removeColumnNullability(column); |
103 | |
104 | return std::move(column); |
105 | } |
106 | |
107 | static void changeNullability(MutableColumnPtr & mutable_column) |
108 | { |
109 | ColumnPtr column = std::move(mutable_column); |
110 | if (auto * nullable = checkAndGetColumn<ColumnNullable>(*column)) |
111 | column = nullable->getNestedColumnPtr(); |
112 | else |
113 | column = makeNullable(column); |
114 | |
115 | mutable_column = (*std::move(column)).mutate(); |
116 | } |
117 | |
118 | |
/// Constructs a hash join for the right-hand table whose structure is given by `right_sample_block`.
/// Kind, strictness, right key names, forced nullability of the right/left sides and the ASOF
/// inequality are all read from `table_join_`. The right-side storage (`data`) is freshly allocated.
/// setSampleBlock() then derives the right block layout and chooses the hash table type.
/// NOTE(review): members are declared in the header (not visible here); keep this initializer
/// order in sync with the declaration order.
Join::Join(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_)
    : table_join(table_join_)
    , kind(table_join->kind())
    , strictness(table_join->strictness())
    , key_names_right(table_join->keyNamesRight())
    , nullable_right_side(table_join->forceNullableRight())
    , nullable_left_side(table_join->forceNullableLeft())
    , any_take_last_row(any_take_last_row_)
    , asof_inequality(table_join->getAsofInequality())
    , data(std::make_shared<RightTableData>())
    , log(&Logger::get("Join" ))
{
    setSampleBlock(right_sample_block);
}
133 | |
134 | |
135 | Join::Type Join::chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes) |
136 | { |
137 | size_t keys_size = key_columns.size(); |
138 | |
139 | if (keys_size == 0) |
140 | return Type::CROSS; |
141 | |
142 | bool all_fixed = true; |
143 | size_t keys_bytes = 0; |
144 | key_sizes.resize(keys_size); |
145 | for (size_t j = 0; j < keys_size; ++j) |
146 | { |
147 | if (!key_columns[j]->isFixedAndContiguous()) |
148 | { |
149 | all_fixed = false; |
150 | break; |
151 | } |
152 | key_sizes[j] = key_columns[j]->sizeOfValueIfFixed(); |
153 | keys_bytes += key_sizes[j]; |
154 | } |
155 | |
156 | /// If there is one numeric key that fits in 64 bits |
157 | if (keys_size == 1 && key_columns[0]->isNumeric()) |
158 | { |
159 | size_t size_of_field = key_columns[0]->sizeOfValueIfFixed(); |
160 | if (size_of_field == 1) |
161 | return Type::key8; |
162 | if (size_of_field == 2) |
163 | return Type::key16; |
164 | if (size_of_field == 4) |
165 | return Type::key32; |
166 | if (size_of_field == 8) |
167 | return Type::key64; |
168 | if (size_of_field == 16) |
169 | return Type::keys128; |
170 | throw Exception("Logical error: numeric column has sizeOfField not in 1, 2, 4, 8, 16." , ErrorCodes::LOGICAL_ERROR); |
171 | } |
172 | |
173 | /// If the keys fit in N bits, we will use a hash table for N-bit-packed keys |
174 | if (all_fixed && keys_bytes <= 16) |
175 | return Type::keys128; |
176 | if (all_fixed && keys_bytes <= 32) |
177 | return Type::keys256; |
178 | |
179 | /// If there is single string key, use hash table of it's values. |
180 | if (keys_size == 1 |
181 | && (typeid_cast<const ColumnString *>(key_columns[0]) |
182 | || (isColumnConst(*key_columns[0]) && typeid_cast<const ColumnString *>(&assert_cast<const ColumnConst *>(key_columns[0])->getDataColumn())))) |
183 | return Type::key_string; |
184 | |
185 | if (keys_size == 1 && typeid_cast<const ColumnFixedString *>(key_columns[0])) |
186 | return Type::key_fixed_string; |
187 | |
188 | /// Otherwise, will use set of cryptographic hashes of unambiguously serialized values. |
189 | return Type::hashed; |
190 | } |
191 | |
192 | static const IColumn * extractAsofColumn(const ColumnRawPtrs & key_columns) |
193 | { |
194 | return key_columns.back(); |
195 | } |
196 | |
197 | template<typename KeyGetter, bool is_asof_join> |
198 | static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes) |
199 | { |
200 | if constexpr (is_asof_join) |
201 | { |
202 | auto key_column_copy = key_columns; |
203 | auto key_size_copy = key_sizes; |
204 | key_column_copy.pop_back(); |
205 | key_size_copy.pop_back(); |
206 | return KeyGetter(key_column_copy, key_size_copy, nullptr); |
207 | } |
208 | else |
209 | return KeyGetter(key_columns, key_sizes, nullptr); |
210 | } |
211 | |
/// Maps a Join::Type hash table variant to the ColumnsHashing key getter ("hash method") used to
/// emplace/find keys of that layout. `Value` and `Mapped` come from the map container
/// (see KeyGetterForType). Only the specializations below are defined; the primary template is not.
template <Join::Type type, typename Value, typename Mapped>
struct KeyGetterForTypeImpl;

/// One numeric key stored as UInt8.
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key8, Value, Mapped>
{
    using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt8, false>;
};
/// One numeric key stored as UInt16.
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key16, Value, Mapped>
{
    using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt16, false>;
};
/// One numeric key stored as UInt32.
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key32, Value, Mapped>
{
    using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt32, false>;
};
/// One numeric key stored as UInt64.
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key64, Value, Mapped>
{
    using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt64, false>;
};
/// One String key (chooseMethod() also selects this for a constant String column).
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key_string, Value, Mapped>
{
    using Type = ColumnsHashing::HashMethodString<Value, Mapped, true, false>;
};
/// One FixedString key.
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key_fixed_string, Value, Mapped>
{
    using Type = ColumnsHashing::HashMethodFixedString<Value, Mapped, true, false>;
};
/// Fixed-size keys packed into a UInt128 (also used for a single 16-byte numeric key).
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::keys128, Value, Mapped>
{
    using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt128, Mapped, false, false, false>;
};
/// Fixed-size keys packed into a UInt256.
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::keys256, Value, Mapped>
{
    using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt256, Mapped, false, false, false>;
};
/// Fallback for any other key combination: keys are hashed as a whole.
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::hashed, Value, Mapped>
{
    using Type = ColumnsHashing::HashMethodHashed<Value, Mapped, false>;
};
251 | |
252 | template <Join::Type type, typename Data> |
253 | struct KeyGetterForType |
254 | { |
255 | using Value = typename Data::value_type; |
256 | using Mapped_t = typename Data::mapped_type; |
257 | using Mapped = std::conditional_t<std::is_const_v<Data>, const Mapped_t, Mapped_t>; |
258 | using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type; |
259 | }; |
260 | |
261 | |
262 | void Join::init(Type type_) |
263 | { |
264 | data->type = type_; |
265 | |
266 | if (kind == ASTTableJoin::Kind::Cross) |
267 | return; |
268 | joinDispatchInit(kind, strictness, data->maps); |
269 | joinDispatch(kind, strictness, data->maps, [&](auto, auto, auto & map) { map.create(data->type); }); |
270 | } |
271 | |
272 | size_t Join::getTotalRowCount() const |
273 | { |
274 | size_t res = 0; |
275 | |
276 | if (data->type == Type::CROSS) |
277 | { |
278 | for (const auto & block : data->blocks) |
279 | res += block.rows(); |
280 | } |
281 | else |
282 | { |
283 | joinDispatch(kind, strictness, data->maps, [&](auto, auto, auto & map) { res += map.getTotalRowCount(data->type); }); |
284 | } |
285 | |
286 | return res; |
287 | } |
288 | |
289 | size_t Join::getTotalByteCount() const |
290 | { |
291 | size_t res = 0; |
292 | |
293 | if (data->type == Type::CROSS) |
294 | { |
295 | for (const auto & block : data->blocks) |
296 | res += block.bytes(); |
297 | } |
298 | else |
299 | { |
300 | joinDispatch(kind, strictness, data->maps, [&](auto, auto, auto & map) { res += map.getTotalByteCountImpl(data->type); }); |
301 | res += data->pool.size(); |
302 | } |
303 | |
304 | return res; |
305 | } |
306 | |
307 | void Join::setSampleBlock(const Block & block) |
308 | { |
309 | /// You have to restore this lock if you call the fuction outside of ctor. |
310 | //std::unique_lock lock(rwlock); |
311 | |
312 | LOG_DEBUG(log, "setSampleBlock: " << block.dumpStructure()); |
313 | |
314 | if (!empty()) |
315 | return; |
316 | |
317 | ColumnRawPtrs key_columns = JoinCommon::extractKeysForJoin(key_names_right, block, right_table_keys, sample_block_with_columns_to_add); |
318 | |
319 | initRightBlockStructure(); |
320 | initRequiredRightKeys(); |
321 | |
322 | JoinCommon::createMissedColumns(sample_block_with_columns_to_add); |
323 | if (nullable_right_side) |
324 | JoinCommon::convertColumnsToNullable(sample_block_with_columns_to_add); |
325 | |
326 | if (strictness == ASTTableJoin::Strictness::Asof) |
327 | { |
328 | if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner) |
329 | throw Exception("ASOF only supports LEFT and INNER as base joins" , ErrorCodes::NOT_IMPLEMENTED); |
330 | |
331 | const IColumn * asof_column = key_columns.back(); |
332 | size_t asof_size; |
333 | |
334 | asof_type = AsofRowRefs::getTypeSize(asof_column, asof_size); |
335 | if (!asof_type) |
336 | { |
337 | std::string msg = "ASOF join not supported for type: " ; |
338 | msg += asof_column->getFamilyName(); |
339 | throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD); |
340 | } |
341 | |
342 | key_columns.pop_back(); |
343 | |
344 | if (key_columns.empty()) |
345 | throw Exception("ASOF join cannot be done without a joining column" , ErrorCodes::LOGICAL_ERROR); |
346 | |
347 | /// this is going to set up the appropriate hash table for the direct lookup part of the join |
348 | /// However, this does not depend on the size of the asof join key (as that goes into the BST) |
349 | /// Therefore, add it back in such that it can be extracted appropriately from the full stored |
350 | /// key_columns and key_sizes |
351 | init(chooseMethod(key_columns, key_sizes)); |
352 | key_sizes.push_back(asof_size); |
353 | } |
354 | else |
355 | { |
356 | /// Choose data structure to use for JOIN. |
357 | init(chooseMethod(key_columns, key_sizes)); |
358 | } |
359 | } |
360 | |
namespace
{
    /// Inserts an element into a hash table of the form `key -> reference to a row` (stored block + row number),
    /// which will then be used by JOIN. Each method emplaces the key of row `i` through `key_getter`;
    /// `pool` is the arena passed to emplaceKey() and to multi-row inserts.
    template <typename Map, typename KeyGetter>
    struct Inserter
    {
        /// For single-row mapped types: the row reference is written only for a newly inserted key,
        /// or overwritten for an existing key when join.anyTakeLastRow() is true.
        static ALWAYS_INLINE void insertOne(const Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i,
                                            Arena & pool)
        {
            auto emplace_result = key_getter.emplaceKey(map, i, pool);

            if (emplace_result.isInserted() || join.anyTakeLastRow())
                new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i);
        }

        /// For multi-row mapped types: the first row of a key is constructed in place,
        /// every further row with the same key is appended via insert().
        static ALWAYS_INLINE void insertAll(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
        {
            auto emplace_result = key_getter.emplaceKey(map, i, pool);

            if (emplace_result.isInserted())
                new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i);
            else
            {
                /// The first element of the list is stored in the value of the hash table, the rest in the pool.
                emplace_result.getMapped().insert({stored_block, i}, pool);
            }
        }

        /// For ASOF JOIN: the per-key container is constructed (for the join's ASOF type) the first time
        /// a key is seen. Then row `i` is inserted into it together with `asof_column`.
        static ALWAYS_INLINE void insertAsof(Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool,
                                             const IColumn * asof_column)
        {
            auto emplace_result = key_getter.emplaceKey(map, i, pool);
            typename Map::mapped_type * time_series_map = &emplace_result.getMapped();

            if (emplace_result.isInserted())
                time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType());
            time_series_map->insert(join.getAsofType(), asof_column, stored_block, i);
        }
    };


    /// Inserts the first `rows` rows of `stored_block` into `map`.
    /// Rows whose null map entry is set are skipped; the null map is consulted only when
    /// `has_null_map` is true. The Inserter method is chosen at compile time from STRICTNESS
    /// and the mapped type.
    template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
    void NO_INLINE insertFromBlockImplTypeCase(
        Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
        const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
    {
        [[maybe_unused]] constexpr bool mapped_one = std::is_same_v<typename Map::mapped_type, JoinStuff::MappedOne> ||
                                                     std::is_same_v<typename Map::mapped_type, JoinStuff::MappedOneFlagged>;
        constexpr bool is_asof_join = STRICTNESS == ASTTableJoin::Strictness::Asof;

        const IColumn * asof_column [[maybe_unused]] = nullptr;
        if constexpr (is_asof_join)
            asof_column = extractAsofColumn(key_columns);

        /// For ASOF JOIN the key getter excludes the trailing ASOF key.
        auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(key_columns, key_sizes);

        for (size_t i = 0; i < rows; ++i)
        {
            /// Rows with a NULL in any key are not inserted.
            if (has_null_map && (*null_map)[i])
                continue;

            if constexpr (is_asof_join)
                Inserter<Map, KeyGetter>::insertAsof(join, map, key_getter, stored_block, i, pool, asof_column);
            else if constexpr (mapped_one)
                Inserter<Map, KeyGetter>::insertOne(join, map, key_getter, stored_block, i, pool);
            else
                Inserter<Map, KeyGetter>::insertAll(join, map, key_getter, stored_block, i, pool);
        }
    }


    /// Turns the runtime presence of `null_map` into the compile-time `has_null_map` flag.
    template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
    void insertFromBlockImplType(
        Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
        const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
    {
        if (null_map)
            insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
        else
            insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
    }


    /// Dispatches on the runtime hash table layout `type` to the map member of the same name in `maps`,
    /// with the matching KeyGetter. EMPTY and CROSS insert nothing.
    /// The M macro below generates one `case` per variant listed in APPLY_FOR_JOIN_VARIANTS.
    template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
    void insertFromBlockImpl(
        Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
        const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
    {
        switch (type)
        {
            case Join::Type::EMPTY: break;
            case Join::Type::CROSS: break; /// Do nothing. We have already saved block, and it is enough.

        #define M(TYPE) \
            case Join::Type::TYPE: \
                insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\
                    join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \
                    break;
            APPLY_FOR_JOIN_VARIANTS(M)
        #undef M
        }
    }
}
464 | |
465 | void Join::initRequiredRightKeys() |
466 | { |
467 | const Names & left_keys = table_join->keyNamesLeft(); |
468 | const Names & right_keys = table_join->keyNamesRight(); |
469 | NameSet required_keys(table_join->requiredRightKeys().begin(), table_join->requiredRightKeys().end()); |
470 | |
471 | for (size_t i = 0; i < right_keys.size(); ++i) |
472 | { |
473 | const String & right_key_name = right_keys[i]; |
474 | |
475 | if (required_keys.count(right_key_name) && !required_right_keys.has(right_key_name)) |
476 | { |
477 | const auto & right_key = right_table_keys.getByName(right_key_name); |
478 | required_right_keys.insert(right_key); |
479 | required_right_keys_sources.push_back(left_keys[i]); |
480 | } |
481 | } |
482 | } |
483 | |
484 | void Join::initRightBlockStructure() |
485 | { |
486 | auto & saved_block_sample = data->sample_block; |
487 | |
488 | if (isRightOrFull(kind)) |
489 | { |
490 | /// Save keys for NonJoinedBlockInputStream |
491 | saved_block_sample = right_table_keys.cloneEmpty(); |
492 | } |
493 | else if (strictness == ASTTableJoin::Strictness::Asof) |
494 | { |
495 | /// Save ASOF key |
496 | saved_block_sample.insert(right_table_keys.safeGetByPosition(right_table_keys.columns() - 1)); |
497 | } |
498 | |
499 | /// Save non key columns |
500 | for (auto & column : sample_block_with_columns_to_add) |
501 | saved_block_sample.insert(column); |
502 | |
503 | if (nullable_right_side) |
504 | JoinCommon::convertColumnsToNullable(saved_block_sample, (isFull(kind) ? right_table_keys.columns() : 0)); |
505 | } |
506 | |
507 | Block Join::structureRightBlock(const Block & block) const |
508 | { |
509 | Block structured_block; |
510 | for (auto & sample_column : savedBlockSample().getColumnsWithTypeAndName()) |
511 | { |
512 | ColumnWithTypeAndName column = block.getByName(sample_column.name); |
513 | if (sample_column.column->isNullable()) |
514 | JoinCommon::convertColumnToNullable(column); |
515 | structured_block.insert(column); |
516 | } |
517 | |
518 | return structured_block; |
519 | } |
520 | |
521 | bool Join::addJoinedBlock(const Block & source_block) |
522 | { |
523 | if (empty()) |
524 | throw Exception("Logical error: Join was not initialized" , ErrorCodes::LOGICAL_ERROR); |
525 | |
526 | /// There's no optimization for right side const columns. Remove constness if any. |
527 | Block block = materializeBlock(source_block); |
528 | size_t rows = block.rows(); |
529 | |
530 | ColumnRawPtrs key_columns = JoinCommon::materializeColumnsInplace(block, key_names_right); |
531 | |
532 | /// We will insert to the map only keys, where all components are not NULL. |
533 | ConstNullMapPtr null_map{}; |
534 | ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); |
535 | |
536 | /// If RIGHT or FULL save blocks with nulls for NonJoinedBlockInputStream |
537 | UInt8 save_nullmap = 0; |
538 | if (isRightOrFull(kind) && null_map) |
539 | { |
540 | for (size_t i = 0; !save_nullmap && i < null_map->size(); ++i) |
541 | save_nullmap |= (*null_map)[i]; |
542 | } |
543 | |
544 | Block structured_block = structureRightBlock(block); |
545 | size_t total_rows = 0; |
546 | size_t total_bytes = 0; |
547 | |
548 | { |
549 | std::unique_lock lock(data->rwlock); |
550 | |
551 | data->blocks.emplace_back(std::move(structured_block)); |
552 | Block * stored_block = &data->blocks.back(); |
553 | |
554 | if (rows) |
555 | data->empty = false; |
556 | |
557 | if (kind != ASTTableJoin::Kind::Cross) |
558 | { |
559 | joinDispatch(kind, strictness, data->maps, [&](auto, auto strictness_, auto & map) |
560 | { |
561 | insertFromBlockImpl<strictness_>(*this, data->type, map, rows, key_columns, key_sizes, stored_block, null_map, data->pool); |
562 | }); |
563 | } |
564 | |
565 | if (save_nullmap) |
566 | data->blocks_nullmaps.emplace_back(stored_block, null_map_holder); |
567 | |
568 | /// TODO: Do not calculate them every time |
569 | total_rows = getTotalRowCount(); |
570 | total_bytes = getTotalByteCount(); |
571 | } |
572 | |
573 | return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN" , ErrorCodes::SET_SIZE_LIMIT_EXCEEDED); |
574 | } |
575 | |
576 | |
577 | namespace |
578 | { |
579 | |
580 | class AddedColumns |
581 | { |
582 | public: |
583 | using TypeAndNames = std::vector<std::pair<decltype(ColumnWithTypeAndName::type), decltype(ColumnWithTypeAndName::name)>>; |
584 | |
585 | AddedColumns(const Block & sample_block_with_columns_to_add, |
586 | const Block & block_with_columns_to_add, |
587 | const Block & block, |
588 | const Block & saved_block_sample, |
589 | const ColumnsWithTypeAndName & , |
590 | const Join & join_, |
591 | const ColumnRawPtrs & key_columns_, |
592 | const Sizes & key_sizes_) |
593 | : join(join_) |
594 | , key_columns(key_columns_) |
595 | , key_sizes(key_sizes_) |
596 | , rows_to_add(block.rows()) |
597 | , need_filter(false) |
598 | { |
599 | size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); |
600 | |
601 | columns.reserve(num_columns_to_add); |
602 | type_name.reserve(num_columns_to_add); |
603 | right_indexes.reserve(num_columns_to_add); |
604 | |
605 | for (size_t i = 0; i < num_columns_to_add; ++i) |
606 | { |
607 | const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i); |
608 | |
609 | /// Don't insert column if it's in left block or not explicitly required. |
610 | if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name)) |
611 | addColumn(src_column); |
612 | } |
613 | |
614 | for (auto & : extras) |
615 | addColumn(extra); |
616 | |
617 | for (auto & tn : type_name) |
618 | right_indexes.push_back(saved_block_sample.getPositionByName(tn.second)); |
619 | } |
620 | |
621 | size_t size() const { return columns.size(); } |
622 | |
623 | ColumnWithTypeAndName moveColumn(size_t i) |
624 | { |
625 | return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].first, type_name[i].second); |
626 | } |
627 | |
628 | template <bool has_defaults> |
629 | void appendFromBlock(const Block & block, size_t row_num) |
630 | { |
631 | if constexpr (has_defaults) |
632 | applyLazyDefaults(); |
633 | |
634 | for (size_t j = 0; j < right_indexes.size(); ++j) |
635 | columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num); |
636 | } |
637 | |
638 | void appendDefaultRow() |
639 | { |
640 | ++lazy_defaults_count; |
641 | } |
642 | |
643 | void applyLazyDefaults() |
644 | { |
645 | if (lazy_defaults_count) |
646 | { |
647 | for (size_t j = 0; j < right_indexes.size(); ++j) |
648 | columns[j]->insertManyDefaults(lazy_defaults_count); |
649 | lazy_defaults_count = 0; |
650 | } |
651 | } |
652 | |
653 | const Join & join; |
654 | const ColumnRawPtrs & key_columns; |
655 | const Sizes & key_sizes; |
656 | size_t rows_to_add; |
657 | std::unique_ptr<IColumn::Offsets> offsets_to_replicate; |
658 | bool need_filter; |
659 | |
660 | private: |
661 | TypeAndNames type_name; |
662 | MutableColumns columns; |
663 | std::vector<size_t> right_indexes; |
664 | size_t lazy_defaults_count = 0; |
665 | |
666 | void addColumn(const ColumnWithTypeAndName & src_column) |
667 | { |
668 | columns.push_back(src_column.column->cloneEmpty()); |
669 | columns.back()->reserve(src_column.column->size()); |
670 | type_name.emplace_back(src_column.type, src_column.name); |
671 | } |
672 | }; |
673 | |
674 | template <typename Map, bool add_missing> |
675 | void addFoundRowAll(const typename Map::mapped_type & mapped, AddedColumns & added, IColumn::Offset & current_offset) |
676 | { |
677 | if constexpr (add_missing) |
678 | added.applyLazyDefaults(); |
679 | |
680 | for (auto it = mapped.begin(); it.ok(); ++it) |
681 | { |
682 | added.appendFromBlock<false>(*it->block, it->row_num); |
683 | ++current_offset; |
684 | } |
685 | }; |
686 | |
687 | template <bool add_missing, bool need_offset> |
688 | void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]]) |
689 | { |
690 | if constexpr (add_missing) |
691 | { |
692 | added.appendDefaultRow(); |
693 | if constexpr (need_offset) |
694 | ++current_offset; |
695 | } |
696 | } |
697 | |
698 | template <bool need_filter> |
699 | void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]]) |
700 | { |
701 | if constexpr (need_filter) |
702 | filter[pos] = 1; |
703 | } |
704 | |
705 | |
706 | /// Joins right table columns which indexes are present in right_indexes using specified map. |
707 | /// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS). |
708 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool has_null_map> |
709 | NO_INLINE IColumn::Filter joinRightColumns(const Map & map, AddedColumns & added_columns, const ConstNullMapPtr & null_map [[maybe_unused]]) |
710 | { |
711 | constexpr bool is_any_join = STRICTNESS == ASTTableJoin::Strictness::Any; |
712 | constexpr bool is_all_join = STRICTNESS == ASTTableJoin::Strictness::All; |
713 | constexpr bool is_asof_join = STRICTNESS == ASTTableJoin::Strictness::Asof; |
714 | constexpr bool is_semi_join = STRICTNESS == ASTTableJoin::Strictness::Semi; |
715 | constexpr bool is_anti_join = STRICTNESS == ASTTableJoin::Strictness::Anti; |
716 | constexpr bool left = KIND == ASTTableJoin::Kind::Left; |
717 | constexpr bool right = KIND == ASTTableJoin::Kind::Right; |
718 | constexpr bool full = KIND == ASTTableJoin::Kind::Full; |
719 | |
720 | constexpr bool add_missing = (left || full) && !is_semi_join; |
721 | constexpr bool need_replication = is_all_join || (is_any_join && right) || (is_semi_join && right); |
722 | |
723 | size_t rows = added_columns.rows_to_add; |
724 | IColumn::Filter filter; |
725 | if constexpr (need_filter) |
726 | filter = IColumn::Filter(rows, 0); |
727 | |
728 | Arena pool; |
729 | |
730 | if constexpr (need_replication) |
731 | added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows); |
732 | |
733 | const IColumn * asof_column [[maybe_unused]] = nullptr; |
734 | if constexpr (is_asof_join) |
735 | asof_column = extractAsofColumn(added_columns.key_columns); |
736 | |
737 | auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(added_columns.key_columns, added_columns.key_sizes); |
738 | |
739 | IColumn::Offset current_offset = 0; |
740 | |
741 | for (size_t i = 0; i < rows; ++i) |
742 | { |
743 | if constexpr (has_null_map) |
744 | { |
745 | if ((*null_map)[i]) |
746 | { |
747 | addNotFoundRow<add_missing, need_replication>(added_columns, current_offset); |
748 | |
749 | if constexpr (need_replication) |
750 | (*added_columns.offsets_to_replicate)[i] = current_offset; |
751 | continue; |
752 | } |
753 | } |
754 | |
755 | auto find_result = key_getter.findKey(map, i, pool); |
756 | |
757 | if (find_result.isFound()) |
758 | { |
759 | auto & mapped = find_result.getMapped(); |
760 | |
761 | if constexpr (is_asof_join) |
762 | { |
763 | const Join & join = added_columns.join; |
764 | if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofInequality(), asof_column, i)) |
765 | { |
766 | setUsed<need_filter>(filter, i); |
767 | mapped.setUsed(); |
768 | added_columns.appendFromBlock<add_missing>(*found->block, found->row_num); |
769 | } |
770 | else |
771 | addNotFoundRow<add_missing, need_replication>(added_columns, current_offset); |
772 | } |
773 | else if constexpr (is_all_join) |
774 | { |
775 | setUsed<need_filter>(filter, i); |
776 | mapped.setUsed(); |
777 | addFoundRowAll<Map, add_missing>(mapped, added_columns, current_offset); |
778 | } |
779 | else if constexpr ((is_any_join || is_semi_join) && right) |
780 | { |
781 | /// Use first appered left key + it needs left columns replication |
782 | if (mapped.setUsedOnce()) |
783 | { |
784 | setUsed<need_filter>(filter, i); |
785 | addFoundRowAll<Map, add_missing>(mapped, added_columns, current_offset); |
786 | } |
787 | } |
788 | else if constexpr (is_any_join && KIND == ASTTableJoin::Kind::Inner) |
789 | { |
790 | /// Use first appered left key only |
791 | if (mapped.setUsedOnce()) |
792 | { |
793 | setUsed<need_filter>(filter, i); |
794 | added_columns.appendFromBlock<add_missing>(*mapped.block, mapped.row_num); |
795 | } |
796 | } |
797 | else if constexpr (is_any_join && full) |
798 | { |
799 | /// TODO |
800 | } |
801 | else if constexpr (is_anti_join) |
802 | { |
803 | if constexpr (right) |
804 | mapped.setUsed(); |
805 | } |
806 | else /// ANY LEFT, SEMI LEFT, old ANY (RightAny) |
807 | { |
808 | setUsed<need_filter>(filter, i); |
809 | mapped.setUsed(); |
810 | added_columns.appendFromBlock<add_missing>(*mapped.block, mapped.row_num); |
811 | } |
812 | } |
813 | else |
814 | { |
815 | if constexpr (is_anti_join && left) |
816 | setUsed<need_filter>(filter, i); |
817 | addNotFoundRow<add_missing, need_replication>(added_columns, current_offset); |
818 | } |
819 | |
820 | if constexpr (need_replication) |
821 | (*added_columns.offsets_to_replicate)[i] = current_offset; |
822 | } |
823 | |
824 | added_columns.applyLazyDefaults(); |
825 | return filter; |
826 | } |
827 | |
828 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map> |
829 | IColumn::Filter joinRightColumnsSwitchNullability(const Map & map, AddedColumns & added_columns, const ConstNullMapPtr & null_map) |
830 | { |
831 | if (added_columns.need_filter) |
832 | { |
833 | if (null_map) |
834 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, true, true>(map, added_columns, null_map); |
835 | else |
836 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, true, false>(map, added_columns, nullptr); |
837 | } |
838 | else |
839 | { |
840 | if (null_map) |
841 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, false, true>(map, added_columns, null_map); |
842 | else |
843 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, false, false>(map, added_columns, nullptr); |
844 | } |
845 | } |
846 | |
847 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps> |
848 | IColumn::Filter switchJoinRightColumns(const Maps & maps_, AddedColumns & added_columns, Join::Type type, const ConstNullMapPtr & null_map) |
849 | { |
850 | switch (type) |
851 | { |
852 | #define M(TYPE) \ |
853 | case Join::Type::TYPE: \ |
854 | return joinRightColumnsSwitchNullability<KIND, STRICTNESS,\ |
855 | typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\ |
856 | *maps_.TYPE, added_columns, null_map);\ |
857 | break; |
858 | APPLY_FOR_JOIN_VARIANTS(M) |
859 | #undef M |
860 | |
861 | default: |
862 | throw Exception("Unsupported JOIN keys. Type: " + toString(static_cast<UInt32>(type)), ErrorCodes::UNSUPPORTED_JOIN_KEYS); |
863 | } |
864 | } |
865 | |
866 | } /// nameless |
867 | |
868 | |
869 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps> |
870 | void Join::joinBlockImpl( |
871 | Block & block, |
872 | const Names & key_names_left, |
873 | const Block & block_with_columns_to_add, |
874 | const Maps & maps_) const |
875 | { |
876 | constexpr bool is_any_join = STRICTNESS == ASTTableJoin::Strictness::Any; |
877 | constexpr bool is_all_join = STRICTNESS == ASTTableJoin::Strictness::All; |
878 | constexpr bool is_asof_join = STRICTNESS == ASTTableJoin::Strictness::Asof; |
879 | constexpr bool is_semi_join = STRICTNESS == ASTTableJoin::Strictness::Semi; |
880 | constexpr bool is_anti_join = STRICTNESS == ASTTableJoin::Strictness::Anti; |
881 | |
882 | constexpr bool left = KIND == ASTTableJoin::Kind::Left; |
883 | constexpr bool right = KIND == ASTTableJoin::Kind::Right; |
884 | constexpr bool inner = KIND == ASTTableJoin::Kind::Inner; |
885 | constexpr bool full = KIND == ASTTableJoin::Kind::Full; |
886 | |
887 | constexpr bool need_replication = is_all_join || (is_any_join && right) || (is_semi_join && right); |
888 | constexpr bool need_filter = !need_replication && (inner || right || (is_semi_join && left) || (is_anti_join && left)); |
889 | |
890 | /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them. |
891 | Columns materialized_keys = JoinCommon::materializeColumns(block, key_names_left); |
892 | ColumnRawPtrs key_columns = JoinCommon::getRawPointers(materialized_keys); |
893 | |
894 | /// Keys with NULL value in any column won't join to anything. |
895 | ConstNullMapPtr null_map{}; |
896 | ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); |
897 | |
898 | size_t existing_columns = block.columns(); |
899 | |
900 | /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized. |
901 | * Because if they are constants, then in the "not joined" rows, they may have different values |
902 | * - default values, which can differ from the values of these constants. |
903 | */ |
904 | if constexpr (right || full) |
905 | { |
906 | materializeBlockInplace(block); |
907 | |
908 | if (nullable_left_side) |
909 | JoinCommon::convertColumnsToNullable(block); |
910 | } |
911 | |
912 | /** For LEFT/INNER JOIN, the saved blocks do not contain keys. |
913 | * For FULL/RIGHT JOIN, the saved blocks contain keys; |
914 | * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. |
915 | * For ASOF, the last column is used as the ASOF column |
916 | */ |
917 | ColumnsWithTypeAndName ; |
918 | if constexpr (is_asof_join) |
919 | extras.push_back(right_table_keys.getByName(key_names_right.back())); |
920 | |
921 | AddedColumns added_columns(sample_block_with_columns_to_add, block_with_columns_to_add, block, savedBlockSample(), |
922 | extras, *this, key_columns, key_sizes); |
923 | bool has_required_right_keys = (required_right_keys.columns() != 0); |
924 | added_columns.need_filter = need_filter || has_required_right_keys; |
925 | |
926 | IColumn::Filter row_filter = switchJoinRightColumns<KIND, STRICTNESS>(maps_, added_columns, data->type, null_map); |
927 | |
928 | for (size_t i = 0; i < added_columns.size(); ++i) |
929 | block.insert(added_columns.moveColumn(i)); |
930 | |
931 | std::vector<size_t> right_keys_to_replicate [[maybe_unused]]; |
932 | |
933 | if constexpr (need_filter) |
934 | { |
935 | /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones. |
936 | for (size_t i = 0; i < existing_columns; ++i) |
937 | block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(row_filter, -1); |
938 | |
939 | /// Add join key columns from right block if needed. |
940 | for (size_t i = 0; i < required_right_keys.columns(); ++i) |
941 | { |
942 | const auto & right_key = required_right_keys.getByPosition(i); |
943 | const auto & left_name = required_right_keys_sources[i]; |
944 | |
945 | const auto & col = block.getByName(left_name); |
946 | bool is_nullable = nullable_right_side || right_key.type->isNullable(); |
947 | block.insert(correctNullability({col.column, col.type, right_key.name}, is_nullable)); |
948 | } |
949 | } |
950 | else if (has_required_right_keys) |
951 | { |
952 | /// Some trash to represent IColumn::Filter as ColumnUInt8 needed for ColumnNullable::applyNullMap() |
953 | auto null_map_filter_ptr = ColumnUInt8::create(); |
954 | ColumnUInt8 & null_map_filter = assert_cast<ColumnUInt8 &>(*null_map_filter_ptr); |
955 | null_map_filter.getData().swap(row_filter); |
956 | const IColumn::Filter & filter = null_map_filter.getData(); |
957 | |
958 | /// Add join key columns from right block if needed. |
959 | for (size_t i = 0; i < required_right_keys.columns(); ++i) |
960 | { |
961 | const auto & right_key = required_right_keys.getByPosition(i); |
962 | const auto & left_name = required_right_keys_sources[i]; |
963 | |
964 | const auto & col = block.getByName(left_name); |
965 | bool is_nullable = nullable_right_side || right_key.type->isNullable(); |
966 | |
967 | ColumnPtr thin_column = filterWithBlanks(col.column, filter); |
968 | block.insert(correctNullability({thin_column, col.type, right_key.name}, is_nullable, null_map_filter)); |
969 | |
970 | if constexpr (need_replication) |
971 | right_keys_to_replicate.push_back(block.getPositionByName(right_key.name)); |
972 | } |
973 | } |
974 | |
975 | if constexpr (need_replication) |
976 | { |
977 | std::unique_ptr<IColumn::Offsets> & offsets_to_replicate = added_columns.offsets_to_replicate; |
978 | |
979 | /// If ALL ... JOIN - we replicate all the columns except the new ones. |
980 | for (size_t i = 0; i < existing_columns; ++i) |
981 | block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicate(*offsets_to_replicate); |
982 | |
983 | /// Replicate additional right keys |
984 | for (size_t pos : right_keys_to_replicate) |
985 | block.safeGetByPosition(pos).column = block.safeGetByPosition(pos).column->replicate(*offsets_to_replicate); |
986 | } |
987 | } |
988 | |
989 | |
990 | void Join::joinBlockImplCross(Block & block) const |
991 | { |
992 | /// Add new columns to the block. |
993 | size_t num_existing_columns = block.columns(); |
994 | size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); |
995 | |
996 | size_t rows_left = block.rows(); |
997 | |
998 | ColumnRawPtrs src_left_columns(num_existing_columns); |
999 | MutableColumns dst_columns(num_existing_columns + num_columns_to_add); |
1000 | |
1001 | for (size_t i = 0; i < num_existing_columns; ++i) |
1002 | { |
1003 | src_left_columns[i] = block.getByPosition(i).column.get(); |
1004 | dst_columns[i] = src_left_columns[i]->cloneEmpty(); |
1005 | } |
1006 | |
1007 | for (size_t i = 0; i < num_columns_to_add; ++i) |
1008 | { |
1009 | const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); |
1010 | dst_columns[num_existing_columns + i] = src_column.column->cloneEmpty(); |
1011 | block.insert(src_column); |
1012 | } |
1013 | |
1014 | /// NOTE It would be better to use `reserve`, as well as `replicate` methods to duplicate the values of the left block. |
1015 | |
1016 | for (size_t i = 0; i < rows_left; ++i) |
1017 | { |
1018 | for (const Block & block_right : data->blocks) |
1019 | { |
1020 | size_t rows_right = block_right.rows(); |
1021 | |
1022 | for (size_t col_num = 0; col_num < num_existing_columns; ++col_num) |
1023 | for (size_t j = 0; j < rows_right; ++j) |
1024 | dst_columns[col_num]->insertFrom(*src_left_columns[col_num], i); |
1025 | |
1026 | for (size_t col_num = 0; col_num < num_columns_to_add; ++col_num) |
1027 | { |
1028 | const IColumn * column_right = block_right.getByPosition(col_num).column.get(); |
1029 | |
1030 | for (size_t j = 0; j < rows_right; ++j) |
1031 | dst_columns[num_existing_columns + col_num]->insertFrom(*column_right, j); |
1032 | } |
1033 | } |
1034 | } |
1035 | |
1036 | block = block.cloneWithColumns(std::move(dst_columns)); |
1037 | } |
1038 | |
1039 | static void checkTypeOfKey(const Block & block_left, const Block & block_right) |
1040 | { |
1041 | auto & [c1, left_type_origin, left_name] = block_left.safeGetByPosition(0); |
1042 | auto & [c2, right_type_origin, right_name] = block_right.safeGetByPosition(0); |
1043 | auto left_type = removeNullable(left_type_origin); |
1044 | auto right_type = removeNullable(right_type_origin); |
1045 | |
1046 | if (!left_type->equals(*right_type)) |
1047 | throw Exception("Type mismatch of columns to joinGet by: " |
1048 | + left_name + " " + left_type->getName() + " at left, " |
1049 | + right_name + " " + right_type->getName() + " at right" , |
1050 | ErrorCodes::TYPE_MISMATCH); |
1051 | } |
1052 | |
1053 | |
1054 | DataTypePtr Join::joinGetReturnType(const String & column_name) const |
1055 | { |
1056 | std::shared_lock lock(data->rwlock); |
1057 | |
1058 | if (!sample_block_with_columns_to_add.has(column_name)) |
1059 | throw Exception("StorageJoin doesn't contain column " + column_name, ErrorCodes::LOGICAL_ERROR); |
1060 | return sample_block_with_columns_to_add.getByName(column_name).type; |
1061 | } |
1062 | |
1063 | |
1064 | template <typename Maps> |
1065 | void Join::joinGetImpl(Block & block, const String & column_name, const Maps & maps_) const |
1066 | { |
1067 | joinBlockImpl<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::RightAny>( |
1068 | block, {block.getByPosition(0).name}, {sample_block_with_columns_to_add.getByName(column_name)}, maps_); |
1069 | } |
1070 | |
1071 | |
1072 | // TODO: support composite key |
1073 | // TODO: return multiple columns as named tuple |
1074 | // TODO: return array of values when strictness == ASTTableJoin::Strictness::All |
1075 | void Join::joinGet(Block & block, const String & column_name) const |
1076 | { |
1077 | std::shared_lock lock(data->rwlock); |
1078 | |
1079 | if (key_names_right.size() != 1) |
1080 | throw Exception("joinGet only supports StorageJoin containing exactly one key" , ErrorCodes::LOGICAL_ERROR); |
1081 | |
1082 | checkTypeOfKey(block, right_table_keys); |
1083 | |
1084 | if ((strictness == ASTTableJoin::Strictness::Any || strictness == ASTTableJoin::Strictness::RightAny) && |
1085 | kind == ASTTableJoin::Kind::Left) |
1086 | { |
1087 | joinGetImpl(block, column_name, std::get<MapsOne>(data->maps)); |
1088 | } |
1089 | else |
1090 | throw Exception("joinGet only supports StorageJoin of type Left Any" , ErrorCodes::LOGICAL_ERROR); |
1091 | } |
1092 | |
1093 | |
1094 | void Join::joinBlock(Block & block) |
1095 | { |
1096 | std::shared_lock lock(data->rwlock); |
1097 | |
1098 | const Names & key_names_left = table_join->keyNamesLeft(); |
1099 | JoinCommon::checkTypesOfKeys(block, key_names_left, right_table_keys, key_names_right); |
1100 | |
1101 | if (joinDispatch(kind, strictness, data->maps, [&](auto kind_, auto strictness_, auto & map) |
1102 | { |
1103 | joinBlockImpl<kind_, strictness_>(block, key_names_left, sample_block_with_columns_to_add, map); |
1104 | })) |
1105 | { |
1106 | /// Joined |
1107 | } |
1108 | else if (kind == ASTTableJoin::Kind::Cross) |
1109 | joinBlockImplCross(block); |
1110 | else |
1111 | throw Exception("Logical error: unknown combination of JOIN" , ErrorCodes::LOGICAL_ERROR); |
1112 | } |
1113 | |
1114 | |
1115 | void Join::joinTotals(Block & block) const |
1116 | { |
1117 | JoinCommon::joinTotals(totals, sample_block_with_columns_to_add, key_names_right, block); |
1118 | } |
1119 | |
1120 | |
1121 | template <typename Mapped> |
1122 | struct AdderNonJoined |
1123 | { |
1124 | static void add(const Mapped & mapped, size_t & rows_added, MutableColumns & columns_right) |
1125 | { |
1126 | constexpr bool mapped_asof = std::is_same_v<Mapped, JoinStuff::MappedAsof>; |
1127 | [[maybe_unused]] constexpr bool mapped_one = std::is_same_v<Mapped, JoinStuff::MappedOne> || std::is_same_v<Mapped, JoinStuff::MappedOneFlagged>; |
1128 | |
1129 | if constexpr (mapped_asof) |
1130 | { |
1131 | /// Do nothing |
1132 | } |
1133 | else if constexpr (mapped_one) |
1134 | { |
1135 | for (size_t j = 0; j < columns_right.size(); ++j) |
1136 | { |
1137 | const auto & mapped_column = mapped.block->getByPosition(j).column; |
1138 | columns_right[j]->insertFrom(*mapped_column, mapped.row_num); |
1139 | } |
1140 | |
1141 | ++rows_added; |
1142 | } |
1143 | else |
1144 | { |
1145 | for (auto it = mapped.begin(); it.ok(); ++it) |
1146 | { |
1147 | for (size_t j = 0; j < columns_right.size(); ++j) |
1148 | { |
1149 | const auto & mapped_column = it->block->getByPosition(j).column; |
1150 | columns_right[j]->insertFrom(*mapped_column, it->row_num); |
1151 | } |
1152 | |
1153 | ++rows_added; |
1154 | } |
1155 | } |
1156 | } |
1157 | }; |
1158 | |
1159 | |
1160 | /// Stream from not joined earlier rows of the right table. |
1161 | class NonJoinedBlockInputStream : public IBlockInputStream |
1162 | { |
1163 | public: |
1164 | NonJoinedBlockInputStream(const Join & parent_, const Block & result_sample_block_, UInt64 max_block_size_) |
1165 | : parent(parent_) |
1166 | , max_block_size(max_block_size_) |
1167 | , result_sample_block(materializeBlock(result_sample_block_)) |
1168 | { |
1169 | bool remap_keys = parent.table_join->hasUsing(); |
1170 | std::unordered_map<size_t, size_t> left_to_right_key_remap; |
1171 | |
1172 | for (size_t i = 0; i < parent.table_join->keyNamesLeft().size(); ++i) |
1173 | { |
1174 | const String & left_key_name = parent.table_join->keyNamesLeft()[i]; |
1175 | const String & right_key_name = parent.table_join->keyNamesRight()[i]; |
1176 | |
1177 | size_t left_key_pos = result_sample_block.getPositionByName(left_key_name); |
1178 | size_t right_key_pos = parent.savedBlockSample().getPositionByName(right_key_name); |
1179 | |
1180 | if (remap_keys && !parent.required_right_keys.has(right_key_name)) |
1181 | left_to_right_key_remap[left_key_pos] = right_key_pos; |
1182 | } |
1183 | |
1184 | /// result_sample_block: left_sample_block + left expressions, right not key columns, required right keys |
1185 | size_t left_columns_count = result_sample_block.columns() - |
1186 | parent.sample_block_with_columns_to_add.columns() - parent.required_right_keys.columns(); |
1187 | |
1188 | for (size_t left_pos = 0; left_pos < left_columns_count; ++left_pos) |
1189 | { |
1190 | /// We need right 'x' for 'RIGHT JOIN ... USING(x)'. |
1191 | if (left_to_right_key_remap.count(left_pos)) |
1192 | { |
1193 | size_t right_key_pos = left_to_right_key_remap[left_pos]; |
1194 | setRightIndex(right_key_pos, left_pos); |
1195 | } |
1196 | else |
1197 | column_indices_left.emplace_back(left_pos); |
1198 | } |
1199 | |
1200 | const auto & saved_block_sample = parent.savedBlockSample(); |
1201 | for (size_t right_pos = 0; right_pos < saved_block_sample.columns(); ++right_pos) |
1202 | { |
1203 | const String & name = saved_block_sample.getByPosition(right_pos).name; |
1204 | if (!result_sample_block.has(name)) |
1205 | continue; |
1206 | |
1207 | size_t result_position = result_sample_block.getPositionByName(name); |
1208 | |
1209 | /// Don't remap left keys twice. We need only qualified right keys here |
1210 | if (result_position < left_columns_count) |
1211 | continue; |
1212 | |
1213 | setRightIndex(right_pos, result_position); |
1214 | } |
1215 | |
1216 | if (column_indices_left.size() + column_indices_right.size() + same_result_keys.size() != result_sample_block.columns()) |
1217 | throw Exception("Error in columns mapping in RIGHT|FULL JOIN. Left: " + toString(column_indices_left.size()) + |
1218 | ", right: " + toString(column_indices_right.size()) + |
1219 | ", same: " + toString(same_result_keys.size()) + |
1220 | ", result: " + toString(result_sample_block.columns()), |
1221 | ErrorCodes::LOGICAL_ERROR); |
1222 | } |
1223 | |
1224 | String getName() const override { return "NonJoined" ; } |
1225 | |
1226 | Block () const override { return result_sample_block; } |
1227 | |
1228 | |
1229 | protected: |
1230 | Block readImpl() override |
1231 | { |
1232 | if (parent.data->blocks.empty()) |
1233 | return Block(); |
1234 | return createBlock(); |
1235 | } |
1236 | |
1237 | private: |
1238 | const Join & parent; |
1239 | UInt64 max_block_size; |
1240 | |
1241 | Block result_sample_block; |
1242 | /// Indices of columns in result_sample_block that should be generated |
1243 | std::vector<size_t> column_indices_left; |
1244 | /// Indices of columns that come from the right-side table: right_pos -> result_pos |
1245 | std::unordered_map<size_t, size_t> column_indices_right; |
1246 | /// |
1247 | std::unordered_map<size_t, size_t> same_result_keys; |
1248 | /// Which right columns (saved in parent) need nullability change before placing them in result block |
1249 | std::vector<size_t> right_nullability_changes; |
1250 | |
1251 | std::any position; |
1252 | std::optional<Join::BlockNullmapList::const_iterator> nulls_position; |
1253 | |
1254 | void setRightIndex(size_t right_pos, size_t result_position) |
1255 | { |
1256 | if (!column_indices_right.count(right_pos)) |
1257 | { |
1258 | column_indices_right[right_pos] = result_position; |
1259 | |
1260 | if (hasNullabilityChange(right_pos, result_position)) |
1261 | right_nullability_changes.push_back(right_pos); |
1262 | } |
1263 | else |
1264 | same_result_keys[result_position] = column_indices_right[right_pos]; |
1265 | } |
1266 | |
1267 | bool hasNullabilityChange(size_t right_pos, size_t result_pos) const |
1268 | { |
1269 | const auto & src = parent.savedBlockSample().getByPosition(right_pos).column; |
1270 | const auto & dst = result_sample_block.getByPosition(result_pos).column; |
1271 | return src->isNullable() != dst->isNullable(); |
1272 | } |
1273 | |
1274 | Block createBlock() |
1275 | { |
1276 | MutableColumns columns_right = parent.savedBlockSample().cloneEmptyColumns(); |
1277 | |
1278 | size_t rows_added = 0; |
1279 | |
1280 | auto fill_callback = [&](auto, auto strictness, auto & map) |
1281 | { |
1282 | rows_added = fillColumnsFromMap<strictness>(map, columns_right); |
1283 | }; |
1284 | |
1285 | if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps, fill_callback)) |
1286 | throw Exception("Logical error: unknown JOIN strictness (must be on of: ANY, ALL, ASOF)" , ErrorCodes::LOGICAL_ERROR); |
1287 | |
1288 | fillNullsFromBlocks(columns_right, rows_added); |
1289 | |
1290 | if (!rows_added) |
1291 | return {}; |
1292 | |
1293 | for (size_t pos : right_nullability_changes) |
1294 | changeNullability(columns_right[pos]); |
1295 | |
1296 | Block res = result_sample_block.cloneEmpty(); |
1297 | |
1298 | /// @note it's possible to make ColumnConst here and materialize it later |
1299 | for (size_t pos : column_indices_left) |
1300 | res.getByPosition(pos).column = res.getByPosition(pos).column->cloneResized(rows_added); |
1301 | |
1302 | for (auto & pr : column_indices_right) |
1303 | { |
1304 | auto & right_column = columns_right[pr.first]; |
1305 | auto & result_column = res.getByPosition(pr.second).column; |
1306 | #ifndef NDEBUG |
1307 | if (result_column->getName() != right_column->getName()) |
1308 | throw Exception("Wrong columns assign in RIGHT|FULL JOIN: " + result_column->getName() + |
1309 | " " + right_column->getName(), ErrorCodes::LOGICAL_ERROR); |
1310 | #endif |
1311 | result_column = std::move(right_column); |
1312 | } |
1313 | |
1314 | for (auto & pr : same_result_keys) |
1315 | { |
1316 | auto & src_column = res.getByPosition(pr.second).column; |
1317 | auto & dst_column = res.getByPosition(pr.first).column; |
1318 | |
1319 | if (src_column->isNullable() && !dst_column->isNullable()) |
1320 | { |
1321 | auto * nullable = checkAndGetColumn<ColumnNullable>(*src_column); |
1322 | dst_column = nullable->getNestedColumnPtr(); |
1323 | } |
1324 | else if (!src_column->isNullable() && dst_column->isNullable()) |
1325 | dst_column = makeNullable(src_column); |
1326 | else |
1327 | dst_column = src_column; |
1328 | } |
1329 | |
1330 | return res; |
1331 | } |
1332 | |
1333 | template <ASTTableJoin::Strictness STRICTNESS, typename Maps> |
1334 | size_t fillColumnsFromMap(const Maps & maps, MutableColumns & columns_keys_and_right) |
1335 | { |
1336 | switch (parent.data->type) |
1337 | { |
1338 | #define M(TYPE) \ |
1339 | case Join::Type::TYPE: \ |
1340 | return fillColumns<STRICTNESS>(*maps.TYPE, columns_keys_and_right); |
1341 | APPLY_FOR_JOIN_VARIANTS(M) |
1342 | #undef M |
1343 | default: |
1344 | throw Exception("Unsupported JOIN keys. Type: " + toString(static_cast<UInt32>(parent.data->type)), |
1345 | ErrorCodes::UNSUPPORTED_JOIN_KEYS); |
1346 | } |
1347 | |
1348 | __builtin_unreachable(); |
1349 | } |
1350 | |
1351 | template <ASTTableJoin::Strictness STRICTNESS, typename Map> |
1352 | size_t fillColumns(const Map & map, MutableColumns & columns_keys_and_right) |
1353 | { |
1354 | using Mapped = typename Map::mapped_type; |
1355 | using Iterator = typename Map::const_iterator; |
1356 | |
1357 | size_t rows_added = 0; |
1358 | |
1359 | if (!position.has_value()) |
1360 | position = std::make_any<Iterator>(map.begin()); |
1361 | |
1362 | Iterator & it = std::any_cast<Iterator &>(position); |
1363 | auto end = map.end(); |
1364 | |
1365 | for (; it != end; ++it) |
1366 | { |
1367 | const Mapped & mapped = it->getMapped(); |
1368 | |
1369 | if (mapped.getUsed()) |
1370 | continue; |
1371 | |
1372 | AdderNonJoined<Mapped>::add(mapped, rows_added, columns_keys_and_right); |
1373 | |
1374 | if (rows_added >= max_block_size) |
1375 | { |
1376 | ++it; |
1377 | break; |
1378 | } |
1379 | } |
1380 | |
1381 | return rows_added; |
1382 | } |
1383 | |
1384 | void fillNullsFromBlocks(MutableColumns & columns_keys_and_right, size_t & rows_added) |
1385 | { |
1386 | if (!nulls_position.has_value()) |
1387 | nulls_position = parent.data->blocks_nullmaps.begin(); |
1388 | |
1389 | auto end = parent.data->blocks_nullmaps.end(); |
1390 | |
1391 | for (auto & it = *nulls_position; it != end && rows_added < max_block_size; ++it) |
1392 | { |
1393 | const Block * block = it->first; |
1394 | const NullMap & nullmap = assert_cast<const ColumnUInt8 &>(*it->second).getData(); |
1395 | |
1396 | for (size_t row = 0; row < nullmap.size(); ++row) |
1397 | { |
1398 | if (nullmap[row]) |
1399 | { |
1400 | for (size_t col = 0; col < columns_keys_and_right.size(); ++col) |
1401 | columns_keys_and_right[col]->insertFrom(*block->getByPosition(col).column, row); |
1402 | ++rows_added; |
1403 | } |
1404 | } |
1405 | } |
1406 | } |
1407 | }; |
1408 | |
1409 | |
1410 | BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const |
1411 | { |
1412 | if (table_join->strictness() == ASTTableJoin::Strictness::Asof || |
1413 | table_join->strictness() == ASTTableJoin::Strictness::Semi) |
1414 | return {}; |
1415 | |
1416 | if (isRightOrFull(table_join->kind())) |
1417 | return std::make_shared<NonJoinedBlockInputStream>(*this, result_sample_block, max_block_size); |
1418 | return {}; |
1419 | } |
1420 | |
1421 | |
1422 | bool Join::hasStreamWithNonJoinedRows() const |
1423 | { |
1424 | if (table_join->strictness() == ASTTableJoin::Strictness::Asof || |
1425 | table_join->strictness() == ASTTableJoin::Strictness::Semi) |
1426 | return false; |
1427 | |
1428 | return isRightOrFull(table_join->kind()); |
1429 | } |
1430 | |
1431 | } |
1432 | |