| 1 | #include <any> |
| 2 | |
| 3 | #include <common/logger_useful.h> |
| 4 | |
| 5 | #include <Columns/ColumnConst.h> |
| 6 | #include <Columns/ColumnString.h> |
| 7 | #include <Columns/ColumnFixedString.h> |
| 8 | #include <Columns/ColumnNullable.h> |
| 9 | |
| 10 | #include <DataTypes/DataTypeNullable.h> |
| 11 | |
| 12 | #include <Interpreters/Join.h> |
| 13 | #include <Interpreters/join_common.h> |
| 14 | #include <Interpreters/AnalyzedJoin.h> |
| 15 | #include <Interpreters/joinDispatch.h> |
| 16 | #include <Interpreters/NullableUtils.h> |
| 17 | |
| 18 | #include <DataStreams/IBlockInputStream.h> |
| 19 | #include <DataStreams/materializeBlock.h> |
| 20 | |
| 21 | #include <Core/ColumnNumbers.h> |
| 22 | #include <Common/typeid_cast.h> |
| 23 | #include <Common/assert_cast.h> |
| 24 | #include <DataTypes/DataTypeLowCardinality.h> |
| 25 | |
| 26 | |
| 27 | namespace DB |
| 28 | { |
| 29 | |
/// Error codes thrown from this translation unit.
/// NOT_IMPLEMENTED and BAD_TYPE_OF_FIELD are used by Join::setSampleBlock when validating ASOF joins;
/// declaring them here keeps the file from depending on some other header to declare them.
namespace ErrorCodes
{
    extern const int BAD_TYPE_OF_FIELD;
    extern const int ILLEGAL_COLUMN;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int SET_SIZE_LIMIT_EXCEEDED;
    extern const int TYPE_MISMATCH;
    extern const int UNSUPPORTED_JOIN_KEYS;
}
| 38 | |
| 39 | |
| 40 | static ColumnPtr filterWithBlanks(ColumnPtr src_column, const IColumn::Filter & filter, bool inverse_filter = false) |
| 41 | { |
| 42 | ColumnPtr column = src_column->convertToFullColumnIfConst(); |
| 43 | MutableColumnPtr mut_column = column->cloneEmpty(); |
| 44 | mut_column->reserve(column->size()); |
| 45 | |
| 46 | if (inverse_filter) |
| 47 | { |
| 48 | for (size_t row = 0; row < filter.size(); ++row) |
| 49 | { |
| 50 | if (filter[row]) |
| 51 | mut_column->insertDefault(); |
| 52 | else |
| 53 | mut_column->insertFrom(*column, row); |
| 54 | } |
| 55 | } |
| 56 | else |
| 57 | { |
| 58 | for (size_t row = 0; row < filter.size(); ++row) |
| 59 | { |
| 60 | if (filter[row]) |
| 61 | mut_column->insertFrom(*column, row); |
| 62 | else |
| 63 | mut_column->insertDefault(); |
| 64 | } |
| 65 | } |
| 66 | |
| 67 | return mut_column; |
| 68 | } |
| 69 | |
| 70 | static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, bool nullable) |
| 71 | { |
| 72 | if (nullable) |
| 73 | { |
| 74 | JoinCommon::convertColumnToNullable(column); |
| 75 | } |
| 76 | else |
| 77 | { |
| 78 | /// We have to replace values masked by NULLs with defaults. |
| 79 | if (column.column) |
| 80 | if (auto * nullable_column = checkAndGetColumn<ColumnNullable>(*column.column)) |
| 81 | column.column = filterWithBlanks(column.column, nullable_column->getNullMapColumn().getData(), true); |
| 82 | |
| 83 | JoinCommon::removeColumnNullability(column); |
| 84 | } |
| 85 | |
| 86 | return std::move(column); |
| 87 | } |
| 88 | |
| 89 | static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, bool nullable, const ColumnUInt8 & negative_null_map) |
| 90 | { |
| 91 | if (nullable) |
| 92 | { |
| 93 | JoinCommon::convertColumnToNullable(column); |
| 94 | if (column.type->isNullable() && negative_null_map.size()) |
| 95 | { |
| 96 | MutableColumnPtr mutable_column = (*std::move(column.column)).mutate(); |
| 97 | assert_cast<ColumnNullable &>(*mutable_column).applyNegatedNullMap(negative_null_map); |
| 98 | column.column = std::move(mutable_column); |
| 99 | } |
| 100 | } |
| 101 | else |
| 102 | JoinCommon::removeColumnNullability(column); |
| 103 | |
| 104 | return std::move(column); |
| 105 | } |
| 106 | |
| 107 | static void changeNullability(MutableColumnPtr & mutable_column) |
| 108 | { |
| 109 | ColumnPtr column = std::move(mutable_column); |
| 110 | if (auto * nullable = checkAndGetColumn<ColumnNullable>(*column)) |
| 111 | column = nullable->getNestedColumnPtr(); |
| 112 | else |
| 113 | column = makeNullable(column); |
| 114 | |
| 115 | mutable_column = (*std::move(column)).mutate(); |
| 116 | } |
| 117 | |
| 118 | |
| 119 | Join::Join(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_) |
| 120 | : table_join(table_join_) |
| 121 | , kind(table_join->kind()) |
| 122 | , strictness(table_join->strictness()) |
| 123 | , key_names_right(table_join->keyNamesRight()) |
| 124 | , nullable_right_side(table_join->forceNullableRight()) |
| 125 | , nullable_left_side(table_join->forceNullableLeft()) |
| 126 | , any_take_last_row(any_take_last_row_) |
| 127 | , asof_inequality(table_join->getAsofInequality()) |
| 128 | , data(std::make_shared<RightTableData>()) |
| 129 | , log(&Logger::get("Join" )) |
| 130 | { |
| 131 | setSampleBlock(right_sample_block); |
| 132 | } |
| 133 | |
| 134 | |
| 135 | Join::Type Join::chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes) |
| 136 | { |
| 137 | size_t keys_size = key_columns.size(); |
| 138 | |
| 139 | if (keys_size == 0) |
| 140 | return Type::CROSS; |
| 141 | |
| 142 | bool all_fixed = true; |
| 143 | size_t keys_bytes = 0; |
| 144 | key_sizes.resize(keys_size); |
| 145 | for (size_t j = 0; j < keys_size; ++j) |
| 146 | { |
| 147 | if (!key_columns[j]->isFixedAndContiguous()) |
| 148 | { |
| 149 | all_fixed = false; |
| 150 | break; |
| 151 | } |
| 152 | key_sizes[j] = key_columns[j]->sizeOfValueIfFixed(); |
| 153 | keys_bytes += key_sizes[j]; |
| 154 | } |
| 155 | |
| 156 | /// If there is one numeric key that fits in 64 bits |
| 157 | if (keys_size == 1 && key_columns[0]->isNumeric()) |
| 158 | { |
| 159 | size_t size_of_field = key_columns[0]->sizeOfValueIfFixed(); |
| 160 | if (size_of_field == 1) |
| 161 | return Type::key8; |
| 162 | if (size_of_field == 2) |
| 163 | return Type::key16; |
| 164 | if (size_of_field == 4) |
| 165 | return Type::key32; |
| 166 | if (size_of_field == 8) |
| 167 | return Type::key64; |
| 168 | if (size_of_field == 16) |
| 169 | return Type::keys128; |
| 170 | throw Exception("Logical error: numeric column has sizeOfField not in 1, 2, 4, 8, 16." , ErrorCodes::LOGICAL_ERROR); |
| 171 | } |
| 172 | |
| 173 | /// If the keys fit in N bits, we will use a hash table for N-bit-packed keys |
| 174 | if (all_fixed && keys_bytes <= 16) |
| 175 | return Type::keys128; |
| 176 | if (all_fixed && keys_bytes <= 32) |
| 177 | return Type::keys256; |
| 178 | |
| 179 | /// If there is single string key, use hash table of it's values. |
| 180 | if (keys_size == 1 |
| 181 | && (typeid_cast<const ColumnString *>(key_columns[0]) |
| 182 | || (isColumnConst(*key_columns[0]) && typeid_cast<const ColumnString *>(&assert_cast<const ColumnConst *>(key_columns[0])->getDataColumn())))) |
| 183 | return Type::key_string; |
| 184 | |
| 185 | if (keys_size == 1 && typeid_cast<const ColumnFixedString *>(key_columns[0])) |
| 186 | return Type::key_fixed_string; |
| 187 | |
| 188 | /// Otherwise, will use set of cryptographic hashes of unambiguously serialized values. |
| 189 | return Type::hashed; |
| 190 | } |
| 191 | |
| 192 | static const IColumn * extractAsofColumn(const ColumnRawPtrs & key_columns) |
| 193 | { |
| 194 | return key_columns.back(); |
| 195 | } |
| 196 | |
| 197 | template<typename KeyGetter, bool is_asof_join> |
| 198 | static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes) |
| 199 | { |
| 200 | if constexpr (is_asof_join) |
| 201 | { |
| 202 | auto key_column_copy = key_columns; |
| 203 | auto key_size_copy = key_sizes; |
| 204 | key_column_copy.pop_back(); |
| 205 | key_size_copy.pop_back(); |
| 206 | return KeyGetter(key_column_copy, key_size_copy, nullptr); |
| 207 | } |
| 208 | else |
| 209 | return KeyGetter(key_columns, key_sizes, nullptr); |
| 210 | } |
| 211 | |
| 212 | template <Join::Type type, typename Value, typename Mapped> |
| 213 | struct KeyGetterForTypeImpl; |
| 214 | |
| 215 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key8, Value, Mapped> |
| 216 | { |
| 217 | using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt8, false>; |
| 218 | }; |
| 219 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key16, Value, Mapped> |
| 220 | { |
| 221 | using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt16, false>; |
| 222 | }; |
| 223 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key32, Value, Mapped> |
| 224 | { |
| 225 | using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt32, false>; |
| 226 | }; |
| 227 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key64, Value, Mapped> |
| 228 | { |
| 229 | using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt64, false>; |
| 230 | }; |
| 231 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key_string, Value, Mapped> |
| 232 | { |
| 233 | using Type = ColumnsHashing::HashMethodString<Value, Mapped, true, false>; |
| 234 | }; |
| 235 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key_fixed_string, Value, Mapped> |
| 236 | { |
| 237 | using Type = ColumnsHashing::HashMethodFixedString<Value, Mapped, true, false>; |
| 238 | }; |
| 239 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::keys128, Value, Mapped> |
| 240 | { |
| 241 | using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt128, Mapped, false, false, false>; |
| 242 | }; |
| 243 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::keys256, Value, Mapped> |
| 244 | { |
| 245 | using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt256, Mapped, false, false, false>; |
| 246 | }; |
| 247 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::hashed, Value, Mapped> |
| 248 | { |
| 249 | using Type = ColumnsHashing::HashMethodHashed<Value, Mapped, false>; |
| 250 | }; |
| 251 | |
| 252 | template <Join::Type type, typename Data> |
| 253 | struct KeyGetterForType |
| 254 | { |
| 255 | using Value = typename Data::value_type; |
| 256 | using Mapped_t = typename Data::mapped_type; |
| 257 | using Mapped = std::conditional_t<std::is_const_v<Data>, const Mapped_t, Mapped_t>; |
| 258 | using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type; |
| 259 | }; |
| 260 | |
| 261 | |
| 262 | void Join::init(Type type_) |
| 263 | { |
| 264 | data->type = type_; |
| 265 | |
| 266 | if (kind == ASTTableJoin::Kind::Cross) |
| 267 | return; |
| 268 | joinDispatchInit(kind, strictness, data->maps); |
| 269 | joinDispatch(kind, strictness, data->maps, [&](auto, auto, auto & map) { map.create(data->type); }); |
| 270 | } |
| 271 | |
| 272 | size_t Join::getTotalRowCount() const |
| 273 | { |
| 274 | size_t res = 0; |
| 275 | |
| 276 | if (data->type == Type::CROSS) |
| 277 | { |
| 278 | for (const auto & block : data->blocks) |
| 279 | res += block.rows(); |
| 280 | } |
| 281 | else |
| 282 | { |
| 283 | joinDispatch(kind, strictness, data->maps, [&](auto, auto, auto & map) { res += map.getTotalRowCount(data->type); }); |
| 284 | } |
| 285 | |
| 286 | return res; |
| 287 | } |
| 288 | |
| 289 | size_t Join::getTotalByteCount() const |
| 290 | { |
| 291 | size_t res = 0; |
| 292 | |
| 293 | if (data->type == Type::CROSS) |
| 294 | { |
| 295 | for (const auto & block : data->blocks) |
| 296 | res += block.bytes(); |
| 297 | } |
| 298 | else |
| 299 | { |
| 300 | joinDispatch(kind, strictness, data->maps, [&](auto, auto, auto & map) { res += map.getTotalByteCountImpl(data->type); }); |
| 301 | res += data->pool.size(); |
| 302 | } |
| 303 | |
| 304 | return res; |
| 305 | } |
| 306 | |
| 307 | void Join::setSampleBlock(const Block & block) |
| 308 | { |
| 309 | /// You have to restore this lock if you call the fuction outside of ctor. |
| 310 | //std::unique_lock lock(rwlock); |
| 311 | |
| 312 | LOG_DEBUG(log, "setSampleBlock: " << block.dumpStructure()); |
| 313 | |
| 314 | if (!empty()) |
| 315 | return; |
| 316 | |
| 317 | ColumnRawPtrs key_columns = JoinCommon::extractKeysForJoin(key_names_right, block, right_table_keys, sample_block_with_columns_to_add); |
| 318 | |
| 319 | initRightBlockStructure(); |
| 320 | initRequiredRightKeys(); |
| 321 | |
| 322 | JoinCommon::createMissedColumns(sample_block_with_columns_to_add); |
| 323 | if (nullable_right_side) |
| 324 | JoinCommon::convertColumnsToNullable(sample_block_with_columns_to_add); |
| 325 | |
| 326 | if (strictness == ASTTableJoin::Strictness::Asof) |
| 327 | { |
| 328 | if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner) |
| 329 | throw Exception("ASOF only supports LEFT and INNER as base joins" , ErrorCodes::NOT_IMPLEMENTED); |
| 330 | |
| 331 | const IColumn * asof_column = key_columns.back(); |
| 332 | size_t asof_size; |
| 333 | |
| 334 | asof_type = AsofRowRefs::getTypeSize(asof_column, asof_size); |
| 335 | if (!asof_type) |
| 336 | { |
| 337 | std::string msg = "ASOF join not supported for type: " ; |
| 338 | msg += asof_column->getFamilyName(); |
| 339 | throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD); |
| 340 | } |
| 341 | |
| 342 | key_columns.pop_back(); |
| 343 | |
| 344 | if (key_columns.empty()) |
| 345 | throw Exception("ASOF join cannot be done without a joining column" , ErrorCodes::LOGICAL_ERROR); |
| 346 | |
| 347 | /// this is going to set up the appropriate hash table for the direct lookup part of the join |
| 348 | /// However, this does not depend on the size of the asof join key (as that goes into the BST) |
| 349 | /// Therefore, add it back in such that it can be extracted appropriately from the full stored |
| 350 | /// key_columns and key_sizes |
| 351 | init(chooseMethod(key_columns, key_sizes)); |
| 352 | key_sizes.push_back(asof_size); |
| 353 | } |
| 354 | else |
| 355 | { |
| 356 | /// Choose data structure to use for JOIN. |
| 357 | init(chooseMethod(key_columns, key_sizes)); |
| 358 | } |
| 359 | } |
| 360 | |
| 361 | namespace |
| 362 | { |
| 363 | /// Inserting an element into a hash table of the form `key -> reference to a string`, which will then be used by JOIN. |
| 364 | template <typename Map, typename KeyGetter> |
| 365 | struct Inserter |
| 366 | { |
| 367 | static ALWAYS_INLINE void insertOne(const Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, |
| 368 | Arena & pool) |
| 369 | { |
| 370 | auto emplace_result = key_getter.emplaceKey(map, i, pool); |
| 371 | |
| 372 | if (emplace_result.isInserted() || join.anyTakeLastRow()) |
| 373 | new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i); |
| 374 | } |
| 375 | |
| 376 | static ALWAYS_INLINE void insertAll(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool) |
| 377 | { |
| 378 | auto emplace_result = key_getter.emplaceKey(map, i, pool); |
| 379 | |
| 380 | if (emplace_result.isInserted()) |
| 381 | new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i); |
| 382 | else |
| 383 | { |
| 384 | /// The first element of the list is stored in the value of the hash table, the rest in the pool. |
| 385 | emplace_result.getMapped().insert({stored_block, i}, pool); |
| 386 | } |
| 387 | } |
| 388 | |
| 389 | static ALWAYS_INLINE void insertAsof(Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, |
| 390 | const IColumn * asof_column) |
| 391 | { |
| 392 | auto emplace_result = key_getter.emplaceKey(map, i, pool); |
| 393 | typename Map::mapped_type * time_series_map = &emplace_result.getMapped(); |
| 394 | |
| 395 | if (emplace_result.isInserted()) |
| 396 | time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType()); |
| 397 | time_series_map->insert(join.getAsofType(), asof_column, stored_block, i); |
| 398 | } |
| 399 | }; |
| 400 | |
| 401 | |
| 402 | template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map> |
| 403 | void NO_INLINE insertFromBlockImplTypeCase( |
| 404 | Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, |
| 405 | const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) |
| 406 | { |
| 407 | [[maybe_unused]] constexpr bool mapped_one = std::is_same_v<typename Map::mapped_type, JoinStuff::MappedOne> || |
| 408 | std::is_same_v<typename Map::mapped_type, JoinStuff::MappedOneFlagged>; |
| 409 | constexpr bool is_asof_join = STRICTNESS == ASTTableJoin::Strictness::Asof; |
| 410 | |
| 411 | const IColumn * asof_column [[maybe_unused]] = nullptr; |
| 412 | if constexpr (is_asof_join) |
| 413 | asof_column = extractAsofColumn(key_columns); |
| 414 | |
| 415 | auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(key_columns, key_sizes); |
| 416 | |
| 417 | for (size_t i = 0; i < rows; ++i) |
| 418 | { |
| 419 | if (has_null_map && (*null_map)[i]) |
| 420 | continue; |
| 421 | |
| 422 | if constexpr (is_asof_join) |
| 423 | Inserter<Map, KeyGetter>::insertAsof(join, map, key_getter, stored_block, i, pool, asof_column); |
| 424 | else if constexpr (mapped_one) |
| 425 | Inserter<Map, KeyGetter>::insertOne(join, map, key_getter, stored_block, i, pool); |
| 426 | else |
| 427 | Inserter<Map, KeyGetter>::insertAll(join, map, key_getter, stored_block, i, pool); |
| 428 | } |
| 429 | } |
| 430 | |
| 431 | |
| 432 | template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map> |
| 433 | void insertFromBlockImplType( |
| 434 | Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, |
| 435 | const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) |
| 436 | { |
| 437 | if (null_map) |
| 438 | insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool); |
| 439 | else |
| 440 | insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool); |
| 441 | } |
| 442 | |
| 443 | |
| 444 | template <ASTTableJoin::Strictness STRICTNESS, typename Maps> |
| 445 | void insertFromBlockImpl( |
| 446 | Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns, |
| 447 | const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) |
| 448 | { |
| 449 | switch (type) |
| 450 | { |
| 451 | case Join::Type::EMPTY: break; |
| 452 | case Join::Type::CROSS: break; /// Do nothing. We have already saved block, and it is enough. |
| 453 | |
| 454 | #define M(TYPE) \ |
| 455 | case Join::Type::TYPE: \ |
| 456 | insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\ |
| 457 | join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \ |
| 458 | break; |
| 459 | APPLY_FOR_JOIN_VARIANTS(M) |
| 460 | #undef M |
| 461 | } |
| 462 | } |
| 463 | } |
| 464 | |
| 465 | void Join::initRequiredRightKeys() |
| 466 | { |
| 467 | const Names & left_keys = table_join->keyNamesLeft(); |
| 468 | const Names & right_keys = table_join->keyNamesRight(); |
| 469 | NameSet required_keys(table_join->requiredRightKeys().begin(), table_join->requiredRightKeys().end()); |
| 470 | |
| 471 | for (size_t i = 0; i < right_keys.size(); ++i) |
| 472 | { |
| 473 | const String & right_key_name = right_keys[i]; |
| 474 | |
| 475 | if (required_keys.count(right_key_name) && !required_right_keys.has(right_key_name)) |
| 476 | { |
| 477 | const auto & right_key = right_table_keys.getByName(right_key_name); |
| 478 | required_right_keys.insert(right_key); |
| 479 | required_right_keys_sources.push_back(left_keys[i]); |
| 480 | } |
| 481 | } |
| 482 | } |
| 483 | |
| 484 | void Join::initRightBlockStructure() |
| 485 | { |
| 486 | auto & saved_block_sample = data->sample_block; |
| 487 | |
| 488 | if (isRightOrFull(kind)) |
| 489 | { |
| 490 | /// Save keys for NonJoinedBlockInputStream |
| 491 | saved_block_sample = right_table_keys.cloneEmpty(); |
| 492 | } |
| 493 | else if (strictness == ASTTableJoin::Strictness::Asof) |
| 494 | { |
| 495 | /// Save ASOF key |
| 496 | saved_block_sample.insert(right_table_keys.safeGetByPosition(right_table_keys.columns() - 1)); |
| 497 | } |
| 498 | |
| 499 | /// Save non key columns |
| 500 | for (auto & column : sample_block_with_columns_to_add) |
| 501 | saved_block_sample.insert(column); |
| 502 | |
| 503 | if (nullable_right_side) |
| 504 | JoinCommon::convertColumnsToNullable(saved_block_sample, (isFull(kind) ? right_table_keys.columns() : 0)); |
| 505 | } |
| 506 | |
| 507 | Block Join::structureRightBlock(const Block & block) const |
| 508 | { |
| 509 | Block structured_block; |
| 510 | for (auto & sample_column : savedBlockSample().getColumnsWithTypeAndName()) |
| 511 | { |
| 512 | ColumnWithTypeAndName column = block.getByName(sample_column.name); |
| 513 | if (sample_column.column->isNullable()) |
| 514 | JoinCommon::convertColumnToNullable(column); |
| 515 | structured_block.insert(column); |
| 516 | } |
| 517 | |
| 518 | return structured_block; |
| 519 | } |
| 520 | |
| 521 | bool Join::addJoinedBlock(const Block & source_block) |
| 522 | { |
| 523 | if (empty()) |
| 524 | throw Exception("Logical error: Join was not initialized" , ErrorCodes::LOGICAL_ERROR); |
| 525 | |
| 526 | /// There's no optimization for right side const columns. Remove constness if any. |
| 527 | Block block = materializeBlock(source_block); |
| 528 | size_t rows = block.rows(); |
| 529 | |
| 530 | ColumnRawPtrs key_columns = JoinCommon::materializeColumnsInplace(block, key_names_right); |
| 531 | |
| 532 | /// We will insert to the map only keys, where all components are not NULL. |
| 533 | ConstNullMapPtr null_map{}; |
| 534 | ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); |
| 535 | |
| 536 | /// If RIGHT or FULL save blocks with nulls for NonJoinedBlockInputStream |
| 537 | UInt8 save_nullmap = 0; |
| 538 | if (isRightOrFull(kind) && null_map) |
| 539 | { |
| 540 | for (size_t i = 0; !save_nullmap && i < null_map->size(); ++i) |
| 541 | save_nullmap |= (*null_map)[i]; |
| 542 | } |
| 543 | |
| 544 | Block structured_block = structureRightBlock(block); |
| 545 | size_t total_rows = 0; |
| 546 | size_t total_bytes = 0; |
| 547 | |
| 548 | { |
| 549 | std::unique_lock lock(data->rwlock); |
| 550 | |
| 551 | data->blocks.emplace_back(std::move(structured_block)); |
| 552 | Block * stored_block = &data->blocks.back(); |
| 553 | |
| 554 | if (rows) |
| 555 | data->empty = false; |
| 556 | |
| 557 | if (kind != ASTTableJoin::Kind::Cross) |
| 558 | { |
| 559 | joinDispatch(kind, strictness, data->maps, [&](auto, auto strictness_, auto & map) |
| 560 | { |
| 561 | insertFromBlockImpl<strictness_>(*this, data->type, map, rows, key_columns, key_sizes, stored_block, null_map, data->pool); |
| 562 | }); |
| 563 | } |
| 564 | |
| 565 | if (save_nullmap) |
| 566 | data->blocks_nullmaps.emplace_back(stored_block, null_map_holder); |
| 567 | |
| 568 | /// TODO: Do not calculate them every time |
| 569 | total_rows = getTotalRowCount(); |
| 570 | total_bytes = getTotalByteCount(); |
| 571 | } |
| 572 | |
| 573 | return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN" , ErrorCodes::SET_SIZE_LIMIT_EXCEEDED); |
| 574 | } |
| 575 | |
| 576 | |
| 577 | namespace |
| 578 | { |
| 579 | |
| 580 | class AddedColumns |
| 581 | { |
| 582 | public: |
| 583 | using TypeAndNames = std::vector<std::pair<decltype(ColumnWithTypeAndName::type), decltype(ColumnWithTypeAndName::name)>>; |
| 584 | |
| 585 | AddedColumns(const Block & sample_block_with_columns_to_add, |
| 586 | const Block & block_with_columns_to_add, |
| 587 | const Block & block, |
| 588 | const Block & saved_block_sample, |
| 589 | const ColumnsWithTypeAndName & , |
| 590 | const Join & join_, |
| 591 | const ColumnRawPtrs & key_columns_, |
| 592 | const Sizes & key_sizes_) |
| 593 | : join(join_) |
| 594 | , key_columns(key_columns_) |
| 595 | , key_sizes(key_sizes_) |
| 596 | , rows_to_add(block.rows()) |
| 597 | , need_filter(false) |
| 598 | { |
| 599 | size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); |
| 600 | |
| 601 | columns.reserve(num_columns_to_add); |
| 602 | type_name.reserve(num_columns_to_add); |
| 603 | right_indexes.reserve(num_columns_to_add); |
| 604 | |
| 605 | for (size_t i = 0; i < num_columns_to_add; ++i) |
| 606 | { |
| 607 | const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i); |
| 608 | |
| 609 | /// Don't insert column if it's in left block or not explicitly required. |
| 610 | if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name)) |
| 611 | addColumn(src_column); |
| 612 | } |
| 613 | |
| 614 | for (auto & : extras) |
| 615 | addColumn(extra); |
| 616 | |
| 617 | for (auto & tn : type_name) |
| 618 | right_indexes.push_back(saved_block_sample.getPositionByName(tn.second)); |
| 619 | } |
| 620 | |
| 621 | size_t size() const { return columns.size(); } |
| 622 | |
| 623 | ColumnWithTypeAndName moveColumn(size_t i) |
| 624 | { |
| 625 | return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].first, type_name[i].second); |
| 626 | } |
| 627 | |
| 628 | template <bool has_defaults> |
| 629 | void appendFromBlock(const Block & block, size_t row_num) |
| 630 | { |
| 631 | if constexpr (has_defaults) |
| 632 | applyLazyDefaults(); |
| 633 | |
| 634 | for (size_t j = 0; j < right_indexes.size(); ++j) |
| 635 | columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num); |
| 636 | } |
| 637 | |
| 638 | void appendDefaultRow() |
| 639 | { |
| 640 | ++lazy_defaults_count; |
| 641 | } |
| 642 | |
| 643 | void applyLazyDefaults() |
| 644 | { |
| 645 | if (lazy_defaults_count) |
| 646 | { |
| 647 | for (size_t j = 0; j < right_indexes.size(); ++j) |
| 648 | columns[j]->insertManyDefaults(lazy_defaults_count); |
| 649 | lazy_defaults_count = 0; |
| 650 | } |
| 651 | } |
| 652 | |
| 653 | const Join & join; |
| 654 | const ColumnRawPtrs & key_columns; |
| 655 | const Sizes & key_sizes; |
| 656 | size_t rows_to_add; |
| 657 | std::unique_ptr<IColumn::Offsets> offsets_to_replicate; |
| 658 | bool need_filter; |
| 659 | |
| 660 | private: |
| 661 | TypeAndNames type_name; |
| 662 | MutableColumns columns; |
| 663 | std::vector<size_t> right_indexes; |
| 664 | size_t lazy_defaults_count = 0; |
| 665 | |
| 666 | void addColumn(const ColumnWithTypeAndName & src_column) |
| 667 | { |
| 668 | columns.push_back(src_column.column->cloneEmpty()); |
| 669 | columns.back()->reserve(src_column.column->size()); |
| 670 | type_name.emplace_back(src_column.type, src_column.name); |
| 671 | } |
| 672 | }; |
| 673 | |
| 674 | template <typename Map, bool add_missing> |
| 675 | void addFoundRowAll(const typename Map::mapped_type & mapped, AddedColumns & added, IColumn::Offset & current_offset) |
| 676 | { |
| 677 | if constexpr (add_missing) |
| 678 | added.applyLazyDefaults(); |
| 679 | |
| 680 | for (auto it = mapped.begin(); it.ok(); ++it) |
| 681 | { |
| 682 | added.appendFromBlock<false>(*it->block, it->row_num); |
| 683 | ++current_offset; |
| 684 | } |
| 685 | }; |
| 686 | |
| 687 | template <bool add_missing, bool need_offset> |
| 688 | void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]]) |
| 689 | { |
| 690 | if constexpr (add_missing) |
| 691 | { |
| 692 | added.appendDefaultRow(); |
| 693 | if constexpr (need_offset) |
| 694 | ++current_offset; |
| 695 | } |
| 696 | } |
| 697 | |
| 698 | template <bool need_filter> |
| 699 | void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]]) |
| 700 | { |
| 701 | if constexpr (need_filter) |
| 702 | filter[pos] = 1; |
| 703 | } |
| 704 | |
| 705 | |
/// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
///
/// For each of the added_columns.rows_to_add rows, the row's key (built from
/// added_columns.key_columns) is looked up in `map`. Matching right-side values are appended to
/// `added_columns`, and usage marks on the map entries are updated (setUsed / setUsedOnce).
///
/// Template switches:
///  - need_filter:  the returned filter has rows_to_add bytes; rows are set to 1 via setUsed<>.
///                  Otherwise the returned filter is empty. For LEFT ANTI the meaning is inverted:
///                  rows WITHOUT a match are set.
///  - has_null_map: rows flagged in `null_map` are treated as "not found" without a lookup.
///
/// Derived flags:
///  - add_missing (LEFT or FULL, but not SEMI): an unmatched row queues a default row.
///  - need_replication (ALL; ANY/SEMI RIGHT): offsets_to_replicate[i] receives the running count of
///    output rows (matched right rows plus default rows) up to and including row i.
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool has_null_map>
NO_INLINE IColumn::Filter joinRightColumns(const Map & map, AddedColumns & added_columns, const ConstNullMapPtr & null_map [[maybe_unused]])
{
    constexpr bool is_any_join = STRICTNESS == ASTTableJoin::Strictness::Any;
    constexpr bool is_all_join = STRICTNESS == ASTTableJoin::Strictness::All;
    constexpr bool is_asof_join = STRICTNESS == ASTTableJoin::Strictness::Asof;
    constexpr bool is_semi_join = STRICTNESS == ASTTableJoin::Strictness::Semi;
    constexpr bool is_anti_join = STRICTNESS == ASTTableJoin::Strictness::Anti;
    constexpr bool left = KIND == ASTTableJoin::Kind::Left;
    constexpr bool right = KIND == ASTTableJoin::Kind::Right;
    constexpr bool full = KIND == ASTTableJoin::Kind::Full;

    constexpr bool add_missing = (left || full) && !is_semi_join;
    constexpr bool need_replication = is_all_join || (is_any_join && right) || (is_semi_join && right);

    size_t rows = added_columns.rows_to_add;
    IColumn::Filter filter;
    if constexpr (need_filter)
        filter = IColumn::Filter(rows, 0);

    /// Scratch arena passed to key_getter.findKey().
    Arena pool;

    if constexpr (need_replication)
        added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);

    /// For ASOF the last key column is the ASOF key; createKeyGetter leaves it out of the hashed key.
    const IColumn * asof_column [[maybe_unused]] = nullptr;
    if constexpr (is_asof_join)
        asof_column = extractAsofColumn(added_columns.key_columns);

    auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(added_columns.key_columns, added_columns.key_sizes);

    /// Running count of output rows, recorded per input row when replication is needed.
    IColumn::Offset current_offset = 0;

    for (size_t i = 0; i < rows; ++i)
    {
        if constexpr (has_null_map)
        {
            /// A NULL key component never matches: handle as "not found".
            if ((*null_map)[i])
            {
                addNotFoundRow<add_missing, need_replication>(added_columns, current_offset);

                if constexpr (need_replication)
                    (*added_columns.offsets_to_replicate)[i] = current_offset;
                continue;
            }
        }

        auto find_result = key_getter.findKey(map, i, pool);

        if (find_result.isFound())
        {
            auto & mapped = find_result.getMapped();

            if constexpr (is_asof_join)
            {
                /// Key matched; now pick the right row by the ASOF inequality on asof_column.
                const Join & join = added_columns.join;
                if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofInequality(), asof_column, i))
                {
                    setUsed<need_filter>(filter, i);
                    mapped.setUsed();
                    added_columns.appendFromBlock<add_missing>(*found->block, found->row_num);
                }
                else
                    addNotFoundRow<add_missing, need_replication>(added_columns, current_offset);
            }
            else if constexpr (is_all_join)
            {
                /// Every right row with this key is emitted.
                setUsed<need_filter>(filter, i);
                mapped.setUsed();
                addFoundRowAll<Map, add_missing>(mapped, added_columns, current_offset);
            }
            else if constexpr ((is_any_join || is_semi_join) && right)
            {
                /// Use first appeared left key + it needs left columns replication
                if (mapped.setUsedOnce())
                {
                    setUsed<need_filter>(filter, i);
                    addFoundRowAll<Map, add_missing>(mapped, added_columns, current_offset);
                }
            }
            else if constexpr (is_any_join && KIND == ASTTableJoin::Kind::Inner)
            {
                /// Use first appeared left key only
                if (mapped.setUsedOnce())
                {
                    setUsed<need_filter>(filter, i);
                    added_columns.appendFromBlock<add_missing>(*mapped.block, mapped.row_num);
                }
            }
            else if constexpr (is_any_join && full)
            {
                /// TODO
                /// NOTE(review): a matched row appends nothing here although add_missing is true for FULL;
                /// presumably ANY FULL is rejected before reaching this code — confirm.
            }
            else if constexpr (is_anti_join)
            {
                /// ANTI: a match excludes the left row (filter stays 0). For RIGHT ANTI the right row is
                /// only marked as used.
                if constexpr (right)
                    mapped.setUsed();
            }
            else /// ANY LEFT, SEMI LEFT, old ANY (RightAny)
            {
                setUsed<need_filter>(filter, i);
                mapped.setUsed();
                added_columns.appendFromBlock<add_missing>(*mapped.block, mapped.row_num);
            }
        }
        else
        {
            /// No match: LEFT ANTI keeps this row; otherwise a default row is queued when add_missing.
            if constexpr (is_anti_join && left)
                setUsed<need_filter>(filter, i);
            addNotFoundRow<add_missing, need_replication>(added_columns, current_offset);
        }

        if constexpr (need_replication)
            (*added_columns.offsets_to_replicate)[i] = current_offset;
    }

    /// Materialize default rows still queued after the last match.
    added_columns.applyLazyDefaults();
    return filter;
}
| 827 | |
| 828 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map> |
| 829 | IColumn::Filter joinRightColumnsSwitchNullability(const Map & map, AddedColumns & added_columns, const ConstNullMapPtr & null_map) |
| 830 | { |
| 831 | if (added_columns.need_filter) |
| 832 | { |
| 833 | if (null_map) |
| 834 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, true, true>(map, added_columns, null_map); |
| 835 | else |
| 836 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, true, false>(map, added_columns, nullptr); |
| 837 | } |
| 838 | else |
| 839 | { |
| 840 | if (null_map) |
| 841 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, false, true>(map, added_columns, null_map); |
| 842 | else |
| 843 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, false, false>(map, added_columns, nullptr); |
| 844 | } |
| 845 | } |
| 846 | |
| 847 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps> |
| 848 | IColumn::Filter switchJoinRightColumns(const Maps & maps_, AddedColumns & added_columns, Join::Type type, const ConstNullMapPtr & null_map) |
| 849 | { |
| 850 | switch (type) |
| 851 | { |
| 852 | #define M(TYPE) \ |
| 853 | case Join::Type::TYPE: \ |
| 854 | return joinRightColumnsSwitchNullability<KIND, STRICTNESS,\ |
| 855 | typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\ |
| 856 | *maps_.TYPE, added_columns, null_map);\ |
| 857 | break; |
| 858 | APPLY_FOR_JOIN_VARIANTS(M) |
| 859 | #undef M |
| 860 | |
| 861 | default: |
| 862 | throw Exception("Unsupported JOIN keys. Type: " + toString(static_cast<UInt32>(type)), ErrorCodes::UNSUPPORTED_JOIN_KEYS); |
| 863 | } |
| 864 | } |
| 865 | |
| 866 | } /// nameless |
| 867 | |
| 868 | |
| 869 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps> |
| 870 | void Join::joinBlockImpl( |
| 871 | Block & block, |
| 872 | const Names & key_names_left, |
| 873 | const Block & block_with_columns_to_add, |
| 874 | const Maps & maps_) const |
| 875 | { |
| 876 | constexpr bool is_any_join = STRICTNESS == ASTTableJoin::Strictness::Any; |
| 877 | constexpr bool is_all_join = STRICTNESS == ASTTableJoin::Strictness::All; |
| 878 | constexpr bool is_asof_join = STRICTNESS == ASTTableJoin::Strictness::Asof; |
| 879 | constexpr bool is_semi_join = STRICTNESS == ASTTableJoin::Strictness::Semi; |
| 880 | constexpr bool is_anti_join = STRICTNESS == ASTTableJoin::Strictness::Anti; |
| 881 | |
| 882 | constexpr bool left = KIND == ASTTableJoin::Kind::Left; |
| 883 | constexpr bool right = KIND == ASTTableJoin::Kind::Right; |
| 884 | constexpr bool inner = KIND == ASTTableJoin::Kind::Inner; |
| 885 | constexpr bool full = KIND == ASTTableJoin::Kind::Full; |
| 886 | |
| 887 | constexpr bool need_replication = is_all_join || (is_any_join && right) || (is_semi_join && right); |
| 888 | constexpr bool need_filter = !need_replication && (inner || right || (is_semi_join && left) || (is_anti_join && left)); |
| 889 | |
| 890 | /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them. |
| 891 | Columns materialized_keys = JoinCommon::materializeColumns(block, key_names_left); |
| 892 | ColumnRawPtrs key_columns = JoinCommon::getRawPointers(materialized_keys); |
| 893 | |
| 894 | /// Keys with NULL value in any column won't join to anything. |
| 895 | ConstNullMapPtr null_map{}; |
| 896 | ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); |
| 897 | |
| 898 | size_t existing_columns = block.columns(); |
| 899 | |
| 900 | /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized. |
| 901 | * Because if they are constants, then in the "not joined" rows, they may have different values |
| 902 | * - default values, which can differ from the values of these constants. |
| 903 | */ |
| 904 | if constexpr (right || full) |
| 905 | { |
| 906 | materializeBlockInplace(block); |
| 907 | |
| 908 | if (nullable_left_side) |
| 909 | JoinCommon::convertColumnsToNullable(block); |
| 910 | } |
| 911 | |
| 912 | /** For LEFT/INNER JOIN, the saved blocks do not contain keys. |
| 913 | * For FULL/RIGHT JOIN, the saved blocks contain keys; |
| 914 | * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. |
| 915 | * For ASOF, the last column is used as the ASOF column |
| 916 | */ |
| 917 | ColumnsWithTypeAndName ; |
| 918 | if constexpr (is_asof_join) |
| 919 | extras.push_back(right_table_keys.getByName(key_names_right.back())); |
| 920 | |
| 921 | AddedColumns added_columns(sample_block_with_columns_to_add, block_with_columns_to_add, block, savedBlockSample(), |
| 922 | extras, *this, key_columns, key_sizes); |
| 923 | bool has_required_right_keys = (required_right_keys.columns() != 0); |
| 924 | added_columns.need_filter = need_filter || has_required_right_keys; |
| 925 | |
| 926 | IColumn::Filter row_filter = switchJoinRightColumns<KIND, STRICTNESS>(maps_, added_columns, data->type, null_map); |
| 927 | |
| 928 | for (size_t i = 0; i < added_columns.size(); ++i) |
| 929 | block.insert(added_columns.moveColumn(i)); |
| 930 | |
| 931 | std::vector<size_t> right_keys_to_replicate [[maybe_unused]]; |
| 932 | |
| 933 | if constexpr (need_filter) |
| 934 | { |
| 935 | /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones. |
| 936 | for (size_t i = 0; i < existing_columns; ++i) |
| 937 | block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(row_filter, -1); |
| 938 | |
| 939 | /// Add join key columns from right block if needed. |
| 940 | for (size_t i = 0; i < required_right_keys.columns(); ++i) |
| 941 | { |
| 942 | const auto & right_key = required_right_keys.getByPosition(i); |
| 943 | const auto & left_name = required_right_keys_sources[i]; |
| 944 | |
| 945 | const auto & col = block.getByName(left_name); |
| 946 | bool is_nullable = nullable_right_side || right_key.type->isNullable(); |
| 947 | block.insert(correctNullability({col.column, col.type, right_key.name}, is_nullable)); |
| 948 | } |
| 949 | } |
| 950 | else if (has_required_right_keys) |
| 951 | { |
| 952 | /// Some trash to represent IColumn::Filter as ColumnUInt8 needed for ColumnNullable::applyNullMap() |
| 953 | auto null_map_filter_ptr = ColumnUInt8::create(); |
| 954 | ColumnUInt8 & null_map_filter = assert_cast<ColumnUInt8 &>(*null_map_filter_ptr); |
| 955 | null_map_filter.getData().swap(row_filter); |
| 956 | const IColumn::Filter & filter = null_map_filter.getData(); |
| 957 | |
| 958 | /// Add join key columns from right block if needed. |
| 959 | for (size_t i = 0; i < required_right_keys.columns(); ++i) |
| 960 | { |
| 961 | const auto & right_key = required_right_keys.getByPosition(i); |
| 962 | const auto & left_name = required_right_keys_sources[i]; |
| 963 | |
| 964 | const auto & col = block.getByName(left_name); |
| 965 | bool is_nullable = nullable_right_side || right_key.type->isNullable(); |
| 966 | |
| 967 | ColumnPtr thin_column = filterWithBlanks(col.column, filter); |
| 968 | block.insert(correctNullability({thin_column, col.type, right_key.name}, is_nullable, null_map_filter)); |
| 969 | |
| 970 | if constexpr (need_replication) |
| 971 | right_keys_to_replicate.push_back(block.getPositionByName(right_key.name)); |
| 972 | } |
| 973 | } |
| 974 | |
| 975 | if constexpr (need_replication) |
| 976 | { |
| 977 | std::unique_ptr<IColumn::Offsets> & offsets_to_replicate = added_columns.offsets_to_replicate; |
| 978 | |
| 979 | /// If ALL ... JOIN - we replicate all the columns except the new ones. |
| 980 | for (size_t i = 0; i < existing_columns; ++i) |
| 981 | block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicate(*offsets_to_replicate); |
| 982 | |
| 983 | /// Replicate additional right keys |
| 984 | for (size_t pos : right_keys_to_replicate) |
| 985 | block.safeGetByPosition(pos).column = block.safeGetByPosition(pos).column->replicate(*offsets_to_replicate); |
| 986 | } |
| 987 | } |
| 988 | |
| 989 | |
| 990 | void Join::joinBlockImplCross(Block & block) const |
| 991 | { |
| 992 | /// Add new columns to the block. |
| 993 | size_t num_existing_columns = block.columns(); |
| 994 | size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); |
| 995 | |
| 996 | size_t rows_left = block.rows(); |
| 997 | |
| 998 | ColumnRawPtrs src_left_columns(num_existing_columns); |
| 999 | MutableColumns dst_columns(num_existing_columns + num_columns_to_add); |
| 1000 | |
| 1001 | for (size_t i = 0; i < num_existing_columns; ++i) |
| 1002 | { |
| 1003 | src_left_columns[i] = block.getByPosition(i).column.get(); |
| 1004 | dst_columns[i] = src_left_columns[i]->cloneEmpty(); |
| 1005 | } |
| 1006 | |
| 1007 | for (size_t i = 0; i < num_columns_to_add; ++i) |
| 1008 | { |
| 1009 | const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); |
| 1010 | dst_columns[num_existing_columns + i] = src_column.column->cloneEmpty(); |
| 1011 | block.insert(src_column); |
| 1012 | } |
| 1013 | |
| 1014 | /// NOTE It would be better to use `reserve`, as well as `replicate` methods to duplicate the values of the left block. |
| 1015 | |
| 1016 | for (size_t i = 0; i < rows_left; ++i) |
| 1017 | { |
| 1018 | for (const Block & block_right : data->blocks) |
| 1019 | { |
| 1020 | size_t rows_right = block_right.rows(); |
| 1021 | |
| 1022 | for (size_t col_num = 0; col_num < num_existing_columns; ++col_num) |
| 1023 | for (size_t j = 0; j < rows_right; ++j) |
| 1024 | dst_columns[col_num]->insertFrom(*src_left_columns[col_num], i); |
| 1025 | |
| 1026 | for (size_t col_num = 0; col_num < num_columns_to_add; ++col_num) |
| 1027 | { |
| 1028 | const IColumn * column_right = block_right.getByPosition(col_num).column.get(); |
| 1029 | |
| 1030 | for (size_t j = 0; j < rows_right; ++j) |
| 1031 | dst_columns[num_existing_columns + col_num]->insertFrom(*column_right, j); |
| 1032 | } |
| 1033 | } |
| 1034 | } |
| 1035 | |
| 1036 | block = block.cloneWithColumns(std::move(dst_columns)); |
| 1037 | } |
| 1038 | |
| 1039 | static void checkTypeOfKey(const Block & block_left, const Block & block_right) |
| 1040 | { |
| 1041 | auto & [c1, left_type_origin, left_name] = block_left.safeGetByPosition(0); |
| 1042 | auto & [c2, right_type_origin, right_name] = block_right.safeGetByPosition(0); |
| 1043 | auto left_type = removeNullable(left_type_origin); |
| 1044 | auto right_type = removeNullable(right_type_origin); |
| 1045 | |
| 1046 | if (!left_type->equals(*right_type)) |
| 1047 | throw Exception("Type mismatch of columns to joinGet by: " |
| 1048 | + left_name + " " + left_type->getName() + " at left, " |
| 1049 | + right_name + " " + right_type->getName() + " at right" , |
| 1050 | ErrorCodes::TYPE_MISMATCH); |
| 1051 | } |
| 1052 | |
| 1053 | |
| 1054 | DataTypePtr Join::joinGetReturnType(const String & column_name) const |
| 1055 | { |
| 1056 | std::shared_lock lock(data->rwlock); |
| 1057 | |
| 1058 | if (!sample_block_with_columns_to_add.has(column_name)) |
| 1059 | throw Exception("StorageJoin doesn't contain column " + column_name, ErrorCodes::LOGICAL_ERROR); |
| 1060 | return sample_block_with_columns_to_add.getByName(column_name).type; |
| 1061 | } |
| 1062 | |
| 1063 | |
| 1064 | template <typename Maps> |
| 1065 | void Join::joinGetImpl(Block & block, const String & column_name, const Maps & maps_) const |
| 1066 | { |
| 1067 | joinBlockImpl<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::RightAny>( |
| 1068 | block, {block.getByPosition(0).name}, {sample_block_with_columns_to_add.getByName(column_name)}, maps_); |
| 1069 | } |
| 1070 | |
| 1071 | |
| 1072 | // TODO: support composite key |
| 1073 | // TODO: return multiple columns as named tuple |
| 1074 | // TODO: return array of values when strictness == ASTTableJoin::Strictness::All |
| 1075 | void Join::joinGet(Block & block, const String & column_name) const |
| 1076 | { |
| 1077 | std::shared_lock lock(data->rwlock); |
| 1078 | |
| 1079 | if (key_names_right.size() != 1) |
| 1080 | throw Exception("joinGet only supports StorageJoin containing exactly one key" , ErrorCodes::LOGICAL_ERROR); |
| 1081 | |
| 1082 | checkTypeOfKey(block, right_table_keys); |
| 1083 | |
| 1084 | if ((strictness == ASTTableJoin::Strictness::Any || strictness == ASTTableJoin::Strictness::RightAny) && |
| 1085 | kind == ASTTableJoin::Kind::Left) |
| 1086 | { |
| 1087 | joinGetImpl(block, column_name, std::get<MapsOne>(data->maps)); |
| 1088 | } |
| 1089 | else |
| 1090 | throw Exception("joinGet only supports StorageJoin of type Left Any" , ErrorCodes::LOGICAL_ERROR); |
| 1091 | } |
| 1092 | |
| 1093 | |
| 1094 | void Join::joinBlock(Block & block) |
| 1095 | { |
| 1096 | std::shared_lock lock(data->rwlock); |
| 1097 | |
| 1098 | const Names & key_names_left = table_join->keyNamesLeft(); |
| 1099 | JoinCommon::checkTypesOfKeys(block, key_names_left, right_table_keys, key_names_right); |
| 1100 | |
| 1101 | if (joinDispatch(kind, strictness, data->maps, [&](auto kind_, auto strictness_, auto & map) |
| 1102 | { |
| 1103 | joinBlockImpl<kind_, strictness_>(block, key_names_left, sample_block_with_columns_to_add, map); |
| 1104 | })) |
| 1105 | { |
| 1106 | /// Joined |
| 1107 | } |
| 1108 | else if (kind == ASTTableJoin::Kind::Cross) |
| 1109 | joinBlockImplCross(block); |
| 1110 | else |
| 1111 | throw Exception("Logical error: unknown combination of JOIN" , ErrorCodes::LOGICAL_ERROR); |
| 1112 | } |
| 1113 | |
| 1114 | |
| 1115 | void Join::joinTotals(Block & block) const |
| 1116 | { |
| 1117 | JoinCommon::joinTotals(totals, sample_block_with_columns_to_add, key_names_right, block); |
| 1118 | } |
| 1119 | |
| 1120 | |
| 1121 | template <typename Mapped> |
| 1122 | struct AdderNonJoined |
| 1123 | { |
| 1124 | static void add(const Mapped & mapped, size_t & rows_added, MutableColumns & columns_right) |
| 1125 | { |
| 1126 | constexpr bool mapped_asof = std::is_same_v<Mapped, JoinStuff::MappedAsof>; |
| 1127 | [[maybe_unused]] constexpr bool mapped_one = std::is_same_v<Mapped, JoinStuff::MappedOne> || std::is_same_v<Mapped, JoinStuff::MappedOneFlagged>; |
| 1128 | |
| 1129 | if constexpr (mapped_asof) |
| 1130 | { |
| 1131 | /// Do nothing |
| 1132 | } |
| 1133 | else if constexpr (mapped_one) |
| 1134 | { |
| 1135 | for (size_t j = 0; j < columns_right.size(); ++j) |
| 1136 | { |
| 1137 | const auto & mapped_column = mapped.block->getByPosition(j).column; |
| 1138 | columns_right[j]->insertFrom(*mapped_column, mapped.row_num); |
| 1139 | } |
| 1140 | |
| 1141 | ++rows_added; |
| 1142 | } |
| 1143 | else |
| 1144 | { |
| 1145 | for (auto it = mapped.begin(); it.ok(); ++it) |
| 1146 | { |
| 1147 | for (size_t j = 0; j < columns_right.size(); ++j) |
| 1148 | { |
| 1149 | const auto & mapped_column = it->block->getByPosition(j).column; |
| 1150 | columns_right[j]->insertFrom(*mapped_column, it->row_num); |
| 1151 | } |
| 1152 | |
| 1153 | ++rows_added; |
| 1154 | } |
| 1155 | } |
| 1156 | } |
| 1157 | }; |
| 1158 | |
| 1159 | |
| 1160 | /// Stream from not joined earlier rows of the right table. |
| 1161 | class NonJoinedBlockInputStream : public IBlockInputStream |
| 1162 | { |
| 1163 | public: |
| 1164 | NonJoinedBlockInputStream(const Join & parent_, const Block & result_sample_block_, UInt64 max_block_size_) |
| 1165 | : parent(parent_) |
| 1166 | , max_block_size(max_block_size_) |
| 1167 | , result_sample_block(materializeBlock(result_sample_block_)) |
| 1168 | { |
| 1169 | bool remap_keys = parent.table_join->hasUsing(); |
| 1170 | std::unordered_map<size_t, size_t> left_to_right_key_remap; |
| 1171 | |
| 1172 | for (size_t i = 0; i < parent.table_join->keyNamesLeft().size(); ++i) |
| 1173 | { |
| 1174 | const String & left_key_name = parent.table_join->keyNamesLeft()[i]; |
| 1175 | const String & right_key_name = parent.table_join->keyNamesRight()[i]; |
| 1176 | |
| 1177 | size_t left_key_pos = result_sample_block.getPositionByName(left_key_name); |
| 1178 | size_t right_key_pos = parent.savedBlockSample().getPositionByName(right_key_name); |
| 1179 | |
| 1180 | if (remap_keys && !parent.required_right_keys.has(right_key_name)) |
| 1181 | left_to_right_key_remap[left_key_pos] = right_key_pos; |
| 1182 | } |
| 1183 | |
| 1184 | /// result_sample_block: left_sample_block + left expressions, right not key columns, required right keys |
| 1185 | size_t left_columns_count = result_sample_block.columns() - |
| 1186 | parent.sample_block_with_columns_to_add.columns() - parent.required_right_keys.columns(); |
| 1187 | |
| 1188 | for (size_t left_pos = 0; left_pos < left_columns_count; ++left_pos) |
| 1189 | { |
| 1190 | /// We need right 'x' for 'RIGHT JOIN ... USING(x)'. |
| 1191 | if (left_to_right_key_remap.count(left_pos)) |
| 1192 | { |
| 1193 | size_t right_key_pos = left_to_right_key_remap[left_pos]; |
| 1194 | setRightIndex(right_key_pos, left_pos); |
| 1195 | } |
| 1196 | else |
| 1197 | column_indices_left.emplace_back(left_pos); |
| 1198 | } |
| 1199 | |
| 1200 | const auto & saved_block_sample = parent.savedBlockSample(); |
| 1201 | for (size_t right_pos = 0; right_pos < saved_block_sample.columns(); ++right_pos) |
| 1202 | { |
| 1203 | const String & name = saved_block_sample.getByPosition(right_pos).name; |
| 1204 | if (!result_sample_block.has(name)) |
| 1205 | continue; |
| 1206 | |
| 1207 | size_t result_position = result_sample_block.getPositionByName(name); |
| 1208 | |
| 1209 | /// Don't remap left keys twice. We need only qualified right keys here |
| 1210 | if (result_position < left_columns_count) |
| 1211 | continue; |
| 1212 | |
| 1213 | setRightIndex(right_pos, result_position); |
| 1214 | } |
| 1215 | |
| 1216 | if (column_indices_left.size() + column_indices_right.size() + same_result_keys.size() != result_sample_block.columns()) |
| 1217 | throw Exception("Error in columns mapping in RIGHT|FULL JOIN. Left: " + toString(column_indices_left.size()) + |
| 1218 | ", right: " + toString(column_indices_right.size()) + |
| 1219 | ", same: " + toString(same_result_keys.size()) + |
| 1220 | ", result: " + toString(result_sample_block.columns()), |
| 1221 | ErrorCodes::LOGICAL_ERROR); |
| 1222 | } |
| 1223 | |
| 1224 | String getName() const override { return "NonJoined" ; } |
| 1225 | |
| 1226 | Block () const override { return result_sample_block; } |
| 1227 | |
| 1228 | |
| 1229 | protected: |
| 1230 | Block readImpl() override |
| 1231 | { |
| 1232 | if (parent.data->blocks.empty()) |
| 1233 | return Block(); |
| 1234 | return createBlock(); |
| 1235 | } |
| 1236 | |
| 1237 | private: |
| 1238 | const Join & parent; |
| 1239 | UInt64 max_block_size; |
| 1240 | |
| 1241 | Block result_sample_block; |
| 1242 | /// Indices of columns in result_sample_block that should be generated |
| 1243 | std::vector<size_t> column_indices_left; |
| 1244 | /// Indices of columns that come from the right-side table: right_pos -> result_pos |
| 1245 | std::unordered_map<size_t, size_t> column_indices_right; |
| 1246 | /// |
| 1247 | std::unordered_map<size_t, size_t> same_result_keys; |
| 1248 | /// Which right columns (saved in parent) need nullability change before placing them in result block |
| 1249 | std::vector<size_t> right_nullability_changes; |
| 1250 | |
| 1251 | std::any position; |
| 1252 | std::optional<Join::BlockNullmapList::const_iterator> nulls_position; |
| 1253 | |
| 1254 | void setRightIndex(size_t right_pos, size_t result_position) |
| 1255 | { |
| 1256 | if (!column_indices_right.count(right_pos)) |
| 1257 | { |
| 1258 | column_indices_right[right_pos] = result_position; |
| 1259 | |
| 1260 | if (hasNullabilityChange(right_pos, result_position)) |
| 1261 | right_nullability_changes.push_back(right_pos); |
| 1262 | } |
| 1263 | else |
| 1264 | same_result_keys[result_position] = column_indices_right[right_pos]; |
| 1265 | } |
| 1266 | |
| 1267 | bool hasNullabilityChange(size_t right_pos, size_t result_pos) const |
| 1268 | { |
| 1269 | const auto & src = parent.savedBlockSample().getByPosition(right_pos).column; |
| 1270 | const auto & dst = result_sample_block.getByPosition(result_pos).column; |
| 1271 | return src->isNullable() != dst->isNullable(); |
| 1272 | } |
| 1273 | |
| 1274 | Block createBlock() |
| 1275 | { |
| 1276 | MutableColumns columns_right = parent.savedBlockSample().cloneEmptyColumns(); |
| 1277 | |
| 1278 | size_t rows_added = 0; |
| 1279 | |
| 1280 | auto fill_callback = [&](auto, auto strictness, auto & map) |
| 1281 | { |
| 1282 | rows_added = fillColumnsFromMap<strictness>(map, columns_right); |
| 1283 | }; |
| 1284 | |
| 1285 | if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps, fill_callback)) |
| 1286 | throw Exception("Logical error: unknown JOIN strictness (must be on of: ANY, ALL, ASOF)" , ErrorCodes::LOGICAL_ERROR); |
| 1287 | |
| 1288 | fillNullsFromBlocks(columns_right, rows_added); |
| 1289 | |
| 1290 | if (!rows_added) |
| 1291 | return {}; |
| 1292 | |
| 1293 | for (size_t pos : right_nullability_changes) |
| 1294 | changeNullability(columns_right[pos]); |
| 1295 | |
| 1296 | Block res = result_sample_block.cloneEmpty(); |
| 1297 | |
| 1298 | /// @note it's possible to make ColumnConst here and materialize it later |
| 1299 | for (size_t pos : column_indices_left) |
| 1300 | res.getByPosition(pos).column = res.getByPosition(pos).column->cloneResized(rows_added); |
| 1301 | |
| 1302 | for (auto & pr : column_indices_right) |
| 1303 | { |
| 1304 | auto & right_column = columns_right[pr.first]; |
| 1305 | auto & result_column = res.getByPosition(pr.second).column; |
| 1306 | #ifndef NDEBUG |
| 1307 | if (result_column->getName() != right_column->getName()) |
| 1308 | throw Exception("Wrong columns assign in RIGHT|FULL JOIN: " + result_column->getName() + |
| 1309 | " " + right_column->getName(), ErrorCodes::LOGICAL_ERROR); |
| 1310 | #endif |
| 1311 | result_column = std::move(right_column); |
| 1312 | } |
| 1313 | |
| 1314 | for (auto & pr : same_result_keys) |
| 1315 | { |
| 1316 | auto & src_column = res.getByPosition(pr.second).column; |
| 1317 | auto & dst_column = res.getByPosition(pr.first).column; |
| 1318 | |
| 1319 | if (src_column->isNullable() && !dst_column->isNullable()) |
| 1320 | { |
| 1321 | auto * nullable = checkAndGetColumn<ColumnNullable>(*src_column); |
| 1322 | dst_column = nullable->getNestedColumnPtr(); |
| 1323 | } |
| 1324 | else if (!src_column->isNullable() && dst_column->isNullable()) |
| 1325 | dst_column = makeNullable(src_column); |
| 1326 | else |
| 1327 | dst_column = src_column; |
| 1328 | } |
| 1329 | |
| 1330 | return res; |
| 1331 | } |
| 1332 | |
| 1333 | template <ASTTableJoin::Strictness STRICTNESS, typename Maps> |
| 1334 | size_t fillColumnsFromMap(const Maps & maps, MutableColumns & columns_keys_and_right) |
| 1335 | { |
| 1336 | switch (parent.data->type) |
| 1337 | { |
| 1338 | #define M(TYPE) \ |
| 1339 | case Join::Type::TYPE: \ |
| 1340 | return fillColumns<STRICTNESS>(*maps.TYPE, columns_keys_and_right); |
| 1341 | APPLY_FOR_JOIN_VARIANTS(M) |
| 1342 | #undef M |
| 1343 | default: |
| 1344 | throw Exception("Unsupported JOIN keys. Type: " + toString(static_cast<UInt32>(parent.data->type)), |
| 1345 | ErrorCodes::UNSUPPORTED_JOIN_KEYS); |
| 1346 | } |
| 1347 | |
| 1348 | __builtin_unreachable(); |
| 1349 | } |
| 1350 | |
| 1351 | template <ASTTableJoin::Strictness STRICTNESS, typename Map> |
| 1352 | size_t fillColumns(const Map & map, MutableColumns & columns_keys_and_right) |
| 1353 | { |
| 1354 | using Mapped = typename Map::mapped_type; |
| 1355 | using Iterator = typename Map::const_iterator; |
| 1356 | |
| 1357 | size_t rows_added = 0; |
| 1358 | |
| 1359 | if (!position.has_value()) |
| 1360 | position = std::make_any<Iterator>(map.begin()); |
| 1361 | |
| 1362 | Iterator & it = std::any_cast<Iterator &>(position); |
| 1363 | auto end = map.end(); |
| 1364 | |
| 1365 | for (; it != end; ++it) |
| 1366 | { |
| 1367 | const Mapped & mapped = it->getMapped(); |
| 1368 | |
| 1369 | if (mapped.getUsed()) |
| 1370 | continue; |
| 1371 | |
| 1372 | AdderNonJoined<Mapped>::add(mapped, rows_added, columns_keys_and_right); |
| 1373 | |
| 1374 | if (rows_added >= max_block_size) |
| 1375 | { |
| 1376 | ++it; |
| 1377 | break; |
| 1378 | } |
| 1379 | } |
| 1380 | |
| 1381 | return rows_added; |
| 1382 | } |
| 1383 | |
| 1384 | void fillNullsFromBlocks(MutableColumns & columns_keys_and_right, size_t & rows_added) |
| 1385 | { |
| 1386 | if (!nulls_position.has_value()) |
| 1387 | nulls_position = parent.data->blocks_nullmaps.begin(); |
| 1388 | |
| 1389 | auto end = parent.data->blocks_nullmaps.end(); |
| 1390 | |
| 1391 | for (auto & it = *nulls_position; it != end && rows_added < max_block_size; ++it) |
| 1392 | { |
| 1393 | const Block * block = it->first; |
| 1394 | const NullMap & nullmap = assert_cast<const ColumnUInt8 &>(*it->second).getData(); |
| 1395 | |
| 1396 | for (size_t row = 0; row < nullmap.size(); ++row) |
| 1397 | { |
| 1398 | if (nullmap[row]) |
| 1399 | { |
| 1400 | for (size_t col = 0; col < columns_keys_and_right.size(); ++col) |
| 1401 | columns_keys_and_right[col]->insertFrom(*block->getByPosition(col).column, row); |
| 1402 | ++rows_added; |
| 1403 | } |
| 1404 | } |
| 1405 | } |
| 1406 | } |
| 1407 | }; |
| 1408 | |
| 1409 | |
| 1410 | BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const |
| 1411 | { |
| 1412 | if (table_join->strictness() == ASTTableJoin::Strictness::Asof || |
| 1413 | table_join->strictness() == ASTTableJoin::Strictness::Semi) |
| 1414 | return {}; |
| 1415 | |
| 1416 | if (isRightOrFull(table_join->kind())) |
| 1417 | return std::make_shared<NonJoinedBlockInputStream>(*this, result_sample_block, max_block_size); |
| 1418 | return {}; |
| 1419 | } |
| 1420 | |
| 1421 | |
| 1422 | bool Join::hasStreamWithNonJoinedRows() const |
| 1423 | { |
| 1424 | if (table_join->strictness() == ASTTableJoin::Strictness::Asof || |
| 1425 | table_join->strictness() == ASTTableJoin::Strictness::Semi) |
| 1426 | return false; |
| 1427 | |
| 1428 | return isRightOrFull(table_join->kind()); |
| 1429 | } |
| 1430 | |
| 1431 | } |
| 1432 | |