| 1 | #include <any> | 
|---|
| 2 |  | 
|---|
| 3 | #include <common/logger_useful.h> | 
|---|
| 4 |  | 
|---|
| 5 | #include <Columns/ColumnConst.h> | 
|---|
| 6 | #include <Columns/ColumnString.h> | 
|---|
| 7 | #include <Columns/ColumnFixedString.h> | 
|---|
| 8 | #include <Columns/ColumnNullable.h> | 
|---|
| 9 |  | 
|---|
| 10 | #include <DataTypes/DataTypeNullable.h> | 
|---|
| 11 |  | 
|---|
| 12 | #include <Interpreters/Join.h> | 
|---|
| 13 | #include <Interpreters/join_common.h> | 
|---|
| 14 | #include <Interpreters/AnalyzedJoin.h> | 
|---|
| 15 | #include <Interpreters/joinDispatch.h> | 
|---|
| 16 | #include <Interpreters/NullableUtils.h> | 
|---|
| 17 |  | 
|---|
| 18 | #include <DataStreams/IBlockInputStream.h> | 
|---|
| 19 | #include <DataStreams/materializeBlock.h> | 
|---|
| 20 |  | 
|---|
| 21 | #include <Core/ColumnNumbers.h> | 
|---|
| 22 | #include <Common/typeid_cast.h> | 
|---|
| 23 | #include <Common/assert_cast.h> | 
|---|
| 24 | #include <DataTypes/DataTypeLowCardinality.h> | 
|---|
| 25 |  | 
|---|
| 26 |  | 
|---|
| 27 | namespace DB | 
|---|
| 28 | { | 
|---|
| 29 |  | 
|---|
/// Error codes referenced in this translation unit (defined elsewhere).
namespace ErrorCodes
{
    extern const int UNSUPPORTED_JOIN_KEYS;
    extern const int LOGICAL_ERROR;
    extern const int SET_SIZE_LIMIT_EXCEEDED;
    extern const int TYPE_MISMATCH;
    extern const int ILLEGAL_COLUMN;
    /// Both are thrown by Join::setSampleBlock while validating ASOF joins; declare them here
    /// so that this file does not rely on another header happening to declare them.
    extern const int NOT_IMPLEMENTED;
    extern const int BAD_TYPE_OF_FIELD;
}
|---|
| 38 |  | 
|---|
| 39 |  | 
|---|
| 40 | static ColumnPtr filterWithBlanks(ColumnPtr src_column, const IColumn::Filter & filter, bool inverse_filter = false) | 
|---|
| 41 | { | 
|---|
| 42 | ColumnPtr column = src_column->convertToFullColumnIfConst(); | 
|---|
| 43 | MutableColumnPtr mut_column = column->cloneEmpty(); | 
|---|
| 44 | mut_column->reserve(column->size()); | 
|---|
| 45 |  | 
|---|
| 46 | if (inverse_filter) | 
|---|
| 47 | { | 
|---|
| 48 | for (size_t row = 0; row < filter.size(); ++row) | 
|---|
| 49 | { | 
|---|
| 50 | if (filter[row]) | 
|---|
| 51 | mut_column->insertDefault(); | 
|---|
| 52 | else | 
|---|
| 53 | mut_column->insertFrom(*column, row); | 
|---|
| 54 | } | 
|---|
| 55 | } | 
|---|
| 56 | else | 
|---|
| 57 | { | 
|---|
| 58 | for (size_t row = 0; row < filter.size(); ++row) | 
|---|
| 59 | { | 
|---|
| 60 | if (filter[row]) | 
|---|
| 61 | mut_column->insertFrom(*column, row); | 
|---|
| 62 | else | 
|---|
| 63 | mut_column->insertDefault(); | 
|---|
| 64 | } | 
|---|
| 65 | } | 
|---|
| 66 |  | 
|---|
| 67 | return mut_column; | 
|---|
| 68 | } | 
|---|
| 69 |  | 
|---|
| 70 | static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, bool nullable) | 
|---|
| 71 | { | 
|---|
| 72 | if (nullable) | 
|---|
| 73 | { | 
|---|
| 74 | JoinCommon::convertColumnToNullable(column); | 
|---|
| 75 | } | 
|---|
| 76 | else | 
|---|
| 77 | { | 
|---|
| 78 | /// We have to replace values masked by NULLs with defaults. | 
|---|
| 79 | if (column.column) | 
|---|
| 80 | if (auto * nullable_column = checkAndGetColumn<ColumnNullable>(*column.column)) | 
|---|
| 81 | column.column = filterWithBlanks(column.column, nullable_column->getNullMapColumn().getData(), true); | 
|---|
| 82 |  | 
|---|
| 83 | JoinCommon::removeColumnNullability(column); | 
|---|
| 84 | } | 
|---|
| 85 |  | 
|---|
| 86 | return std::move(column); | 
|---|
| 87 | } | 
|---|
| 88 |  | 
|---|
| 89 | static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, bool nullable, const ColumnUInt8 & negative_null_map) | 
|---|
| 90 | { | 
|---|
| 91 | if (nullable) | 
|---|
| 92 | { | 
|---|
| 93 | JoinCommon::convertColumnToNullable(column); | 
|---|
| 94 | if (column.type->isNullable() && negative_null_map.size()) | 
|---|
| 95 | { | 
|---|
| 96 | MutableColumnPtr mutable_column = (*std::move(column.column)).mutate(); | 
|---|
| 97 | assert_cast<ColumnNullable &>(*mutable_column).applyNegatedNullMap(negative_null_map); | 
|---|
| 98 | column.column = std::move(mutable_column); | 
|---|
| 99 | } | 
|---|
| 100 | } | 
|---|
| 101 | else | 
|---|
| 102 | JoinCommon::removeColumnNullability(column); | 
|---|
| 103 |  | 
|---|
| 104 | return std::move(column); | 
|---|
| 105 | } | 
|---|
| 106 |  | 
|---|
| 107 | static void changeNullability(MutableColumnPtr & mutable_column) | 
|---|
| 108 | { | 
|---|
| 109 | ColumnPtr column = std::move(mutable_column); | 
|---|
| 110 | if (auto * nullable = checkAndGetColumn<ColumnNullable>(*column)) | 
|---|
| 111 | column = nullable->getNestedColumnPtr(); | 
|---|
| 112 | else | 
|---|
| 113 | column = makeNullable(column); | 
|---|
| 114 |  | 
|---|
| 115 | mutable_column = (*std::move(column)).mutate(); | 
|---|
| 116 | } | 
|---|
| 117 |  | 
|---|
| 118 |  | 
|---|
| 119 | Join::Join(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_) | 
|---|
| 120 | : table_join(table_join_) | 
|---|
| 121 | , kind(table_join->kind()) | 
|---|
| 122 | , strictness(table_join->strictness()) | 
|---|
| 123 | , key_names_right(table_join->keyNamesRight()) | 
|---|
| 124 | , nullable_right_side(table_join->forceNullableRight()) | 
|---|
| 125 | , nullable_left_side(table_join->forceNullableLeft()) | 
|---|
| 126 | , any_take_last_row(any_take_last_row_) | 
|---|
| 127 | , asof_inequality(table_join->getAsofInequality()) | 
|---|
| 128 | , data(std::make_shared<RightTableData>()) | 
|---|
| 129 | , log(&Logger::get( "Join")) | 
|---|
| 130 | { | 
|---|
| 131 | setSampleBlock(right_sample_block); | 
|---|
| 132 | } | 
|---|
| 133 |  | 
|---|
| 134 |  | 
|---|
| 135 | Join::Type Join::chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes) | 
|---|
| 136 | { | 
|---|
| 137 | size_t keys_size = key_columns.size(); | 
|---|
| 138 |  | 
|---|
| 139 | if (keys_size == 0) | 
|---|
| 140 | return Type::CROSS; | 
|---|
| 141 |  | 
|---|
| 142 | bool all_fixed = true; | 
|---|
| 143 | size_t keys_bytes = 0; | 
|---|
| 144 | key_sizes.resize(keys_size); | 
|---|
| 145 | for (size_t j = 0; j < keys_size; ++j) | 
|---|
| 146 | { | 
|---|
| 147 | if (!key_columns[j]->isFixedAndContiguous()) | 
|---|
| 148 | { | 
|---|
| 149 | all_fixed = false; | 
|---|
| 150 | break; | 
|---|
| 151 | } | 
|---|
| 152 | key_sizes[j] = key_columns[j]->sizeOfValueIfFixed(); | 
|---|
| 153 | keys_bytes += key_sizes[j]; | 
|---|
| 154 | } | 
|---|
| 155 |  | 
|---|
| 156 | /// If there is one numeric key that fits in 64 bits | 
|---|
| 157 | if (keys_size == 1 && key_columns[0]->isNumeric()) | 
|---|
| 158 | { | 
|---|
| 159 | size_t size_of_field = key_columns[0]->sizeOfValueIfFixed(); | 
|---|
| 160 | if (size_of_field == 1) | 
|---|
| 161 | return Type::key8; | 
|---|
| 162 | if (size_of_field == 2) | 
|---|
| 163 | return Type::key16; | 
|---|
| 164 | if (size_of_field == 4) | 
|---|
| 165 | return Type::key32; | 
|---|
| 166 | if (size_of_field == 8) | 
|---|
| 167 | return Type::key64; | 
|---|
| 168 | if (size_of_field == 16) | 
|---|
| 169 | return Type::keys128; | 
|---|
| 170 | throw Exception( "Logical error: numeric column has sizeOfField not in 1, 2, 4, 8, 16.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 171 | } | 
|---|
| 172 |  | 
|---|
| 173 | /// If the keys fit in N bits, we will use a hash table for N-bit-packed keys | 
|---|
| 174 | if (all_fixed && keys_bytes <= 16) | 
|---|
| 175 | return Type::keys128; | 
|---|
| 176 | if (all_fixed && keys_bytes <= 32) | 
|---|
| 177 | return Type::keys256; | 
|---|
| 178 |  | 
|---|
| 179 | /// If there is single string key, use hash table of it's values. | 
|---|
| 180 | if (keys_size == 1 | 
|---|
| 181 | && (typeid_cast<const ColumnString *>(key_columns[0]) | 
|---|
| 182 | || (isColumnConst(*key_columns[0]) && typeid_cast<const ColumnString *>(&assert_cast<const ColumnConst *>(key_columns[0])->getDataColumn())))) | 
|---|
| 183 | return Type::key_string; | 
|---|
| 184 |  | 
|---|
| 185 | if (keys_size == 1 && typeid_cast<const ColumnFixedString *>(key_columns[0])) | 
|---|
| 186 | return Type::key_fixed_string; | 
|---|
| 187 |  | 
|---|
| 188 | /// Otherwise, will use set of cryptographic hashes of unambiguously serialized values. | 
|---|
| 189 | return Type::hashed; | 
|---|
| 190 | } | 
|---|
| 191 |  | 
|---|
| 192 | static const IColumn * extractAsofColumn(const ColumnRawPtrs & key_columns) | 
|---|
| 193 | { | 
|---|
| 194 | return key_columns.back(); | 
|---|
| 195 | } | 
|---|
| 196 |  | 
|---|
| 197 | template<typename KeyGetter, bool is_asof_join> | 
|---|
| 198 | static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes) | 
|---|
| 199 | { | 
|---|
| 200 | if constexpr (is_asof_join) | 
|---|
| 201 | { | 
|---|
| 202 | auto key_column_copy = key_columns; | 
|---|
| 203 | auto key_size_copy = key_sizes; | 
|---|
| 204 | key_column_copy.pop_back(); | 
|---|
| 205 | key_size_copy.pop_back(); | 
|---|
| 206 | return KeyGetter(key_column_copy, key_size_copy, nullptr); | 
|---|
| 207 | } | 
|---|
| 208 | else | 
|---|
| 209 | return KeyGetter(key_columns, key_sizes, nullptr); | 
|---|
| 210 | } | 
|---|
| 211 |  | 
|---|
| 212 | template <Join::Type type, typename Value, typename Mapped> | 
|---|
| 213 | struct KeyGetterForTypeImpl; | 
|---|
| 214 |  | 
|---|
| 215 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key8, Value, Mapped> | 
|---|
| 216 | { | 
|---|
| 217 | using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt8, false>; | 
|---|
| 218 | }; | 
|---|
| 219 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key16, Value, Mapped> | 
|---|
| 220 | { | 
|---|
| 221 | using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt16, false>; | 
|---|
| 222 | }; | 
|---|
| 223 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key32, Value, Mapped> | 
|---|
| 224 | { | 
|---|
| 225 | using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt32, false>; | 
|---|
| 226 | }; | 
|---|
| 227 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key64, Value, Mapped> | 
|---|
| 228 | { | 
|---|
| 229 | using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt64, false>; | 
|---|
| 230 | }; | 
|---|
| 231 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key_string, Value, Mapped> | 
|---|
| 232 | { | 
|---|
| 233 | using Type = ColumnsHashing::HashMethodString<Value, Mapped, true, false>; | 
|---|
| 234 | }; | 
|---|
| 235 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key_fixed_string, Value, Mapped> | 
|---|
| 236 | { | 
|---|
| 237 | using Type = ColumnsHashing::HashMethodFixedString<Value, Mapped, true, false>; | 
|---|
| 238 | }; | 
|---|
| 239 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::keys128, Value, Mapped> | 
|---|
| 240 | { | 
|---|
| 241 | using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt128, Mapped, false, false, false>; | 
|---|
| 242 | }; | 
|---|
| 243 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::keys256, Value, Mapped> | 
|---|
| 244 | { | 
|---|
| 245 | using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt256, Mapped, false, false, false>; | 
|---|
| 246 | }; | 
|---|
| 247 | template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::hashed, Value, Mapped> | 
|---|
| 248 | { | 
|---|
| 249 | using Type = ColumnsHashing::HashMethodHashed<Value, Mapped, false>; | 
|---|
| 250 | }; | 
|---|
| 251 |  | 
|---|
| 252 | template <Join::Type type, typename Data> | 
|---|
| 253 | struct KeyGetterForType | 
|---|
| 254 | { | 
|---|
| 255 | using Value = typename Data::value_type; | 
|---|
| 256 | using Mapped_t = typename Data::mapped_type; | 
|---|
| 257 | using Mapped = std::conditional_t<std::is_const_v<Data>, const Mapped_t, Mapped_t>; | 
|---|
| 258 | using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type; | 
|---|
| 259 | }; | 
|---|
| 260 |  | 
|---|
| 261 |  | 
|---|
| 262 | void Join::init(Type type_) | 
|---|
| 263 | { | 
|---|
| 264 | data->type = type_; | 
|---|
| 265 |  | 
|---|
| 266 | if (kind == ASTTableJoin::Kind::Cross) | 
|---|
| 267 | return; | 
|---|
| 268 | joinDispatchInit(kind, strictness, data->maps); | 
|---|
| 269 | joinDispatch(kind, strictness, data->maps, [&](auto, auto, auto & map) { map.create(data->type); }); | 
|---|
| 270 | } | 
|---|
| 271 |  | 
|---|
| 272 | size_t Join::getTotalRowCount() const | 
|---|
| 273 | { | 
|---|
| 274 | size_t res = 0; | 
|---|
| 275 |  | 
|---|
| 276 | if (data->type == Type::CROSS) | 
|---|
| 277 | { | 
|---|
| 278 | for (const auto & block : data->blocks) | 
|---|
| 279 | res += block.rows(); | 
|---|
| 280 | } | 
|---|
| 281 | else | 
|---|
| 282 | { | 
|---|
| 283 | joinDispatch(kind, strictness, data->maps, [&](auto, auto, auto & map) { res += map.getTotalRowCount(data->type); }); | 
|---|
| 284 | } | 
|---|
| 285 |  | 
|---|
| 286 | return res; | 
|---|
| 287 | } | 
|---|
| 288 |  | 
|---|
| 289 | size_t Join::getTotalByteCount() const | 
|---|
| 290 | { | 
|---|
| 291 | size_t res = 0; | 
|---|
| 292 |  | 
|---|
| 293 | if (data->type == Type::CROSS) | 
|---|
| 294 | { | 
|---|
| 295 | for (const auto & block : data->blocks) | 
|---|
| 296 | res += block.bytes(); | 
|---|
| 297 | } | 
|---|
| 298 | else | 
|---|
| 299 | { | 
|---|
| 300 | joinDispatch(kind, strictness, data->maps, [&](auto, auto, auto & map) { res += map.getTotalByteCountImpl(data->type); }); | 
|---|
| 301 | res += data->pool.size(); | 
|---|
| 302 | } | 
|---|
| 303 |  | 
|---|
| 304 | return res; | 
|---|
| 305 | } | 
|---|
| 306 |  | 
|---|
| 307 | void Join::setSampleBlock(const Block & block) | 
|---|
| 308 | { | 
|---|
| 309 | /// You have to restore this lock if you call the fuction outside of ctor. | 
|---|
| 310 | //std::unique_lock lock(rwlock); | 
|---|
| 311 |  | 
|---|
| 312 | LOG_DEBUG(log, "setSampleBlock: "<< block.dumpStructure()); | 
|---|
| 313 |  | 
|---|
| 314 | if (!empty()) | 
|---|
| 315 | return; | 
|---|
| 316 |  | 
|---|
| 317 | ColumnRawPtrs key_columns = JoinCommon::extractKeysForJoin(key_names_right, block, right_table_keys, sample_block_with_columns_to_add); | 
|---|
| 318 |  | 
|---|
| 319 | initRightBlockStructure(); | 
|---|
| 320 | initRequiredRightKeys(); | 
|---|
| 321 |  | 
|---|
| 322 | JoinCommon::createMissedColumns(sample_block_with_columns_to_add); | 
|---|
| 323 | if (nullable_right_side) | 
|---|
| 324 | JoinCommon::convertColumnsToNullable(sample_block_with_columns_to_add); | 
|---|
| 325 |  | 
|---|
| 326 | if (strictness == ASTTableJoin::Strictness::Asof) | 
|---|
| 327 | { | 
|---|
| 328 | if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner) | 
|---|
| 329 | throw Exception( "ASOF only supports LEFT and INNER as base joins", ErrorCodes::NOT_IMPLEMENTED); | 
|---|
| 330 |  | 
|---|
| 331 | const IColumn * asof_column = key_columns.back(); | 
|---|
| 332 | size_t asof_size; | 
|---|
| 333 |  | 
|---|
| 334 | asof_type = AsofRowRefs::getTypeSize(asof_column, asof_size); | 
|---|
| 335 | if (!asof_type) | 
|---|
| 336 | { | 
|---|
| 337 | std::string msg = "ASOF join not supported for type: "; | 
|---|
| 338 | msg += asof_column->getFamilyName(); | 
|---|
| 339 | throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD); | 
|---|
| 340 | } | 
|---|
| 341 |  | 
|---|
| 342 | key_columns.pop_back(); | 
|---|
| 343 |  | 
|---|
| 344 | if (key_columns.empty()) | 
|---|
| 345 | throw Exception( "ASOF join cannot be done without a joining column", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 346 |  | 
|---|
| 347 | /// this is going to set up the appropriate hash table for the direct lookup part of the join | 
|---|
| 348 | /// However, this does not depend on the size of the asof join key (as that goes into the BST) | 
|---|
| 349 | /// Therefore, add it back in such that it can be extracted appropriately from the full stored | 
|---|
| 350 | /// key_columns and key_sizes | 
|---|
| 351 | init(chooseMethod(key_columns, key_sizes)); | 
|---|
| 352 | key_sizes.push_back(asof_size); | 
|---|
| 353 | } | 
|---|
| 354 | else | 
|---|
| 355 | { | 
|---|
| 356 | /// Choose data structure to use for JOIN. | 
|---|
| 357 | init(chooseMethod(key_columns, key_sizes)); | 
|---|
| 358 | } | 
|---|
| 359 | } | 
|---|
| 360 |  | 
|---|
| 361 | namespace | 
|---|
| 362 | { | 
|---|
| 363 | /// Inserting an element into a hash table of the form `key -> reference to a string`, which will then be used by JOIN. | 
|---|
| 364 | template <typename Map, typename KeyGetter> | 
|---|
| 365 | struct Inserter | 
|---|
| 366 | { | 
|---|
| 367 | static ALWAYS_INLINE void insertOne(const Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, | 
|---|
| 368 | Arena & pool) | 
|---|
| 369 | { | 
|---|
| 370 | auto emplace_result = key_getter.emplaceKey(map, i, pool); | 
|---|
| 371 |  | 
|---|
| 372 | if (emplace_result.isInserted() || join.anyTakeLastRow()) | 
|---|
| 373 | new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i); | 
|---|
| 374 | } | 
|---|
| 375 |  | 
|---|
| 376 | static ALWAYS_INLINE void insertAll(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool) | 
|---|
| 377 | { | 
|---|
| 378 | auto emplace_result = key_getter.emplaceKey(map, i, pool); | 
|---|
| 379 |  | 
|---|
| 380 | if (emplace_result.isInserted()) | 
|---|
| 381 | new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i); | 
|---|
| 382 | else | 
|---|
| 383 | { | 
|---|
| 384 | /// The first element of the list is stored in the value of the hash table, the rest in the pool. | 
|---|
| 385 | emplace_result.getMapped().insert({stored_block, i}, pool); | 
|---|
| 386 | } | 
|---|
| 387 | } | 
|---|
| 388 |  | 
|---|
| 389 | static ALWAYS_INLINE void insertAsof(Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, | 
|---|
| 390 | const IColumn * asof_column) | 
|---|
| 391 | { | 
|---|
| 392 | auto emplace_result = key_getter.emplaceKey(map, i, pool); | 
|---|
| 393 | typename Map::mapped_type * time_series_map = &emplace_result.getMapped(); | 
|---|
| 394 |  | 
|---|
| 395 | if (emplace_result.isInserted()) | 
|---|
| 396 | time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType()); | 
|---|
| 397 | time_series_map->insert(join.getAsofType(), asof_column, stored_block, i); | 
|---|
| 398 | } | 
|---|
| 399 | }; | 
|---|
| 400 |  | 
|---|
| 401 |  | 
|---|
| 402 | template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map> | 
|---|
| 403 | void NO_INLINE insertFromBlockImplTypeCase( | 
|---|
| 404 | Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, | 
|---|
| 405 | const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) | 
|---|
| 406 | { | 
|---|
| 407 | [[maybe_unused]] constexpr bool mapped_one = std::is_same_v<typename Map::mapped_type, JoinStuff::MappedOne> || | 
|---|
| 408 | std::is_same_v<typename Map::mapped_type, JoinStuff::MappedOneFlagged>; | 
|---|
| 409 | constexpr bool is_asof_join = STRICTNESS == ASTTableJoin::Strictness::Asof; | 
|---|
| 410 |  | 
|---|
| 411 | const IColumn * asof_column [[maybe_unused]] = nullptr; | 
|---|
| 412 | if constexpr (is_asof_join) | 
|---|
| 413 | asof_column = extractAsofColumn(key_columns); | 
|---|
| 414 |  | 
|---|
| 415 | auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(key_columns, key_sizes); | 
|---|
| 416 |  | 
|---|
| 417 | for (size_t i = 0; i < rows; ++i) | 
|---|
| 418 | { | 
|---|
| 419 | if (has_null_map && (*null_map)[i]) | 
|---|
| 420 | continue; | 
|---|
| 421 |  | 
|---|
| 422 | if constexpr (is_asof_join) | 
|---|
| 423 | Inserter<Map, KeyGetter>::insertAsof(join, map, key_getter, stored_block, i, pool, asof_column); | 
|---|
| 424 | else if constexpr (mapped_one) | 
|---|
| 425 | Inserter<Map, KeyGetter>::insertOne(join, map, key_getter, stored_block, i, pool); | 
|---|
| 426 | else | 
|---|
| 427 | Inserter<Map, KeyGetter>::insertAll(join, map, key_getter, stored_block, i, pool); | 
|---|
| 428 | } | 
|---|
| 429 | } | 
|---|
| 430 |  | 
|---|
| 431 |  | 
|---|
| 432 | template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map> | 
|---|
| 433 | void insertFromBlockImplType( | 
|---|
| 434 | Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, | 
|---|
| 435 | const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) | 
|---|
| 436 | { | 
|---|
| 437 | if (null_map) | 
|---|
| 438 | insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool); | 
|---|
| 439 | else | 
|---|
| 440 | insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool); | 
|---|
| 441 | } | 
|---|
| 442 |  | 
|---|
| 443 |  | 
|---|
| 444 | template <ASTTableJoin::Strictness STRICTNESS, typename Maps> | 
|---|
| 445 | void insertFromBlockImpl( | 
|---|
| 446 | Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns, | 
|---|
| 447 | const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) | 
|---|
| 448 | { | 
|---|
| 449 | switch (type) | 
|---|
| 450 | { | 
|---|
| 451 | case Join::Type::EMPTY:            break; | 
|---|
| 452 | case Join::Type::CROSS:            break;    /// Do nothing. We have already saved block, and it is enough. | 
|---|
| 453 |  | 
|---|
| 454 | #define M(TYPE) \ | 
|---|
| 455 | case Join::Type::TYPE: \ | 
|---|
| 456 | insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\ | 
|---|
| 457 | join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \ | 
|---|
| 458 | break; | 
|---|
| 459 | APPLY_FOR_JOIN_VARIANTS(M) | 
|---|
| 460 | #undef M | 
|---|
| 461 | } | 
|---|
| 462 | } | 
|---|
| 463 | } | 
|---|
| 464 |  | 
|---|
| 465 | void Join::initRequiredRightKeys() | 
|---|
| 466 | { | 
|---|
| 467 | const Names & left_keys = table_join->keyNamesLeft(); | 
|---|
| 468 | const Names & right_keys = table_join->keyNamesRight(); | 
|---|
| 469 | NameSet required_keys(table_join->requiredRightKeys().begin(), table_join->requiredRightKeys().end()); | 
|---|
| 470 |  | 
|---|
| 471 | for (size_t i = 0; i < right_keys.size(); ++i) | 
|---|
| 472 | { | 
|---|
| 473 | const String & right_key_name = right_keys[i]; | 
|---|
| 474 |  | 
|---|
| 475 | if (required_keys.count(right_key_name) && !required_right_keys.has(right_key_name)) | 
|---|
| 476 | { | 
|---|
| 477 | const auto & right_key = right_table_keys.getByName(right_key_name); | 
|---|
| 478 | required_right_keys.insert(right_key); | 
|---|
| 479 | required_right_keys_sources.push_back(left_keys[i]); | 
|---|
| 480 | } | 
|---|
| 481 | } | 
|---|
| 482 | } | 
|---|
| 483 |  | 
|---|
| 484 | void Join::initRightBlockStructure() | 
|---|
| 485 | { | 
|---|
| 486 | auto & saved_block_sample = data->sample_block; | 
|---|
| 487 |  | 
|---|
| 488 | if (isRightOrFull(kind)) | 
|---|
| 489 | { | 
|---|
| 490 | /// Save keys for NonJoinedBlockInputStream | 
|---|
| 491 | saved_block_sample = right_table_keys.cloneEmpty(); | 
|---|
| 492 | } | 
|---|
| 493 | else if (strictness == ASTTableJoin::Strictness::Asof) | 
|---|
| 494 | { | 
|---|
| 495 | /// Save ASOF key | 
|---|
| 496 | saved_block_sample.insert(right_table_keys.safeGetByPosition(right_table_keys.columns() - 1)); | 
|---|
| 497 | } | 
|---|
| 498 |  | 
|---|
| 499 | /// Save non key columns | 
|---|
| 500 | for (auto & column : sample_block_with_columns_to_add) | 
|---|
| 501 | saved_block_sample.insert(column); | 
|---|
| 502 |  | 
|---|
| 503 | if (nullable_right_side) | 
|---|
| 504 | JoinCommon::convertColumnsToNullable(saved_block_sample, (isFull(kind) ? right_table_keys.columns() : 0)); | 
|---|
| 505 | } | 
|---|
| 506 |  | 
|---|
| 507 | Block Join::structureRightBlock(const Block & block) const | 
|---|
| 508 | { | 
|---|
| 509 | Block structured_block; | 
|---|
| 510 | for (auto & sample_column : savedBlockSample().getColumnsWithTypeAndName()) | 
|---|
| 511 | { | 
|---|
| 512 | ColumnWithTypeAndName column = block.getByName(sample_column.name); | 
|---|
| 513 | if (sample_column.column->isNullable()) | 
|---|
| 514 | JoinCommon::convertColumnToNullable(column); | 
|---|
| 515 | structured_block.insert(column); | 
|---|
| 516 | } | 
|---|
| 517 |  | 
|---|
| 518 | return structured_block; | 
|---|
| 519 | } | 
|---|
| 520 |  | 
|---|
| 521 | bool Join::addJoinedBlock(const Block & source_block) | 
|---|
| 522 | { | 
|---|
| 523 | if (empty()) | 
|---|
| 524 | throw Exception( "Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 525 |  | 
|---|
| 526 | /// There's no optimization for right side const columns. Remove constness if any. | 
|---|
| 527 | Block block = materializeBlock(source_block); | 
|---|
| 528 | size_t rows = block.rows(); | 
|---|
| 529 |  | 
|---|
| 530 | ColumnRawPtrs key_columns = JoinCommon::materializeColumnsInplace(block, key_names_right); | 
|---|
| 531 |  | 
|---|
| 532 | /// We will insert to the map only keys, where all components are not NULL. | 
|---|
| 533 | ConstNullMapPtr null_map{}; | 
|---|
| 534 | ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); | 
|---|
| 535 |  | 
|---|
| 536 | /// If RIGHT or FULL save blocks with nulls for NonJoinedBlockInputStream | 
|---|
| 537 | UInt8 save_nullmap = 0; | 
|---|
| 538 | if (isRightOrFull(kind) && null_map) | 
|---|
| 539 | { | 
|---|
| 540 | for (size_t i = 0; !save_nullmap && i < null_map->size(); ++i) | 
|---|
| 541 | save_nullmap |= (*null_map)[i]; | 
|---|
| 542 | } | 
|---|
| 543 |  | 
|---|
| 544 | Block structured_block = structureRightBlock(block); | 
|---|
| 545 | size_t total_rows = 0; | 
|---|
| 546 | size_t total_bytes = 0; | 
|---|
| 547 |  | 
|---|
| 548 | { | 
|---|
| 549 | std::unique_lock lock(data->rwlock); | 
|---|
| 550 |  | 
|---|
| 551 | data->blocks.emplace_back(std::move(structured_block)); | 
|---|
| 552 | Block * stored_block = &data->blocks.back(); | 
|---|
| 553 |  | 
|---|
| 554 | if (rows) | 
|---|
| 555 | data->empty = false; | 
|---|
| 556 |  | 
|---|
| 557 | if (kind != ASTTableJoin::Kind::Cross) | 
|---|
| 558 | { | 
|---|
| 559 | joinDispatch(kind, strictness, data->maps, [&](auto, auto strictness_, auto & map) | 
|---|
| 560 | { | 
|---|
| 561 | insertFromBlockImpl<strictness_>(*this, data->type, map, rows, key_columns, key_sizes, stored_block, null_map, data->pool); | 
|---|
| 562 | }); | 
|---|
| 563 | } | 
|---|
| 564 |  | 
|---|
| 565 | if (save_nullmap) | 
|---|
| 566 | data->blocks_nullmaps.emplace_back(stored_block, null_map_holder); | 
|---|
| 567 |  | 
|---|
| 568 | /// TODO: Do not calculate them every time | 
|---|
| 569 | total_rows = getTotalRowCount(); | 
|---|
| 570 | total_bytes = getTotalByteCount(); | 
|---|
| 571 | } | 
|---|
| 572 |  | 
|---|
| 573 | return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED); | 
|---|
| 574 | } | 
|---|
| 575 |  | 
|---|
| 576 |  | 
|---|
| 577 | namespace | 
|---|
| 578 | { | 
|---|
| 579 |  | 
|---|
| 580 | class AddedColumns | 
|---|
| 581 | { | 
|---|
| 582 | public: | 
|---|
| 583 | using TypeAndNames = std::vector<std::pair<decltype(ColumnWithTypeAndName::type), decltype(ColumnWithTypeAndName::name)>>; | 
|---|
| 584 |  | 
|---|
| 585 | AddedColumns(const Block & sample_block_with_columns_to_add, | 
|---|
| 586 | const Block & block_with_columns_to_add, | 
|---|
| 587 | const Block & block, | 
|---|
| 588 | const Block & saved_block_sample, | 
|---|
| 589 | const ColumnsWithTypeAndName & , | 
|---|
| 590 | const Join & join_, | 
|---|
| 591 | const ColumnRawPtrs & key_columns_, | 
|---|
| 592 | const Sizes & key_sizes_) | 
|---|
| 593 | : join(join_) | 
|---|
| 594 | , key_columns(key_columns_) | 
|---|
| 595 | , key_sizes(key_sizes_) | 
|---|
| 596 | , rows_to_add(block.rows()) | 
|---|
| 597 | , need_filter(false) | 
|---|
| 598 | { | 
|---|
| 599 | size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); | 
|---|
| 600 |  | 
|---|
| 601 | columns.reserve(num_columns_to_add); | 
|---|
| 602 | type_name.reserve(num_columns_to_add); | 
|---|
| 603 | right_indexes.reserve(num_columns_to_add); | 
|---|
| 604 |  | 
|---|
| 605 | for (size_t i = 0; i < num_columns_to_add; ++i) | 
|---|
| 606 | { | 
|---|
| 607 | const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i); | 
|---|
| 608 |  | 
|---|
| 609 | /// Don't insert column if it's in left block or not explicitly required. | 
|---|
| 610 | if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name)) | 
|---|
| 611 | addColumn(src_column); | 
|---|
| 612 | } | 
|---|
| 613 |  | 
|---|
| 614 | for (auto &  : extras) | 
|---|
| 615 | addColumn(extra); | 
|---|
| 616 |  | 
|---|
| 617 | for (auto & tn : type_name) | 
|---|
| 618 | right_indexes.push_back(saved_block_sample.getPositionByName(tn.second)); | 
|---|
| 619 | } | 
|---|
| 620 |  | 
|---|
| 621 | size_t size() const { return columns.size(); } | 
|---|
| 622 |  | 
|---|
| 623 | ColumnWithTypeAndName moveColumn(size_t i) | 
|---|
| 624 | { | 
|---|
| 625 | return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].first, type_name[i].second); | 
|---|
| 626 | } | 
|---|
| 627 |  | 
|---|
| 628 | template <bool has_defaults> | 
|---|
| 629 | void appendFromBlock(const Block & block, size_t row_num) | 
|---|
| 630 | { | 
|---|
| 631 | if constexpr (has_defaults) | 
|---|
| 632 | applyLazyDefaults(); | 
|---|
| 633 |  | 
|---|
| 634 | for (size_t j = 0; j < right_indexes.size(); ++j) | 
|---|
| 635 | columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num); | 
|---|
| 636 | } | 
|---|
| 637 |  | 
|---|
| 638 | void appendDefaultRow() | 
|---|
| 639 | { | 
|---|
| 640 | ++lazy_defaults_count; | 
|---|
| 641 | } | 
|---|
| 642 |  | 
|---|
| 643 | void applyLazyDefaults() | 
|---|
| 644 | { | 
|---|
| 645 | if (lazy_defaults_count) | 
|---|
| 646 | { | 
|---|
| 647 | for (size_t j = 0; j < right_indexes.size(); ++j) | 
|---|
| 648 | columns[j]->insertManyDefaults(lazy_defaults_count); | 
|---|
| 649 | lazy_defaults_count = 0; | 
|---|
| 650 | } | 
|---|
| 651 | } | 
|---|
| 652 |  | 
|---|
| 653 | const Join & join; | 
|---|
| 654 | const ColumnRawPtrs & key_columns; | 
|---|
| 655 | const Sizes & key_sizes; | 
|---|
| 656 | size_t rows_to_add; | 
|---|
| 657 | std::unique_ptr<IColumn::Offsets> offsets_to_replicate; | 
|---|
| 658 | bool need_filter; | 
|---|
| 659 |  | 
|---|
| 660 | private: | 
|---|
| 661 | TypeAndNames type_name; | 
|---|
| 662 | MutableColumns columns; | 
|---|
| 663 | std::vector<size_t> right_indexes; | 
|---|
| 664 | size_t lazy_defaults_count = 0; | 
|---|
| 665 |  | 
|---|
| 666 | void addColumn(const ColumnWithTypeAndName & src_column) | 
|---|
| 667 | { | 
|---|
| 668 | columns.push_back(src_column.column->cloneEmpty()); | 
|---|
| 669 | columns.back()->reserve(src_column.column->size()); | 
|---|
| 670 | type_name.emplace_back(src_column.type, src_column.name); | 
|---|
| 671 | } | 
|---|
| 672 | }; | 
|---|
| 673 |  | 
|---|
| 674 | template <typename Map, bool add_missing> | 
|---|
| 675 | void addFoundRowAll(const typename Map::mapped_type & mapped, AddedColumns & added, IColumn::Offset & current_offset) | 
|---|
| 676 | { | 
|---|
| 677 | if constexpr (add_missing) | 
|---|
| 678 | added.applyLazyDefaults(); | 
|---|
| 679 |  | 
|---|
| 680 | for (auto it = mapped.begin(); it.ok(); ++it) | 
|---|
| 681 | { | 
|---|
| 682 | added.appendFromBlock<false>(*it->block, it->row_num); | 
|---|
| 683 | ++current_offset; | 
|---|
| 684 | } | 
|---|
| 685 | }; | 
|---|
| 686 |  | 
|---|
| 687 | template <bool add_missing, bool need_offset> | 
|---|
| 688 | void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]]) | 
|---|
| 689 | { | 
|---|
| 690 | if constexpr (add_missing) | 
|---|
| 691 | { | 
|---|
| 692 | added.appendDefaultRow(); | 
|---|
| 693 | if constexpr (need_offset) | 
|---|
| 694 | ++current_offset; | 
|---|
| 695 | } | 
|---|
| 696 | } | 
|---|
| 697 |  | 
|---|
| 698 | template <bool need_filter> | 
|---|
| 699 | void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]]) | 
|---|
| 700 | { | 
|---|
| 701 | if constexpr (need_filter) | 
|---|
| 702 | filter[pos] = 1; | 
|---|
| 703 | } | 
|---|
| 704 |  | 
|---|
| 705 |  | 
|---|
| 706 | /// Joins right table columns which indexes are present in right_indexes using specified map. | 
|---|
| 707 | /// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS). | 
|---|
| 708 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool has_null_map> | 
|---|
| 709 | NO_INLINE IColumn::Filter joinRightColumns(const Map & map, AddedColumns & added_columns, const ConstNullMapPtr & null_map [[maybe_unused]]) | 
|---|
| 710 | { | 
|---|
| 711 | constexpr bool is_any_join = STRICTNESS == ASTTableJoin::Strictness::Any; | 
|---|
| 712 | constexpr bool is_all_join = STRICTNESS == ASTTableJoin::Strictness::All; | 
|---|
| 713 | constexpr bool is_asof_join = STRICTNESS == ASTTableJoin::Strictness::Asof; | 
|---|
| 714 | constexpr bool is_semi_join = STRICTNESS == ASTTableJoin::Strictness::Semi; | 
|---|
| 715 | constexpr bool is_anti_join = STRICTNESS == ASTTableJoin::Strictness::Anti; | 
|---|
| 716 | constexpr bool left = KIND == ASTTableJoin::Kind::Left; | 
|---|
| 717 | constexpr bool right = KIND == ASTTableJoin::Kind::Right; | 
|---|
| 718 | constexpr bool full = KIND == ASTTableJoin::Kind::Full; | 
|---|
| 719 |  | 
|---|
| 720 | constexpr bool add_missing = (left || full) && !is_semi_join; | 
|---|
| 721 | constexpr bool need_replication = is_all_join || (is_any_join && right) || (is_semi_join && right); | 
|---|
| 722 |  | 
|---|
| 723 | size_t rows = added_columns.rows_to_add; | 
|---|
| 724 | IColumn::Filter filter; | 
|---|
| 725 | if constexpr (need_filter) | 
|---|
| 726 | filter = IColumn::Filter(rows, 0); | 
|---|
| 727 |  | 
|---|
| 728 | Arena pool; | 
|---|
| 729 |  | 
|---|
| 730 | if constexpr (need_replication) | 
|---|
| 731 | added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows); | 
|---|
| 732 |  | 
|---|
| 733 | const IColumn * asof_column [[maybe_unused]] = nullptr; | 
|---|
| 734 | if constexpr (is_asof_join) | 
|---|
| 735 | asof_column = extractAsofColumn(added_columns.key_columns); | 
|---|
| 736 |  | 
|---|
| 737 | auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(added_columns.key_columns, added_columns.key_sizes); | 
|---|
| 738 |  | 
|---|
| 739 | IColumn::Offset current_offset = 0; | 
|---|
| 740 |  | 
|---|
| 741 | for (size_t i = 0; i < rows; ++i) | 
|---|
| 742 | { | 
|---|
| 743 | if constexpr (has_null_map) | 
|---|
| 744 | { | 
|---|
| 745 | if ((*null_map)[i]) | 
|---|
| 746 | { | 
|---|
| 747 | addNotFoundRow<add_missing, need_replication>(added_columns, current_offset); | 
|---|
| 748 |  | 
|---|
| 749 | if constexpr (need_replication) | 
|---|
| 750 | (*added_columns.offsets_to_replicate)[i] = current_offset; | 
|---|
| 751 | continue; | 
|---|
| 752 | } | 
|---|
| 753 | } | 
|---|
| 754 |  | 
|---|
| 755 | auto find_result = key_getter.findKey(map, i, pool); | 
|---|
| 756 |  | 
|---|
| 757 | if (find_result.isFound()) | 
|---|
| 758 | { | 
|---|
| 759 | auto & mapped = find_result.getMapped(); | 
|---|
| 760 |  | 
|---|
| 761 | if constexpr (is_asof_join) | 
|---|
| 762 | { | 
|---|
| 763 | const Join & join = added_columns.join; | 
|---|
| 764 | if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofInequality(), asof_column, i)) | 
|---|
| 765 | { | 
|---|
| 766 | setUsed<need_filter>(filter, i); | 
|---|
| 767 | mapped.setUsed(); | 
|---|
| 768 | added_columns.appendFromBlock<add_missing>(*found->block, found->row_num); | 
|---|
| 769 | } | 
|---|
| 770 | else | 
|---|
| 771 | addNotFoundRow<add_missing, need_replication>(added_columns, current_offset); | 
|---|
| 772 | } | 
|---|
| 773 | else if constexpr (is_all_join) | 
|---|
| 774 | { | 
|---|
| 775 | setUsed<need_filter>(filter, i); | 
|---|
| 776 | mapped.setUsed(); | 
|---|
| 777 | addFoundRowAll<Map, add_missing>(mapped, added_columns, current_offset); | 
|---|
| 778 | } | 
|---|
| 779 | else if constexpr ((is_any_join || is_semi_join) && right) | 
|---|
| 780 | { | 
|---|
| 781 | /// Use first appered left key + it needs left columns replication | 
|---|
| 782 | if (mapped.setUsedOnce()) | 
|---|
| 783 | { | 
|---|
| 784 | setUsed<need_filter>(filter, i); | 
|---|
| 785 | addFoundRowAll<Map, add_missing>(mapped, added_columns, current_offset); | 
|---|
| 786 | } | 
|---|
| 787 | } | 
|---|
| 788 | else if constexpr (is_any_join && KIND == ASTTableJoin::Kind::Inner) | 
|---|
| 789 | { | 
|---|
| 790 | /// Use first appered left key only | 
|---|
| 791 | if (mapped.setUsedOnce()) | 
|---|
| 792 | { | 
|---|
| 793 | setUsed<need_filter>(filter, i); | 
|---|
| 794 | added_columns.appendFromBlock<add_missing>(*mapped.block, mapped.row_num); | 
|---|
| 795 | } | 
|---|
| 796 | } | 
|---|
| 797 | else if constexpr (is_any_join && full) | 
|---|
| 798 | { | 
|---|
| 799 | /// TODO | 
|---|
| 800 | } | 
|---|
| 801 | else if constexpr (is_anti_join) | 
|---|
| 802 | { | 
|---|
| 803 | if constexpr (right) | 
|---|
| 804 | mapped.setUsed(); | 
|---|
| 805 | } | 
|---|
| 806 | else /// ANY LEFT, SEMI LEFT, old ANY (RightAny) | 
|---|
| 807 | { | 
|---|
| 808 | setUsed<need_filter>(filter, i); | 
|---|
| 809 | mapped.setUsed(); | 
|---|
| 810 | added_columns.appendFromBlock<add_missing>(*mapped.block, mapped.row_num); | 
|---|
| 811 | } | 
|---|
| 812 | } | 
|---|
| 813 | else | 
|---|
| 814 | { | 
|---|
| 815 | if constexpr (is_anti_join && left) | 
|---|
| 816 | setUsed<need_filter>(filter, i); | 
|---|
| 817 | addNotFoundRow<add_missing, need_replication>(added_columns, current_offset); | 
|---|
| 818 | } | 
|---|
| 819 |  | 
|---|
| 820 | if constexpr (need_replication) | 
|---|
| 821 | (*added_columns.offsets_to_replicate)[i] = current_offset; | 
|---|
| 822 | } | 
|---|
| 823 |  | 
|---|
| 824 | added_columns.applyLazyDefaults(); | 
|---|
| 825 | return filter; | 
|---|
| 826 | } | 
|---|
| 827 |  | 
|---|
| 828 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map> | 
|---|
| 829 | IColumn::Filter joinRightColumnsSwitchNullability(const Map & map, AddedColumns & added_columns, const ConstNullMapPtr & null_map) | 
|---|
| 830 | { | 
|---|
| 831 | if (added_columns.need_filter) | 
|---|
| 832 | { | 
|---|
| 833 | if (null_map) | 
|---|
| 834 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, true, true>(map, added_columns, null_map); | 
|---|
| 835 | else | 
|---|
| 836 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, true, false>(map, added_columns, nullptr); | 
|---|
| 837 | } | 
|---|
| 838 | else | 
|---|
| 839 | { | 
|---|
| 840 | if (null_map) | 
|---|
| 841 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, false, true>(map, added_columns, null_map); | 
|---|
| 842 | else | 
|---|
| 843 | return joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, false, false>(map, added_columns, nullptr); | 
|---|
| 844 | } | 
|---|
| 845 | } | 
|---|
| 846 |  | 
|---|
| 847 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps> | 
|---|
| 848 | IColumn::Filter switchJoinRightColumns(const Maps & maps_, AddedColumns & added_columns, Join::Type type, const ConstNullMapPtr & null_map) | 
|---|
| 849 | { | 
|---|
| 850 | switch (type) | 
|---|
| 851 | { | 
|---|
| 852 | #define M(TYPE) \ | 
|---|
| 853 | case Join::Type::TYPE: \ | 
|---|
| 854 | return joinRightColumnsSwitchNullability<KIND, STRICTNESS,\ | 
|---|
| 855 | typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\ | 
|---|
| 856 | *maps_.TYPE, added_columns, null_map);\ | 
|---|
| 857 | break; | 
|---|
| 858 | APPLY_FOR_JOIN_VARIANTS(M) | 
|---|
| 859 | #undef M | 
|---|
| 860 |  | 
|---|
| 861 | default: | 
|---|
| 862 | throw Exception( "Unsupported JOIN keys. Type: "+ toString(static_cast<UInt32>(type)), ErrorCodes::UNSUPPORTED_JOIN_KEYS); | 
|---|
| 863 | } | 
|---|
| 864 | } | 
|---|
| 865 |  | 
|---|
| 866 | } /// nameless | 
|---|
| 867 |  | 
|---|
| 868 |  | 
|---|
| 869 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps> | 
|---|
| 870 | void Join::joinBlockImpl( | 
|---|
| 871 | Block & block, | 
|---|
| 872 | const Names & key_names_left, | 
|---|
| 873 | const Block & block_with_columns_to_add, | 
|---|
| 874 | const Maps & maps_) const | 
|---|
| 875 | { | 
|---|
| 876 | constexpr bool is_any_join = STRICTNESS == ASTTableJoin::Strictness::Any; | 
|---|
| 877 | constexpr bool is_all_join = STRICTNESS == ASTTableJoin::Strictness::All; | 
|---|
| 878 | constexpr bool is_asof_join = STRICTNESS == ASTTableJoin::Strictness::Asof; | 
|---|
| 879 | constexpr bool is_semi_join = STRICTNESS == ASTTableJoin::Strictness::Semi; | 
|---|
| 880 | constexpr bool is_anti_join = STRICTNESS == ASTTableJoin::Strictness::Anti; | 
|---|
| 881 |  | 
|---|
| 882 | constexpr bool left = KIND == ASTTableJoin::Kind::Left; | 
|---|
| 883 | constexpr bool right = KIND == ASTTableJoin::Kind::Right; | 
|---|
| 884 | constexpr bool inner = KIND == ASTTableJoin::Kind::Inner; | 
|---|
| 885 | constexpr bool full = KIND == ASTTableJoin::Kind::Full; | 
|---|
| 886 |  | 
|---|
| 887 | constexpr bool need_replication = is_all_join || (is_any_join && right) || (is_semi_join && right); | 
|---|
| 888 | constexpr bool need_filter = !need_replication && (inner || right || (is_semi_join && left) || (is_anti_join && left)); | 
|---|
| 889 |  | 
|---|
| 890 | /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them. | 
|---|
| 891 | Columns materialized_keys = JoinCommon::materializeColumns(block, key_names_left); | 
|---|
| 892 | ColumnRawPtrs key_columns = JoinCommon::getRawPointers(materialized_keys); | 
|---|
| 893 |  | 
|---|
| 894 | /// Keys with NULL value in any column won't join to anything. | 
|---|
| 895 | ConstNullMapPtr null_map{}; | 
|---|
| 896 | ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); | 
|---|
| 897 |  | 
|---|
| 898 | size_t existing_columns = block.columns(); | 
|---|
| 899 |  | 
|---|
| 900 | /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized. | 
|---|
| 901 | * Because if they are constants, then in the "not joined" rows, they may have different values | 
|---|
| 902 | *  - default values, which can differ from the values of these constants. | 
|---|
| 903 | */ | 
|---|
| 904 | if constexpr (right || full) | 
|---|
| 905 | { | 
|---|
| 906 | materializeBlockInplace(block); | 
|---|
| 907 |  | 
|---|
| 908 | if (nullable_left_side) | 
|---|
| 909 | JoinCommon::convertColumnsToNullable(block); | 
|---|
| 910 | } | 
|---|
| 911 |  | 
|---|
| 912 | /** For LEFT/INNER JOIN, the saved blocks do not contain keys. | 
|---|
| 913 | * For FULL/RIGHT JOIN, the saved blocks contain keys; | 
|---|
| 914 | *  but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. | 
|---|
| 915 | * For ASOF, the last column is used as the ASOF column | 
|---|
| 916 | */ | 
|---|
| 917 | ColumnsWithTypeAndName ; | 
|---|
| 918 | if constexpr (is_asof_join) | 
|---|
| 919 | extras.push_back(right_table_keys.getByName(key_names_right.back())); | 
|---|
| 920 |  | 
|---|
| 921 | AddedColumns added_columns(sample_block_with_columns_to_add, block_with_columns_to_add, block, savedBlockSample(), | 
|---|
| 922 | extras, *this, key_columns, key_sizes); | 
|---|
| 923 | bool has_required_right_keys = (required_right_keys.columns() != 0); | 
|---|
| 924 | added_columns.need_filter = need_filter || has_required_right_keys; | 
|---|
| 925 |  | 
|---|
| 926 | IColumn::Filter row_filter = switchJoinRightColumns<KIND, STRICTNESS>(maps_, added_columns, data->type, null_map); | 
|---|
| 927 |  | 
|---|
| 928 | for (size_t i = 0; i < added_columns.size(); ++i) | 
|---|
| 929 | block.insert(added_columns.moveColumn(i)); | 
|---|
| 930 |  | 
|---|
| 931 | std::vector<size_t> right_keys_to_replicate [[maybe_unused]]; | 
|---|
| 932 |  | 
|---|
| 933 | if constexpr (need_filter) | 
|---|
| 934 | { | 
|---|
| 935 | /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones. | 
|---|
| 936 | for (size_t i = 0; i < existing_columns; ++i) | 
|---|
| 937 | block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(row_filter, -1); | 
|---|
| 938 |  | 
|---|
| 939 | /// Add join key columns from right block if needed. | 
|---|
| 940 | for (size_t i = 0; i < required_right_keys.columns(); ++i) | 
|---|
| 941 | { | 
|---|
| 942 | const auto & right_key = required_right_keys.getByPosition(i); | 
|---|
| 943 | const auto & left_name = required_right_keys_sources[i]; | 
|---|
| 944 |  | 
|---|
| 945 | const auto & col = block.getByName(left_name); | 
|---|
| 946 | bool is_nullable = nullable_right_side || right_key.type->isNullable(); | 
|---|
| 947 | block.insert(correctNullability({col.column, col.type, right_key.name}, is_nullable)); | 
|---|
| 948 | } | 
|---|
| 949 | } | 
|---|
| 950 | else if (has_required_right_keys) | 
|---|
| 951 | { | 
|---|
| 952 | /// Some trash to represent IColumn::Filter as ColumnUInt8 needed for ColumnNullable::applyNullMap() | 
|---|
| 953 | auto null_map_filter_ptr = ColumnUInt8::create(); | 
|---|
| 954 | ColumnUInt8 & null_map_filter = assert_cast<ColumnUInt8 &>(*null_map_filter_ptr); | 
|---|
| 955 | null_map_filter.getData().swap(row_filter); | 
|---|
| 956 | const IColumn::Filter & filter = null_map_filter.getData(); | 
|---|
| 957 |  | 
|---|
| 958 | /// Add join key columns from right block if needed. | 
|---|
| 959 | for (size_t i = 0; i < required_right_keys.columns(); ++i) | 
|---|
| 960 | { | 
|---|
| 961 | const auto & right_key = required_right_keys.getByPosition(i); | 
|---|
| 962 | const auto & left_name = required_right_keys_sources[i]; | 
|---|
| 963 |  | 
|---|
| 964 | const auto & col = block.getByName(left_name); | 
|---|
| 965 | bool is_nullable = nullable_right_side || right_key.type->isNullable(); | 
|---|
| 966 |  | 
|---|
| 967 | ColumnPtr thin_column = filterWithBlanks(col.column, filter); | 
|---|
| 968 | block.insert(correctNullability({thin_column, col.type, right_key.name}, is_nullable, null_map_filter)); | 
|---|
| 969 |  | 
|---|
| 970 | if constexpr (need_replication) | 
|---|
| 971 | right_keys_to_replicate.push_back(block.getPositionByName(right_key.name)); | 
|---|
| 972 | } | 
|---|
| 973 | } | 
|---|
| 974 |  | 
|---|
| 975 | if constexpr (need_replication) | 
|---|
| 976 | { | 
|---|
| 977 | std::unique_ptr<IColumn::Offsets> & offsets_to_replicate = added_columns.offsets_to_replicate; | 
|---|
| 978 |  | 
|---|
| 979 | /// If ALL ... JOIN - we replicate all the columns except the new ones. | 
|---|
| 980 | for (size_t i = 0; i < existing_columns; ++i) | 
|---|
| 981 | block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicate(*offsets_to_replicate); | 
|---|
| 982 |  | 
|---|
| 983 | /// Replicate additional right keys | 
|---|
| 984 | for (size_t pos : right_keys_to_replicate) | 
|---|
| 985 | block.safeGetByPosition(pos).column = block.safeGetByPosition(pos).column->replicate(*offsets_to_replicate); | 
|---|
| 986 | } | 
|---|
| 987 | } | 
|---|
| 988 |  | 
|---|
| 989 |  | 
|---|
| 990 | void Join::joinBlockImplCross(Block & block) const | 
|---|
| 991 | { | 
|---|
| 992 | /// Add new columns to the block. | 
|---|
| 993 | size_t num_existing_columns = block.columns(); | 
|---|
| 994 | size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); | 
|---|
| 995 |  | 
|---|
| 996 | size_t rows_left = block.rows(); | 
|---|
| 997 |  | 
|---|
| 998 | ColumnRawPtrs src_left_columns(num_existing_columns); | 
|---|
| 999 | MutableColumns dst_columns(num_existing_columns + num_columns_to_add); | 
|---|
| 1000 |  | 
|---|
| 1001 | for (size_t i = 0; i < num_existing_columns; ++i) | 
|---|
| 1002 | { | 
|---|
| 1003 | src_left_columns[i] = block.getByPosition(i).column.get(); | 
|---|
| 1004 | dst_columns[i] = src_left_columns[i]->cloneEmpty(); | 
|---|
| 1005 | } | 
|---|
| 1006 |  | 
|---|
| 1007 | for (size_t i = 0; i < num_columns_to_add; ++i) | 
|---|
| 1008 | { | 
|---|
| 1009 | const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); | 
|---|
| 1010 | dst_columns[num_existing_columns + i] = src_column.column->cloneEmpty(); | 
|---|
| 1011 | block.insert(src_column); | 
|---|
| 1012 | } | 
|---|
| 1013 |  | 
|---|
| 1014 | /// NOTE It would be better to use `reserve`, as well as `replicate` methods to duplicate the values of the left block. | 
|---|
| 1015 |  | 
|---|
| 1016 | for (size_t i = 0; i < rows_left; ++i) | 
|---|
| 1017 | { | 
|---|
| 1018 | for (const Block & block_right : data->blocks) | 
|---|
| 1019 | { | 
|---|
| 1020 | size_t rows_right = block_right.rows(); | 
|---|
| 1021 |  | 
|---|
| 1022 | for (size_t col_num = 0; col_num < num_existing_columns; ++col_num) | 
|---|
| 1023 | for (size_t j = 0; j < rows_right; ++j) | 
|---|
| 1024 | dst_columns[col_num]->insertFrom(*src_left_columns[col_num], i); | 
|---|
| 1025 |  | 
|---|
| 1026 | for (size_t col_num = 0; col_num < num_columns_to_add; ++col_num) | 
|---|
| 1027 | { | 
|---|
| 1028 | const IColumn * column_right = block_right.getByPosition(col_num).column.get(); | 
|---|
| 1029 |  | 
|---|
| 1030 | for (size_t j = 0; j < rows_right; ++j) | 
|---|
| 1031 | dst_columns[num_existing_columns + col_num]->insertFrom(*column_right, j); | 
|---|
| 1032 | } | 
|---|
| 1033 | } | 
|---|
| 1034 | } | 
|---|
| 1035 |  | 
|---|
| 1036 | block = block.cloneWithColumns(std::move(dst_columns)); | 
|---|
| 1037 | } | 
|---|
| 1038 |  | 
|---|
| 1039 | static void checkTypeOfKey(const Block & block_left, const Block & block_right) | 
|---|
| 1040 | { | 
|---|
| 1041 | auto & [c1, left_type_origin, left_name] = block_left.safeGetByPosition(0); | 
|---|
| 1042 | auto & [c2, right_type_origin, right_name] = block_right.safeGetByPosition(0); | 
|---|
| 1043 | auto left_type = removeNullable(left_type_origin); | 
|---|
| 1044 | auto right_type = removeNullable(right_type_origin); | 
|---|
| 1045 |  | 
|---|
| 1046 | if (!left_type->equals(*right_type)) | 
|---|
| 1047 | throw Exception( "Type mismatch of columns to joinGet by: " | 
|---|
| 1048 | + left_name + " "+ left_type->getName() + " at left, " | 
|---|
| 1049 | + right_name + " "+ right_type->getName() + " at right", | 
|---|
| 1050 | ErrorCodes::TYPE_MISMATCH); | 
|---|
| 1051 | } | 
|---|
| 1052 |  | 
|---|
| 1053 |  | 
|---|
| 1054 | DataTypePtr Join::joinGetReturnType(const String & column_name) const | 
|---|
| 1055 | { | 
|---|
| 1056 | std::shared_lock lock(data->rwlock); | 
|---|
| 1057 |  | 
|---|
| 1058 | if (!sample_block_with_columns_to_add.has(column_name)) | 
|---|
| 1059 | throw Exception( "StorageJoin doesn't contain column "+ column_name, ErrorCodes::LOGICAL_ERROR); | 
|---|
| 1060 | return sample_block_with_columns_to_add.getByName(column_name).type; | 
|---|
| 1061 | } | 
|---|
| 1062 |  | 
|---|
| 1063 |  | 
|---|
| 1064 | template <typename Maps> | 
|---|
| 1065 | void Join::joinGetImpl(Block & block, const String & column_name, const Maps & maps_) const | 
|---|
| 1066 | { | 
|---|
| 1067 | joinBlockImpl<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::RightAny>( | 
|---|
| 1068 | block, {block.getByPosition(0).name}, {sample_block_with_columns_to_add.getByName(column_name)}, maps_); | 
|---|
| 1069 | } | 
|---|
| 1070 |  | 
|---|
| 1071 |  | 
|---|
| 1072 | // TODO: support composite key | 
|---|
| 1073 | // TODO: return multiple columns as named tuple | 
|---|
| 1074 | // TODO: return array of values when strictness == ASTTableJoin::Strictness::All | 
|---|
| 1075 | void Join::joinGet(Block & block, const String & column_name) const | 
|---|
| 1076 | { | 
|---|
| 1077 | std::shared_lock lock(data->rwlock); | 
|---|
| 1078 |  | 
|---|
| 1079 | if (key_names_right.size() != 1) | 
|---|
| 1080 | throw Exception( "joinGet only supports StorageJoin containing exactly one key", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 1081 |  | 
|---|
| 1082 | checkTypeOfKey(block, right_table_keys); | 
|---|
| 1083 |  | 
|---|
| 1084 | if ((strictness == ASTTableJoin::Strictness::Any || strictness == ASTTableJoin::Strictness::RightAny) && | 
|---|
| 1085 | kind == ASTTableJoin::Kind::Left) | 
|---|
| 1086 | { | 
|---|
| 1087 | joinGetImpl(block, column_name, std::get<MapsOne>(data->maps)); | 
|---|
| 1088 | } | 
|---|
| 1089 | else | 
|---|
| 1090 | throw Exception( "joinGet only supports StorageJoin of type Left Any", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 1091 | } | 
|---|
| 1092 |  | 
|---|
| 1093 |  | 
|---|
| 1094 | void Join::joinBlock(Block & block) | 
|---|
| 1095 | { | 
|---|
| 1096 | std::shared_lock lock(data->rwlock); | 
|---|
| 1097 |  | 
|---|
| 1098 | const Names & key_names_left = table_join->keyNamesLeft(); | 
|---|
| 1099 | JoinCommon::checkTypesOfKeys(block, key_names_left, right_table_keys, key_names_right); | 
|---|
| 1100 |  | 
|---|
| 1101 | if (joinDispatch(kind, strictness, data->maps, [&](auto kind_, auto strictness_, auto & map) | 
|---|
| 1102 | { | 
|---|
| 1103 | joinBlockImpl<kind_, strictness_>(block, key_names_left, sample_block_with_columns_to_add, map); | 
|---|
| 1104 | })) | 
|---|
| 1105 | { | 
|---|
| 1106 | /// Joined | 
|---|
| 1107 | } | 
|---|
| 1108 | else if (kind == ASTTableJoin::Kind::Cross) | 
|---|
| 1109 | joinBlockImplCross(block); | 
|---|
| 1110 | else | 
|---|
| 1111 | throw Exception( "Logical error: unknown combination of JOIN", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 1112 | } | 
|---|
| 1113 |  | 
|---|
| 1114 |  | 
|---|
| 1115 | void Join::joinTotals(Block & block) const | 
|---|
| 1116 | { | 
|---|
| 1117 | JoinCommon::joinTotals(totals, sample_block_with_columns_to_add, key_names_right, block); | 
|---|
| 1118 | } | 
|---|
| 1119 |  | 
|---|
| 1120 |  | 
|---|
| 1121 | template <typename Mapped> | 
|---|
| 1122 | struct AdderNonJoined | 
|---|
| 1123 | { | 
|---|
| 1124 | static void add(const Mapped & mapped, size_t & rows_added, MutableColumns & columns_right) | 
|---|
| 1125 | { | 
|---|
| 1126 | constexpr bool mapped_asof = std::is_same_v<Mapped, JoinStuff::MappedAsof>; | 
|---|
| 1127 | [[maybe_unused]] constexpr bool mapped_one = std::is_same_v<Mapped, JoinStuff::MappedOne> || std::is_same_v<Mapped, JoinStuff::MappedOneFlagged>; | 
|---|
| 1128 |  | 
|---|
| 1129 | if constexpr (mapped_asof) | 
|---|
| 1130 | { | 
|---|
| 1131 | /// Do nothing | 
|---|
| 1132 | } | 
|---|
| 1133 | else if constexpr (mapped_one) | 
|---|
| 1134 | { | 
|---|
| 1135 | for (size_t j = 0; j < columns_right.size(); ++j) | 
|---|
| 1136 | { | 
|---|
| 1137 | const auto & mapped_column = mapped.block->getByPosition(j).column; | 
|---|
| 1138 | columns_right[j]->insertFrom(*mapped_column, mapped.row_num); | 
|---|
| 1139 | } | 
|---|
| 1140 |  | 
|---|
| 1141 | ++rows_added; | 
|---|
| 1142 | } | 
|---|
| 1143 | else | 
|---|
| 1144 | { | 
|---|
| 1145 | for (auto it = mapped.begin(); it.ok(); ++it) | 
|---|
| 1146 | { | 
|---|
| 1147 | for (size_t j = 0; j < columns_right.size(); ++j) | 
|---|
| 1148 | { | 
|---|
| 1149 | const auto & mapped_column = it->block->getByPosition(j).column; | 
|---|
| 1150 | columns_right[j]->insertFrom(*mapped_column, it->row_num); | 
|---|
| 1151 | } | 
|---|
| 1152 |  | 
|---|
| 1153 | ++rows_added; | 
|---|
| 1154 | } | 
|---|
| 1155 | } | 
|---|
| 1156 | } | 
|---|
| 1157 | }; | 
|---|
| 1158 |  | 
|---|
| 1159 |  | 
|---|
| 1160 | /// Stream from not joined earlier rows of the right table. | 
|---|
| 1161 | class NonJoinedBlockInputStream : public IBlockInputStream | 
|---|
| 1162 | { | 
|---|
| 1163 | public: | 
|---|
| 1164 | NonJoinedBlockInputStream(const Join & parent_, const Block & result_sample_block_, UInt64 max_block_size_) | 
|---|
| 1165 | : parent(parent_) | 
|---|
| 1166 | , max_block_size(max_block_size_) | 
|---|
| 1167 | , result_sample_block(materializeBlock(result_sample_block_)) | 
|---|
| 1168 | { | 
|---|
| 1169 | bool remap_keys = parent.table_join->hasUsing(); | 
|---|
| 1170 | std::unordered_map<size_t, size_t> left_to_right_key_remap; | 
|---|
| 1171 |  | 
|---|
| 1172 | for (size_t i = 0; i < parent.table_join->keyNamesLeft().size(); ++i) | 
|---|
| 1173 | { | 
|---|
| 1174 | const String & left_key_name = parent.table_join->keyNamesLeft()[i]; | 
|---|
| 1175 | const String & right_key_name = parent.table_join->keyNamesRight()[i]; | 
|---|
| 1176 |  | 
|---|
| 1177 | size_t left_key_pos = result_sample_block.getPositionByName(left_key_name); | 
|---|
| 1178 | size_t right_key_pos = parent.savedBlockSample().getPositionByName(right_key_name); | 
|---|
| 1179 |  | 
|---|
| 1180 | if (remap_keys && !parent.required_right_keys.has(right_key_name)) | 
|---|
| 1181 | left_to_right_key_remap[left_key_pos] = right_key_pos; | 
|---|
| 1182 | } | 
|---|
| 1183 |  | 
|---|
| 1184 | /// result_sample_block: left_sample_block + left expressions, right not key columns, required right keys | 
|---|
| 1185 | size_t left_columns_count = result_sample_block.columns() - | 
|---|
| 1186 | parent.sample_block_with_columns_to_add.columns() - parent.required_right_keys.columns(); | 
|---|
| 1187 |  | 
|---|
| 1188 | for (size_t left_pos = 0; left_pos < left_columns_count; ++left_pos) | 
|---|
| 1189 | { | 
|---|
| 1190 | /// We need right 'x' for 'RIGHT JOIN ... USING(x)'. | 
|---|
| 1191 | if (left_to_right_key_remap.count(left_pos)) | 
|---|
| 1192 | { | 
|---|
| 1193 | size_t right_key_pos = left_to_right_key_remap[left_pos]; | 
|---|
| 1194 | setRightIndex(right_key_pos, left_pos); | 
|---|
| 1195 | } | 
|---|
| 1196 | else | 
|---|
| 1197 | column_indices_left.emplace_back(left_pos); | 
|---|
| 1198 | } | 
|---|
| 1199 |  | 
|---|
| 1200 | const auto & saved_block_sample = parent.savedBlockSample(); | 
|---|
| 1201 | for (size_t right_pos = 0; right_pos < saved_block_sample.columns(); ++right_pos) | 
|---|
| 1202 | { | 
|---|
| 1203 | const String & name = saved_block_sample.getByPosition(right_pos).name; | 
|---|
| 1204 | if (!result_sample_block.has(name)) | 
|---|
| 1205 | continue; | 
|---|
| 1206 |  | 
|---|
| 1207 | size_t result_position = result_sample_block.getPositionByName(name); | 
|---|
| 1208 |  | 
|---|
| 1209 | /// Don't remap left keys twice. We need only qualified right keys here | 
|---|
| 1210 | if (result_position < left_columns_count) | 
|---|
| 1211 | continue; | 
|---|
| 1212 |  | 
|---|
| 1213 | setRightIndex(right_pos, result_position); | 
|---|
| 1214 | } | 
|---|
| 1215 |  | 
|---|
| 1216 | if (column_indices_left.size() + column_indices_right.size() + same_result_keys.size() != result_sample_block.columns()) | 
|---|
| 1217 | throw Exception( "Error in columns mapping in RIGHT|FULL JOIN. Left: "+ toString(column_indices_left.size()) + | 
|---|
| 1218 | ", right: "+ toString(column_indices_right.size()) + | 
|---|
| 1219 | ", same: "+ toString(same_result_keys.size()) + | 
|---|
| 1220 | ", result: "+ toString(result_sample_block.columns()), | 
|---|
| 1221 | ErrorCodes::LOGICAL_ERROR); | 
|---|
| 1222 | } | 
|---|
| 1223 |  | 
|---|
| 1224 | String getName() const override { return "NonJoined"; } | 
|---|
| 1225 |  | 
|---|
| 1226 | Block () const override { return result_sample_block; } | 
|---|
| 1227 |  | 
|---|
| 1228 |  | 
|---|
| 1229 | protected: | 
|---|
| 1230 | Block readImpl() override | 
|---|
| 1231 | { | 
|---|
| 1232 | if (parent.data->blocks.empty()) | 
|---|
| 1233 | return Block(); | 
|---|
| 1234 | return createBlock(); | 
|---|
| 1235 | } | 
|---|
| 1236 |  | 
|---|
| 1237 | private: | 
|---|
| 1238 | const Join & parent; | 
|---|
| 1239 | UInt64 max_block_size; | 
|---|
| 1240 |  | 
|---|
| 1241 | Block result_sample_block; | 
|---|
| 1242 | /// Indices of columns in result_sample_block that should be generated | 
|---|
| 1243 | std::vector<size_t> column_indices_left; | 
|---|
| 1244 | /// Indices of columns that come from the right-side table: right_pos -> result_pos | 
|---|
| 1245 | std::unordered_map<size_t, size_t> column_indices_right; | 
|---|
| 1246 | /// | 
|---|
| 1247 | std::unordered_map<size_t, size_t> same_result_keys; | 
|---|
| 1248 | /// Which right columns (saved in parent) need nullability change before placing them in result block | 
|---|
| 1249 | std::vector<size_t> right_nullability_changes; | 
|---|
| 1250 |  | 
|---|
| 1251 | std::any position; | 
|---|
| 1252 | std::optional<Join::BlockNullmapList::const_iterator> nulls_position; | 
|---|
| 1253 |  | 
|---|
| 1254 | void setRightIndex(size_t right_pos, size_t result_position) | 
|---|
| 1255 | { | 
|---|
| 1256 | if (!column_indices_right.count(right_pos)) | 
|---|
| 1257 | { | 
|---|
| 1258 | column_indices_right[right_pos] = result_position; | 
|---|
| 1259 |  | 
|---|
| 1260 | if (hasNullabilityChange(right_pos, result_position)) | 
|---|
| 1261 | right_nullability_changes.push_back(right_pos); | 
|---|
| 1262 | } | 
|---|
| 1263 | else | 
|---|
| 1264 | same_result_keys[result_position] = column_indices_right[right_pos]; | 
|---|
| 1265 | } | 
|---|
| 1266 |  | 
|---|
| 1267 | bool hasNullabilityChange(size_t right_pos, size_t result_pos) const | 
|---|
| 1268 | { | 
|---|
| 1269 | const auto & src = parent.savedBlockSample().getByPosition(right_pos).column; | 
|---|
| 1270 | const auto & dst = result_sample_block.getByPosition(result_pos).column; | 
|---|
| 1271 | return src->isNullable() != dst->isNullable(); | 
|---|
| 1272 | } | 
|---|
| 1273 |  | 
|---|
| 1274 | Block createBlock() | 
|---|
| 1275 | { | 
|---|
| 1276 | MutableColumns columns_right = parent.savedBlockSample().cloneEmptyColumns(); | 
|---|
| 1277 |  | 
|---|
| 1278 | size_t rows_added = 0; | 
|---|
| 1279 |  | 
|---|
| 1280 | auto fill_callback = [&](auto, auto strictness, auto & map) | 
|---|
| 1281 | { | 
|---|
| 1282 | rows_added = fillColumnsFromMap<strictness>(map, columns_right); | 
|---|
| 1283 | }; | 
|---|
| 1284 |  | 
|---|
| 1285 | if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps, fill_callback)) | 
|---|
| 1286 | throw Exception( "Logical error: unknown JOIN strictness (must be on of: ANY, ALL, ASOF)", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 1287 |  | 
|---|
| 1288 | fillNullsFromBlocks(columns_right, rows_added); | 
|---|
| 1289 |  | 
|---|
| 1290 | if (!rows_added) | 
|---|
| 1291 | return {}; | 
|---|
| 1292 |  | 
|---|
| 1293 | for (size_t pos : right_nullability_changes) | 
|---|
| 1294 | changeNullability(columns_right[pos]); | 
|---|
| 1295 |  | 
|---|
| 1296 | Block res = result_sample_block.cloneEmpty(); | 
|---|
| 1297 |  | 
|---|
| 1298 | /// @note it's possible to make ColumnConst here and materialize it later | 
|---|
| 1299 | for (size_t pos : column_indices_left) | 
|---|
| 1300 | res.getByPosition(pos).column = res.getByPosition(pos).column->cloneResized(rows_added); | 
|---|
| 1301 |  | 
|---|
| 1302 | for (auto & pr : column_indices_right) | 
|---|
| 1303 | { | 
|---|
| 1304 | auto & right_column = columns_right[pr.first]; | 
|---|
| 1305 | auto & result_column = res.getByPosition(pr.second).column; | 
|---|
| 1306 | #ifndef NDEBUG | 
|---|
| 1307 | if (result_column->getName() != right_column->getName()) | 
|---|
| 1308 | throw Exception( "Wrong columns assign in RIGHT|FULL JOIN: "+ result_column->getName() + | 
|---|
| 1309 | " "+ right_column->getName(), ErrorCodes::LOGICAL_ERROR); | 
|---|
| 1310 | #endif | 
|---|
| 1311 | result_column = std::move(right_column); | 
|---|
| 1312 | } | 
|---|
| 1313 |  | 
|---|
| 1314 | for (auto & pr : same_result_keys) | 
|---|
| 1315 | { | 
|---|
| 1316 | auto & src_column = res.getByPosition(pr.second).column; | 
|---|
| 1317 | auto & dst_column = res.getByPosition(pr.first).column; | 
|---|
| 1318 |  | 
|---|
| 1319 | if (src_column->isNullable() && !dst_column->isNullable()) | 
|---|
| 1320 | { | 
|---|
| 1321 | auto * nullable = checkAndGetColumn<ColumnNullable>(*src_column); | 
|---|
| 1322 | dst_column = nullable->getNestedColumnPtr(); | 
|---|
| 1323 | } | 
|---|
| 1324 | else if (!src_column->isNullable() && dst_column->isNullable()) | 
|---|
| 1325 | dst_column = makeNullable(src_column); | 
|---|
| 1326 | else | 
|---|
| 1327 | dst_column = src_column; | 
|---|
| 1328 | } | 
|---|
| 1329 |  | 
|---|
| 1330 | return res; | 
|---|
| 1331 | } | 
|---|
| 1332 |  | 
|---|
| 1333 | template <ASTTableJoin::Strictness STRICTNESS, typename Maps> | 
|---|
| 1334 | size_t fillColumnsFromMap(const Maps & maps, MutableColumns & columns_keys_and_right) | 
|---|
| 1335 | { | 
|---|
| 1336 | switch (parent.data->type) | 
|---|
| 1337 | { | 
|---|
| 1338 | #define M(TYPE) \ | 
|---|
| 1339 | case Join::Type::TYPE: \ | 
|---|
| 1340 | return fillColumns<STRICTNESS>(*maps.TYPE, columns_keys_and_right); | 
|---|
| 1341 | APPLY_FOR_JOIN_VARIANTS(M) | 
|---|
| 1342 | #undef M | 
|---|
| 1343 | default: | 
|---|
| 1344 | throw Exception( "Unsupported JOIN keys. Type: "+ toString(static_cast<UInt32>(parent.data->type)), | 
|---|
| 1345 | ErrorCodes::UNSUPPORTED_JOIN_KEYS); | 
|---|
| 1346 | } | 
|---|
| 1347 |  | 
|---|
| 1348 | __builtin_unreachable(); | 
|---|
| 1349 | } | 
|---|
| 1350 |  | 
|---|
| 1351 | template <ASTTableJoin::Strictness STRICTNESS, typename Map> | 
|---|
| 1352 | size_t fillColumns(const Map & map, MutableColumns & columns_keys_and_right) | 
|---|
| 1353 | { | 
|---|
| 1354 | using Mapped = typename Map::mapped_type; | 
|---|
| 1355 | using Iterator = typename Map::const_iterator; | 
|---|
| 1356 |  | 
|---|
| 1357 | size_t rows_added = 0; | 
|---|
| 1358 |  | 
|---|
| 1359 | if (!position.has_value()) | 
|---|
| 1360 | position = std::make_any<Iterator>(map.begin()); | 
|---|
| 1361 |  | 
|---|
| 1362 | Iterator & it = std::any_cast<Iterator &>(position); | 
|---|
| 1363 | auto end = map.end(); | 
|---|
| 1364 |  | 
|---|
| 1365 | for (; it != end; ++it) | 
|---|
| 1366 | { | 
|---|
| 1367 | const Mapped & mapped = it->getMapped(); | 
|---|
| 1368 |  | 
|---|
| 1369 | if (mapped.getUsed()) | 
|---|
| 1370 | continue; | 
|---|
| 1371 |  | 
|---|
| 1372 | AdderNonJoined<Mapped>::add(mapped, rows_added, columns_keys_and_right); | 
|---|
| 1373 |  | 
|---|
| 1374 | if (rows_added >= max_block_size) | 
|---|
| 1375 | { | 
|---|
| 1376 | ++it; | 
|---|
| 1377 | break; | 
|---|
| 1378 | } | 
|---|
| 1379 | } | 
|---|
| 1380 |  | 
|---|
| 1381 | return rows_added; | 
|---|
| 1382 | } | 
|---|
| 1383 |  | 
|---|
| 1384 | void fillNullsFromBlocks(MutableColumns & columns_keys_and_right, size_t & rows_added) | 
|---|
| 1385 | { | 
|---|
| 1386 | if (!nulls_position.has_value()) | 
|---|
| 1387 | nulls_position = parent.data->blocks_nullmaps.begin(); | 
|---|
| 1388 |  | 
|---|
| 1389 | auto end = parent.data->blocks_nullmaps.end(); | 
|---|
| 1390 |  | 
|---|
| 1391 | for (auto & it = *nulls_position; it != end && rows_added < max_block_size; ++it) | 
|---|
| 1392 | { | 
|---|
| 1393 | const Block * block = it->first; | 
|---|
| 1394 | const NullMap & nullmap = assert_cast<const ColumnUInt8 &>(*it->second).getData(); | 
|---|
| 1395 |  | 
|---|
| 1396 | for (size_t row = 0; row < nullmap.size(); ++row) | 
|---|
| 1397 | { | 
|---|
| 1398 | if (nullmap[row]) | 
|---|
| 1399 | { | 
|---|
| 1400 | for (size_t col = 0; col < columns_keys_and_right.size(); ++col) | 
|---|
| 1401 | columns_keys_and_right[col]->insertFrom(*block->getByPosition(col).column, row); | 
|---|
| 1402 | ++rows_added; | 
|---|
| 1403 | } | 
|---|
| 1404 | } | 
|---|
| 1405 | } | 
|---|
| 1406 | } | 
|---|
| 1407 | }; | 
|---|
| 1408 |  | 
|---|
| 1409 |  | 
|---|
| 1410 | BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const | 
|---|
| 1411 | { | 
|---|
| 1412 | if (table_join->strictness() == ASTTableJoin::Strictness::Asof || | 
|---|
| 1413 | table_join->strictness() == ASTTableJoin::Strictness::Semi) | 
|---|
| 1414 | return {}; | 
|---|
| 1415 |  | 
|---|
| 1416 | if (isRightOrFull(table_join->kind())) | 
|---|
| 1417 | return std::make_shared<NonJoinedBlockInputStream>(*this, result_sample_block, max_block_size); | 
|---|
| 1418 | return {}; | 
|---|
| 1419 | } | 
|---|
| 1420 |  | 
|---|
| 1421 |  | 
|---|
| 1422 | bool Join::hasStreamWithNonJoinedRows() const | 
|---|
| 1423 | { | 
|---|
| 1424 | if (table_join->strictness() == ASTTableJoin::Strictness::Asof || | 
|---|
| 1425 | table_join->strictness() == ASTTableJoin::Strictness::Semi) | 
|---|
| 1426 | return false; | 
|---|
| 1427 |  | 
|---|
| 1428 | return isRightOrFull(table_join->kind()); | 
|---|
| 1429 | } | 
|---|
| 1430 |  | 
|---|
| 1431 | } | 
|---|
| 1432 |  | 
|---|