| 1 | #include <Storages/StorageJoin.h> |
| 2 | #include <Storages/StorageFactory.h> |
| 3 | #include <Interpreters/Join.h> |
| 4 | #include <Parsers/ASTCreateQuery.h> |
| 5 | #include <Parsers/ASTSetQuery.h> |
| 6 | #include <Parsers/ASTIdentifier.h> |
| 7 | #include <Core/ColumnNumbers.h> |
| 8 | #include <DataStreams/IBlockInputStream.h> |
| 9 | #include <DataTypes/NestedUtils.h> |
| 10 | #include <Interpreters/joinDispatch.h> |
| 11 | #include <Interpreters/AnalyzedJoin.h> |
| 12 | #include <Common/assert_cast.h> |
| 13 | #include <Common/quoteString.h> |
| 14 | |
| 15 | #include <Poco/String.h> /// toLower |
| 16 | #include <Poco/File.h> |
| 17 | |
| 18 | |
| 19 | namespace DB |
| 20 | { |
| 21 | |
/// Error codes referenced in this translation unit.
/// Every code used below is declared here explicitly, so the file does not rely on
/// declarations that happen to be pulled in through other headers.
namespace ErrorCodes
{
    extern const int UNSUPPORTED_JOIN_KEYS;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int INCOMPATIBLE_TYPE_OF_JOIN;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;     /// thrown by JoinBlockInputStream::readImpl
    extern const int NOT_IMPLEMENTED;   /// thrown by JoinBlockInputStream::fillColumns
}
| 30 | |
| 31 | StorageJoin::StorageJoin( |
| 32 | const String & relative_path_, |
| 33 | const String & database_name_, |
| 34 | const String & table_name_, |
| 35 | const Names & key_names_, |
| 36 | bool use_nulls_, |
| 37 | SizeLimits limits_, |
| 38 | ASTTableJoin::Kind kind_, |
| 39 | ASTTableJoin::Strictness strictness_, |
| 40 | const ColumnsDescription & columns_, |
| 41 | const ConstraintsDescription & constraints_, |
| 42 | bool overwrite, |
| 43 | const Context & context_) |
| 44 | : StorageSetOrJoinBase{relative_path_, database_name_, table_name_, columns_, constraints_, context_} |
| 45 | , key_names(key_names_) |
| 46 | , use_nulls(use_nulls_) |
| 47 | , limits(limits_) |
| 48 | , kind(kind_) |
| 49 | , strictness(strictness_) |
| 50 | { |
| 51 | for (const auto & key : key_names) |
| 52 | if (!getColumns().hasPhysical(key)) |
| 53 | throw Exception{"Key column (" + key + ") does not exist in table declaration." , ErrorCodes::NO_SUCH_COLUMN_IN_TABLE}; |
| 54 | |
| 55 | table_join = std::make_shared<AnalyzedJoin>(limits, use_nulls, kind, strictness, key_names); |
| 56 | join = std::make_shared<Join>(table_join, getSampleBlock().sortColumns(), overwrite); |
| 57 | restore(); |
| 58 | } |
| 59 | |
| 60 | |
| 61 | void StorageJoin::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) |
| 62 | { |
| 63 | Poco::File(path).remove(true); |
| 64 | Poco::File(path).createDirectories(); |
| 65 | Poco::File(path + "tmp/" ).createDirectories(); |
| 66 | |
| 67 | increment = 0; |
| 68 | join = std::make_shared<Join>(table_join, getSampleBlock().sortColumns()); |
| 69 | } |
| 70 | |
| 71 | |
| 72 | HashJoinPtr StorageJoin::getJoin(std::shared_ptr<AnalyzedJoin> analyzed_join) const |
| 73 | { |
| 74 | if (kind != analyzed_join->kind() || strictness != analyzed_join->strictness()) |
| 75 | throw Exception("Table " + backQuote(table_name) + " has incompatible type of JOIN." , ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN); |
| 76 | |
| 77 | if ((analyzed_join->forceNullableRight() && !use_nulls) || |
| 78 | (!analyzed_join->forceNullableRight() && isLeftOrFull(analyzed_join->kind()) && use_nulls)) |
| 79 | throw Exception("Table " + backQuote(table_name) + " needs the same join_use_nulls setting as present in LEFT or FULL JOIN." , |
| 80 | ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN); |
| 81 | |
| 82 | /// TODO: check key columns |
| 83 | |
| 84 | /// Some HACK to remove wrong names qualifiers: table.column -> column. |
| 85 | analyzed_join->setRightKeys(key_names); |
| 86 | |
| 87 | HashJoinPtr join_clone = std::make_shared<Join>(analyzed_join, getSampleBlock().sortColumns()); |
| 88 | join_clone->reuseJoinedData(*join); |
| 89 | return join_clone; |
| 90 | } |
| 91 | |
| 92 | |
| 93 | void StorageJoin::insertBlock(const Block & block) { join->addJoinedBlock(block); } |
| 94 | size_t StorageJoin::getSize() const { return join->getTotalRowCount(); } |
| 95 | |
| 96 | |
| 97 | void registerStorageJoin(StorageFactory & factory) |
| 98 | { |
| 99 | factory.registerStorage("Join" , [](const StorageFactory::Arguments & args) |
| 100 | { |
| 101 | /// Join(ANY, LEFT, k1, k2, ...) |
| 102 | |
| 103 | ASTs & engine_args = args.engine_args; |
| 104 | |
| 105 | auto & settings = args.context.getSettingsRef(); |
| 106 | |
| 107 | auto join_use_nulls = settings.join_use_nulls; |
| 108 | auto max_rows_in_join = settings.max_rows_in_join; |
| 109 | auto max_bytes_in_join = settings.max_bytes_in_join; |
| 110 | auto join_overflow_mode = settings.join_overflow_mode; |
| 111 | auto join_any_take_last_row = settings.join_any_take_last_row; |
| 112 | auto old_any_join = settings.any_join_distinct_right_table_keys; |
| 113 | |
| 114 | if (args.storage_def && args.storage_def->settings) |
| 115 | { |
| 116 | for (const auto & setting : args.storage_def->settings->changes) |
| 117 | { |
| 118 | if (setting.name == "join_use_nulls" ) |
| 119 | join_use_nulls.set(setting.value); |
| 120 | else if (setting.name == "max_rows_in_join" ) |
| 121 | max_rows_in_join.set(setting.value); |
| 122 | else if (setting.name == "max_bytes_in_join" ) |
| 123 | max_bytes_in_join.set(setting.value); |
| 124 | else if (setting.name == "join_overflow_mode" ) |
| 125 | join_overflow_mode.set(setting.value); |
| 126 | else if (setting.name == "join_any_take_last_row" ) |
| 127 | join_any_take_last_row.set(setting.value); |
| 128 | else if (setting.name == "any_join_distinct_right_table_keys" ) |
| 129 | old_any_join.set(setting.value); |
| 130 | else |
| 131 | throw Exception( |
| 132 | "Unknown setting " + setting.name + " for storage " + args.engine_name, |
| 133 | ErrorCodes::BAD_ARGUMENTS); |
| 134 | } |
| 135 | } |
| 136 | |
| 137 | if (engine_args.size() < 3) |
| 138 | throw Exception( |
| 139 | "Storage Join requires at least 3 parameters: Join(ANY|ALL|SEMI|ANTI, LEFT|INNER|RIGHT, keys...)." , |
| 140 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
| 141 | |
| 142 | ASTTableJoin::Strictness strictness = ASTTableJoin::Strictness::Unspecified; |
| 143 | ASTTableJoin::Kind kind = ASTTableJoin::Kind::Comma; |
| 144 | |
| 145 | if (auto opt_strictness_id = tryGetIdentifierName(engine_args[0])) |
| 146 | { |
| 147 | const String strictness_str = Poco::toLower(*opt_strictness_id); |
| 148 | |
| 149 | if (strictness_str == "any" ) |
| 150 | { |
| 151 | if (old_any_join) |
| 152 | strictness = ASTTableJoin::Strictness::RightAny; |
| 153 | else |
| 154 | strictness = ASTTableJoin::Strictness::Any; |
| 155 | } |
| 156 | else if (strictness_str == "all" ) |
| 157 | strictness = ASTTableJoin::Strictness::All; |
| 158 | else if (strictness_str == "semi" ) |
| 159 | strictness = ASTTableJoin::Strictness::Semi; |
| 160 | else if (strictness_str == "anti" ) |
| 161 | strictness = ASTTableJoin::Strictness::Anti; |
| 162 | } |
| 163 | |
| 164 | if (strictness == ASTTableJoin::Strictness::Unspecified) |
| 165 | throw Exception("First parameter of storage Join must be ANY or ALL or SEMI or ANTI (without quotes)." , |
| 166 | ErrorCodes::BAD_ARGUMENTS); |
| 167 | |
| 168 | if (auto opt_kind_id = tryGetIdentifierName(engine_args[1])) |
| 169 | { |
| 170 | const String kind_str = Poco::toLower(*opt_kind_id); |
| 171 | |
| 172 | if (kind_str == "left" ) |
| 173 | kind = ASTTableJoin::Kind::Left; |
| 174 | else if (kind_str == "inner" ) |
| 175 | kind = ASTTableJoin::Kind::Inner; |
| 176 | else if (kind_str == "right" ) |
| 177 | kind = ASTTableJoin::Kind::Right; |
| 178 | else if (kind_str == "full" ) |
| 179 | { |
| 180 | if (strictness == ASTTableJoin::Strictness::Any) |
| 181 | strictness = ASTTableJoin::Strictness::RightAny; |
| 182 | kind = ASTTableJoin::Kind::Full; |
| 183 | } |
| 184 | } |
| 185 | |
| 186 | if (kind == ASTTableJoin::Kind::Comma) |
| 187 | throw Exception("Second parameter of storage Join must be LEFT or INNER or RIGHT or FULL (without quotes)." , |
| 188 | ErrorCodes::BAD_ARGUMENTS); |
| 189 | |
| 190 | Names key_names; |
| 191 | key_names.reserve(engine_args.size() - 2); |
| 192 | for (size_t i = 2, size = engine_args.size(); i < size; ++i) |
| 193 | { |
| 194 | auto opt_key = tryGetIdentifierName(engine_args[i]); |
| 195 | if (!opt_key) |
| 196 | throw Exception("Parameter â„–" + toString(i + 1) + " of storage Join don't look like column name." , ErrorCodes::BAD_ARGUMENTS); |
| 197 | |
| 198 | key_names.push_back(*opt_key); |
| 199 | } |
| 200 | |
| 201 | return StorageJoin::create( |
| 202 | args.relative_data_path, |
| 203 | args.database_name, |
| 204 | args.table_name, |
| 205 | key_names, |
| 206 | join_use_nulls, |
| 207 | SizeLimits{max_rows_in_join, max_bytes_in_join, join_overflow_mode}, |
| 208 | kind, |
| 209 | strictness, |
| 210 | args.columns, |
| 211 | args.constraints, |
| 212 | join_any_take_last_row, |
| 213 | args.context); |
| 214 | }); |
| 215 | } |
| 216 | |
/// Returns a pointer to the first byte of the object representation of `value`.
template <typename T>
static const char * rawData(T & value)
{
    return reinterpret_cast<const char *>(&value);
}

/// Returns the size in bytes of the object representation of `T`.
template <typename T>
static size_t rawSize(T &)
{
    return sizeof(T);
}
| 227 | template <> |
| 228 | const char * rawData(const StringRef & t) |
| 229 | { |
| 230 | return t.data; |
| 231 | } |
| 232 | template <> |
| 233 | size_t rawSize(const StringRef & t) |
| 234 | { |
| 235 | return t.size; |
| 236 | } |
| 237 | |
| 238 | class JoinBlockInputStream : public IBlockInputStream |
| 239 | { |
| 240 | public: |
| 241 | JoinBlockInputStream(const Join & parent_, UInt64 max_block_size_, Block && sample_block_) |
| 242 | : parent(parent_), lock(parent.data->rwlock), max_block_size(max_block_size_), sample_block(std::move(sample_block_)) |
| 243 | { |
| 244 | columns.resize(sample_block.columns()); |
| 245 | column_indices.resize(sample_block.columns()); |
| 246 | column_with_null.resize(sample_block.columns()); |
| 247 | for (size_t i = 0; i < sample_block.columns(); ++i) |
| 248 | { |
| 249 | auto & [_, type, name] = sample_block.getByPosition(i); |
| 250 | if (parent.right_table_keys.has(name)) |
| 251 | { |
| 252 | key_pos = i; |
| 253 | column_with_null[i] = parent.right_table_keys.getByName(name).type->isNullable(); |
| 254 | } |
| 255 | else |
| 256 | { |
| 257 | auto pos = parent.sample_block_with_columns_to_add.getPositionByName(name); |
| 258 | column_indices[i] = pos; |
| 259 | column_with_null[i] = !parent.sample_block_with_columns_to_add.getByPosition(pos).type->equals(*type); |
| 260 | } |
| 261 | } |
| 262 | } |
| 263 | |
| 264 | String getName() const override { return "Join" ; } |
| 265 | |
| 266 | Block () const override { return sample_block; } |
| 267 | |
| 268 | |
| 269 | protected: |
| 270 | Block readImpl() override |
| 271 | { |
| 272 | if (parent.data->blocks.empty()) |
| 273 | return Block(); |
| 274 | |
| 275 | Block block; |
| 276 | if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps, |
| 277 | [&](auto kind, auto strictness, auto & map) { block = createBlock<kind, strictness>(map); })) |
| 278 | throw Exception("Logical error: unknown JOIN strictness" , ErrorCodes::LOGICAL_ERROR); |
| 279 | return block; |
| 280 | } |
| 281 | |
| 282 | private: |
| 283 | const Join & parent; |
| 284 | std::shared_lock<std::shared_mutex> lock; |
| 285 | UInt64 max_block_size; |
| 286 | Block sample_block; |
| 287 | |
| 288 | ColumnNumbers column_indices; |
| 289 | std::vector<bool> column_with_null; |
| 290 | std::optional<size_t> key_pos; |
| 291 | MutableColumns columns; |
| 292 | |
| 293 | std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure |
| 294 | |
| 295 | |
| 296 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps> |
| 297 | Block createBlock(const Maps & maps) |
| 298 | { |
| 299 | for (size_t i = 0; i < sample_block.columns(); ++i) |
| 300 | { |
| 301 | const auto & src_col = sample_block.safeGetByPosition(i); |
| 302 | columns[i] = src_col.type->createColumn(); |
| 303 | if (column_with_null[i]) |
| 304 | { |
| 305 | if (key_pos == i) |
| 306 | { |
| 307 | // unwrap null key column |
| 308 | ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*columns[i]); |
| 309 | columns[i] = nullable_col.getNestedColumnPtr()->assumeMutable(); |
| 310 | } |
| 311 | else |
| 312 | // wrap non key column with null |
| 313 | columns[i] = makeNullable(std::move(columns[i]))->assumeMutable(); |
| 314 | } |
| 315 | } |
| 316 | |
| 317 | size_t rows_added = 0; |
| 318 | |
| 319 | switch (parent.data->type) |
| 320 | { |
| 321 | #define M(TYPE) \ |
| 322 | case Join::Type::TYPE: \ |
| 323 | rows_added = fillColumns<KIND, STRICTNESS>(*maps.TYPE); \ |
| 324 | break; |
| 325 | APPLY_FOR_JOIN_VARIANTS_LIMITED(M) |
| 326 | #undef M |
| 327 | |
| 328 | default: |
| 329 | throw Exception("Unsupported JOIN keys in StorageJoin. Type: " + toString(static_cast<UInt32>(parent.data->type)), |
| 330 | ErrorCodes::UNSUPPORTED_JOIN_KEYS); |
| 331 | } |
| 332 | |
| 333 | if (!rows_added) |
| 334 | return {}; |
| 335 | |
| 336 | Block res = sample_block.cloneEmpty(); |
| 337 | for (size_t i = 0; i < columns.size(); ++i) |
| 338 | if (column_with_null[i]) |
| 339 | { |
| 340 | if (key_pos == i) |
| 341 | res.getByPosition(i).column = makeNullable(std::move(columns[i])); |
| 342 | else |
| 343 | { |
| 344 | const ColumnNullable & nullable_col = assert_cast<const ColumnNullable &>(*columns[i]); |
| 345 | res.getByPosition(i).column = nullable_col.getNestedColumnPtr(); |
| 346 | } |
| 347 | } |
| 348 | else |
| 349 | res.getByPosition(i).column = std::move(columns[i]); |
| 350 | |
| 351 | return res; |
| 352 | } |
| 353 | |
| 354 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map> |
| 355 | size_t fillColumns(const Map & map) |
| 356 | { |
| 357 | size_t rows_added = 0; |
| 358 | |
| 359 | if (!position) |
| 360 | position = decltype(position)( |
| 361 | static_cast<void *>(new typename Map::const_iterator(map.begin())), |
| 362 | [](void * ptr) { delete reinterpret_cast<typename Map::const_iterator *>(ptr); }); |
| 363 | |
| 364 | auto & it = *reinterpret_cast<typename Map::const_iterator *>(position.get()); |
| 365 | auto end = map.end(); |
| 366 | |
| 367 | for (; it != end; ++it) |
| 368 | { |
| 369 | if constexpr (STRICTNESS == ASTTableJoin::Strictness::RightAny) |
| 370 | { |
| 371 | fillOne<Map>(columns, column_indices, it, key_pos, rows_added); |
| 372 | } |
| 373 | else if constexpr (STRICTNESS == ASTTableJoin::Strictness::All) |
| 374 | { |
| 375 | fillAll<Map>(columns, column_indices, it, key_pos, rows_added); |
| 376 | } |
| 377 | else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) |
| 378 | { |
| 379 | if constexpr (KIND == ASTTableJoin::Kind::Left || KIND == ASTTableJoin::Kind::Inner) |
| 380 | fillOne<Map>(columns, column_indices, it, key_pos, rows_added); |
| 381 | else if constexpr (KIND == ASTTableJoin::Kind::Right) |
| 382 | fillAll<Map>(columns, column_indices, it, key_pos, rows_added); |
| 383 | } |
| 384 | else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Semi) |
| 385 | { |
| 386 | if constexpr (KIND == ASTTableJoin::Kind::Left) |
| 387 | fillOne<Map>(columns, column_indices, it, key_pos, rows_added); |
| 388 | else if constexpr (KIND == ASTTableJoin::Kind::Right) |
| 389 | fillAll<Map>(columns, column_indices, it, key_pos, rows_added); |
| 390 | } |
| 391 | else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Anti) |
| 392 | { |
| 393 | if constexpr (KIND == ASTTableJoin::Kind::Left) |
| 394 | fillOne<Map>(columns, column_indices, it, key_pos, rows_added); |
| 395 | else if constexpr (KIND == ASTTableJoin::Kind::Right) |
| 396 | fillAll<Map>(columns, column_indices, it, key_pos, rows_added); |
| 397 | } |
| 398 | else |
| 399 | throw Exception("This JOIN is not implemented yet" , ErrorCodes::NOT_IMPLEMENTED); |
| 400 | |
| 401 | if (rows_added >= max_block_size) |
| 402 | { |
| 403 | ++it; |
| 404 | break; |
| 405 | } |
| 406 | } |
| 407 | |
| 408 | return rows_added; |
| 409 | } |
| 410 | |
| 411 | template <typename Map> |
| 412 | static void fillOne(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it, |
| 413 | const std::optional<size_t> & key_pos, size_t & rows_added) |
| 414 | { |
| 415 | for (size_t j = 0; j < columns.size(); ++j) |
| 416 | if (j == key_pos) |
| 417 | columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey())); |
| 418 | else |
| 419 | columns[j]->insertFrom(*it->getMapped().block->getByPosition(column_indices[j]).column.get(), it->getMapped().row_num); |
| 420 | ++rows_added; |
| 421 | } |
| 422 | |
| 423 | template <typename Map> |
| 424 | static void fillAll(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it, |
| 425 | const std::optional<size_t> & key_pos, size_t & rows_added) |
| 426 | { |
| 427 | for (auto ref_it = it->getMapped().begin(); ref_it.ok(); ++ref_it) |
| 428 | { |
| 429 | for (size_t j = 0; j < columns.size(); ++j) |
| 430 | if (j == key_pos) |
| 431 | columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey())); |
| 432 | else |
| 433 | columns[j]->insertFrom(*ref_it->block->getByPosition(column_indices[j]).column.get(), ref_it->row_num); |
| 434 | ++rows_added; |
| 435 | } |
| 436 | } |
| 437 | }; |
| 438 | |
| 439 | |
| 440 | // TODO: multiple stream read and index read |
| 441 | BlockInputStreams StorageJoin::read( |
| 442 | const Names & column_names, |
| 443 | const SelectQueryInfo & /*query_info*/, |
| 444 | const Context & /*context*/, |
| 445 | QueryProcessingStage::Enum /*processed_stage*/, |
| 446 | size_t max_block_size, |
| 447 | unsigned /*num_streams*/) |
| 448 | { |
| 449 | check(column_names); |
| 450 | return {std::make_shared<JoinBlockInputStream>(*join, max_block_size, getSampleBlockForColumns(column_names))}; |
| 451 | } |
| 452 | |
| 453 | } |
| 454 | |