| 1 | #include <DataStreams/AddingConstColumnBlockInputStream.h> |
| 2 | #include <DataStreams/narrowBlockInputStreams.h> |
| 3 | #include <DataStreams/LazyBlockInputStream.h> |
| 4 | #include <DataStreams/NullBlockInputStream.h> |
| 5 | #include <DataStreams/ConvertingBlockInputStream.h> |
| 6 | #include <DataStreams/OneBlockInputStream.h> |
| 7 | #include <DataStreams/ConcatBlockInputStream.h> |
| 8 | #include <DataStreams/materializeBlock.h> |
| 9 | #include <DataStreams/MaterializingBlockInputStream.h> |
| 10 | #include <DataStreams/FilterBlockInputStream.h> |
| 11 | #include <Storages/StorageMerge.h> |
| 12 | #include <Storages/StorageFactory.h> |
| 13 | #include <Storages/VirtualColumnUtils.h> |
| 14 | #include <Storages/AlterCommands.h> |
| 15 | #include <Interpreters/InterpreterAlterQuery.h> |
| 16 | #include <Interpreters/SyntaxAnalyzer.h> |
| 17 | #include <Interpreters/ExpressionActions.h> |
| 18 | #include <Interpreters/evaluateConstantExpression.h> |
| 19 | #include <Interpreters/InterpreterSelectQuery.h> |
| 20 | #include <Parsers/ASTSelectQuery.h> |
| 21 | #include <Parsers/ASTLiteral.h> |
| 22 | #include <Parsers/ASTExpressionList.h> |
| 23 | #include <DataTypes/DataTypeString.h> |
| 24 | #include <Columns/ColumnString.h> |
| 25 | #include <Common/typeid_cast.h> |
| 26 | #include <Common/checkStackSize.h> |
| 27 | #include <Databases/IDatabase.h> |
| 28 | #include <ext/range.h> |
| 29 | #include <algorithm> |
| 30 | #include <Parsers/ASTFunction.h> |
| 31 | #include <Parsers/queryToString.h> |
| 32 | |
| 33 | |
| 34 | namespace DB |
| 35 | { |
| 36 | |
| 37 | namespace ErrorCodes |
| 38 | { |
| 39 | extern const int ILLEGAL_PREWHERE; |
| 40 | extern const int INCOMPATIBLE_SOURCE_TABLES; |
| 41 | extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; |
| 42 | extern const int NO_SUCH_COLUMN_IN_TABLE; |
| 43 | extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE; |
| 44 | extern const int SAMPLING_NOT_SUPPORTED; |
| 45 | } |
| 46 | |
| 47 | |
| 48 | StorageMerge::StorageMerge( |
| 49 | const std::string & database_name_, |
| 50 | const std::string & table_name_, |
| 51 | const ColumnsDescription & columns_, |
| 52 | const String & source_database_, |
| 53 | const String & table_name_regexp_, |
| 54 | const Context & context_) |
| 55 | : IStorage(ColumnsDescription({{"_table" , std::make_shared<DataTypeString>()}}, true)) |
| 56 | , table_name(table_name_) |
| 57 | , database_name(database_name_) |
| 58 | , source_database(source_database_) |
| 59 | , table_name_regexp(table_name_regexp_) |
| 60 | , global_context(context_) |
| 61 | { |
| 62 | setColumns(columns_); |
| 63 | } |
| 64 | |
| 65 | |
| 66 | /// NOTE: structure of underlying tables as well as their set are not constant, |
| 67 | /// so the results of these methods may become obsolete after the call. |
| 68 | |
| 69 | NameAndTypePair StorageMerge::getColumn(const String & column_name) const |
| 70 | { |
| 71 | if (!IStorage::hasColumn(column_name)) |
| 72 | { |
| 73 | auto first_table = getFirstTable([](auto &&) { return true; }); |
| 74 | if (first_table) |
| 75 | return first_table->getColumn(column_name); |
| 76 | } |
| 77 | |
| 78 | return IStorage::getColumn(column_name); |
| 79 | } |
| 80 | |
| 81 | |
| 82 | bool StorageMerge::hasColumn(const String & column_name) const |
| 83 | { |
| 84 | if (!IStorage::hasColumn(column_name)) |
| 85 | { |
| 86 | auto first_table = getFirstTable([](auto &&) { return true; }); |
| 87 | if (first_table) |
| 88 | return first_table->hasColumn(column_name); |
| 89 | } |
| 90 | |
| 91 | return true; |
| 92 | } |
| 93 | |
| 94 | |
| 95 | template <typename F> |
| 96 | StoragePtr StorageMerge::getFirstTable(F && predicate) const |
| 97 | { |
| 98 | auto iterator = getDatabaseIterator(global_context); |
| 99 | |
| 100 | while (iterator->isValid()) |
| 101 | { |
| 102 | auto & table = iterator->table(); |
| 103 | if (table.get() != this && predicate(table)) |
| 104 | return table; |
| 105 | |
| 106 | iterator->next(); |
| 107 | } |
| 108 | |
| 109 | return {}; |
| 110 | } |
| 111 | |
| 112 | |
| 113 | bool StorageMerge::isRemote() const |
| 114 | { |
| 115 | auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table->isRemote(); }); |
| 116 | return first_remote_table != nullptr; |
| 117 | } |
| 118 | |
| 119 | |
| 120 | bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const |
| 121 | { |
| 122 | /// It's beneficial if it is true for at least one table. |
| 123 | StorageListWithLocks selected_tables = getSelectedTables(query_context.getCurrentQueryId()); |
| 124 | |
| 125 | size_t i = 0; |
| 126 | for (const auto & table : selected_tables) |
| 127 | { |
| 128 | if (table.first->mayBenefitFromIndexForIn(left_in_operand, query_context)) |
| 129 | return true; |
| 130 | |
| 131 | ++i; |
| 132 | /// For simplicity reasons, check only first ten tables. |
| 133 | if (i > 10) |
| 134 | break; |
| 135 | } |
| 136 | |
| 137 | return false; |
| 138 | } |
| 139 | |
| 140 | |
| 141 | QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const |
| 142 | { |
| 143 | auto stage_in_source_tables = QueryProcessingStage::FetchColumns; |
| 144 | |
| 145 | DatabaseTablesIteratorPtr iterator = getDatabaseIterator(context); |
| 146 | |
| 147 | size_t selected_table_size = 0; |
| 148 | |
| 149 | while (iterator->isValid()) |
| 150 | { |
| 151 | auto & table = iterator->table(); |
| 152 | if (table.get() != this) |
| 153 | { |
| 154 | ++selected_table_size; |
| 155 | stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context)); |
| 156 | } |
| 157 | |
| 158 | iterator->next(); |
| 159 | } |
| 160 | |
| 161 | return selected_table_size == 1 ? stage_in_source_tables : std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState); |
| 162 | } |
| 163 | |
| 164 | |
| 165 | BlockInputStreams StorageMerge::read( |
| 166 | const Names & column_names, |
| 167 | const SelectQueryInfo & query_info, |
| 168 | const Context & context, |
| 169 | QueryProcessingStage::Enum processed_stage, |
| 170 | const size_t max_block_size, |
| 171 | unsigned num_streams) |
| 172 | { |
| 173 | BlockInputStreams res; |
| 174 | |
| 175 | bool has_table_virtual_column = false; |
| 176 | Names real_column_names; |
| 177 | real_column_names.reserve(column_names.size()); |
| 178 | |
| 179 | for (const auto & column_name : column_names) |
| 180 | { |
| 181 | if (column_name == "_table" && isVirtualColumn(column_name)) |
| 182 | has_table_virtual_column = true; |
| 183 | else |
| 184 | real_column_names.push_back(column_name); |
| 185 | } |
| 186 | |
| 187 | /** Just in case, turn off optimization "transfer to PREWHERE", |
| 188 | * since there is no certainty that it works when one of table is MergeTree and other is not. |
| 189 | */ |
| 190 | Context modified_context = context; |
| 191 | modified_context.getSettingsRef().optimize_move_to_prewhere = false; |
| 192 | |
| 193 | /// What will be result structure depending on query processed stage in source tables? |
| 194 | Block = getQueryHeader(column_names, query_info, context, processed_stage); |
| 195 | |
| 196 | /** First we make list of selected tables to find out its size. |
| 197 | * This is necessary to correctly pass the recommended number of threads to each table. |
| 198 | */ |
| 199 | StorageListWithLocks selected_tables = getSelectedTables( |
| 200 | query_info.query, has_table_virtual_column, true, context.getCurrentQueryId()); |
| 201 | |
| 202 | if (selected_tables.empty()) |
| 203 | /// FIXME: do we support sampling in this case? |
| 204 | return createSourceStreams( |
| 205 | query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column); |
| 206 | |
| 207 | size_t tables_count = selected_tables.size(); |
| 208 | Float64 num_streams_multiplier = std::min(unsigned(tables_count), std::max(1U, unsigned(context.getSettingsRef().max_streams_multiplier_for_merge_tables))); |
| 209 | num_streams *= num_streams_multiplier; |
| 210 | size_t remaining_streams = num_streams; |
| 211 | |
| 212 | InputSortingInfoPtr input_sorting_info; |
| 213 | if (query_info.order_by_optimizer) |
| 214 | { |
| 215 | for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it) |
| 216 | { |
| 217 | auto current_info = query_info.order_by_optimizer->getInputOrder(it->first); |
| 218 | if (it == selected_tables.begin()) |
| 219 | input_sorting_info = current_info; |
| 220 | else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info)) |
| 221 | input_sorting_info.reset(); |
| 222 | |
| 223 | if (!input_sorting_info) |
| 224 | break; |
| 225 | } |
| 226 | |
| 227 | query_info.input_sorting_info = input_sorting_info; |
| 228 | } |
| 229 | |
| 230 | for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it) |
| 231 | { |
| 232 | size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count); |
| 233 | size_t current_streams = std::min(current_need_streams, remaining_streams); |
| 234 | remaining_streams -= current_streams; |
| 235 | current_streams = std::max(size_t(1), current_streams); |
| 236 | |
| 237 | StoragePtr storage = it->first; |
| 238 | TableStructureReadLockHolder struct_lock = it->second; |
| 239 | |
| 240 | /// If sampling requested, then check that table supports it. |
| 241 | if (query_info.query->as<ASTSelectQuery>()->sample_size() && !storage->supportsSampling()) |
| 242 | throw Exception("Illegal SAMPLE: table doesn't support sampling" , ErrorCodes::SAMPLING_NOT_SUPPORTED); |
| 243 | |
| 244 | BlockInputStreams source_streams; |
| 245 | |
| 246 | if (current_streams) |
| 247 | { |
| 248 | source_streams = createSourceStreams( |
| 249 | query_info, processed_stage, max_block_size, header, storage, |
| 250 | struct_lock, real_column_names, modified_context, current_streams, has_table_virtual_column); |
| 251 | } |
| 252 | else |
| 253 | { |
| 254 | source_streams.emplace_back(std::make_shared<LazyBlockInputStream>( |
| 255 | header, [=, this]() mutable -> BlockInputStreamPtr |
| 256 | { |
| 257 | BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size, |
| 258 | header, storage, struct_lock, real_column_names, |
| 259 | modified_context, current_streams, has_table_virtual_column, true); |
| 260 | |
| 261 | if (!streams.empty() && streams.size() != 1) |
| 262 | throw Exception("LogicalError: the lazy stream size must to be one or empty." , ErrorCodes::LOGICAL_ERROR); |
| 263 | |
| 264 | return streams.empty() ? std::make_shared<NullBlockInputStream>(header) : streams[0]; |
| 265 | })); |
| 266 | } |
| 267 | |
| 268 | res.insert(res.end(), source_streams.begin(), source_streams.end()); |
| 269 | } |
| 270 | |
| 271 | if (res.empty()) |
| 272 | return res; |
| 273 | |
| 274 | res = narrowBlockInputStreams(res, num_streams); |
| 275 | return res; |
| 276 | } |
| 277 | |
| 278 | BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage, |
| 279 | const UInt64 max_block_size, const Block & , const StoragePtr & storage, |
| 280 | const TableStructureReadLockHolder & struct_lock, Names & real_column_names, |
| 281 | Context & modified_context, size_t streams_num, bool has_table_virtual_column, |
| 282 | bool concat_streams) |
| 283 | { |
| 284 | SelectQueryInfo modified_query_info = query_info; |
| 285 | modified_query_info.query = query_info.query->clone(); |
| 286 | |
| 287 | VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table" , storage ? storage->getTableName() : "" ); |
| 288 | |
| 289 | if (!storage) |
| 290 | return BlockInputStreams{ |
| 291 | InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header), |
| 292 | SelectQueryOptions(processed_stage).analyze()).execute().in}; |
| 293 | |
| 294 | BlockInputStreams source_streams; |
| 295 | |
| 296 | if (processed_stage <= storage->getQueryProcessingStage(modified_context)) |
| 297 | { |
| 298 | /// If there are only virtual columns in query, you must request at least one other column. |
| 299 | if (real_column_names.size() ==0) |
| 300 | real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical())); |
| 301 | |
| 302 | source_streams = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size, |
| 303 | UInt32(streams_num)); |
| 304 | } |
| 305 | else if (processed_stage > storage->getQueryProcessingStage(modified_context)) |
| 306 | { |
| 307 | modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, storage->getTableName()); |
| 308 | |
| 309 | /// Maximum permissible parallelism is streams_num |
| 310 | modified_context.getSettingsRef().max_threads = UInt64(streams_num); |
| 311 | modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1; |
| 312 | |
| 313 | InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)}; |
| 314 | BlockInputStreamPtr interpreter_stream = interpreter.execute().in; |
| 315 | |
| 316 | /** Materialization is needed, since from distributed storage the constants come materialized. |
| 317 | * If you do not do this, different types (Const and non-Const) columns will be produced in different threads, |
| 318 | * And this is not allowed, since all code is based on the assumption that in the block stream all types are the same. |
| 319 | */ |
| 320 | source_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(interpreter_stream)); |
| 321 | } |
| 322 | |
| 323 | if (!source_streams.empty()) |
| 324 | { |
| 325 | if (concat_streams) |
| 326 | { |
| 327 | BlockInputStreamPtr stream = |
| 328 | source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0]; |
| 329 | |
| 330 | source_streams.resize(1); |
| 331 | source_streams[0] = stream; |
| 332 | } |
| 333 | |
| 334 | for (BlockInputStreamPtr & source_stream : source_streams) |
| 335 | { |
| 336 | if (has_table_virtual_column) |
| 337 | source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>( |
| 338 | source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table" ); |
| 339 | |
| 340 | /// Subordinary tables could have different but convertible types, like numeric types of different width. |
| 341 | /// We must return streams with structure equals to structure of Merge table. |
| 342 | convertingSourceStream(header, modified_context, modified_query_info.query, source_stream, processed_stage); |
| 343 | |
| 344 | source_stream->addTableLock(struct_lock); |
| 345 | } |
| 346 | } |
| 347 | |
| 348 | return source_streams; |
| 349 | } |
| 350 | |
| 351 | |
| 352 | StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String & query_id) const |
| 353 | { |
| 354 | StorageListWithLocks selected_tables; |
| 355 | auto iterator = getDatabaseIterator(global_context); |
| 356 | |
| 357 | while (iterator->isValid()) |
| 358 | { |
| 359 | auto & table = iterator->table(); |
| 360 | if (table.get() != this) |
| 361 | selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id)); |
| 362 | |
| 363 | iterator->next(); |
| 364 | } |
| 365 | |
| 366 | return selected_tables; |
| 367 | } |
| 368 | |
| 369 | |
| 370 | StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock, const String & query_id) const |
| 371 | { |
| 372 | StorageListWithLocks selected_tables; |
| 373 | DatabaseTablesIteratorPtr iterator = getDatabaseIterator(global_context); |
| 374 | |
| 375 | auto virtual_column = ColumnString::create(); |
| 376 | |
| 377 | while (iterator->isValid()) |
| 378 | { |
| 379 | StoragePtr storage = iterator->table(); |
| 380 | |
| 381 | if (query && query->as<ASTSelectQuery>()->prewhere() && !storage->supportsPrewhere()) |
| 382 | throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE." , ErrorCodes::ILLEGAL_PREWHERE); |
| 383 | |
| 384 | if (storage.get() != this) |
| 385 | { |
| 386 | selected_tables.emplace_back(storage, get_lock ? storage->lockStructureForShare(false, query_id) : TableStructureReadLockHolder{}); |
| 387 | virtual_column->insert(storage->getTableName()); |
| 388 | } |
| 389 | |
| 390 | iterator->next(); |
| 391 | } |
| 392 | |
| 393 | if (has_virtual_column) |
| 394 | { |
| 395 | Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(virtual_column), std::make_shared<DataTypeString>(), "_table" )}; |
| 396 | VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, global_context); |
| 397 | auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table" ); |
| 398 | |
| 399 | /// Remove unused tables from the list |
| 400 | selected_tables.remove_if([&] (const auto & elem) { return values.find(elem.first->getTableName()) == values.end(); }); |
| 401 | } |
| 402 | |
| 403 | return selected_tables; |
| 404 | } |
| 405 | |
| 406 | |
| 407 | DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & context) const |
| 408 | { |
| 409 | checkStackSize(); |
| 410 | auto database = context.getDatabase(source_database); |
| 411 | auto table_name_match = [this](const String & table_name_) { return table_name_regexp.match(table_name_); }; |
| 412 | return database->getTablesIterator(global_context, table_name_match); |
| 413 | } |
| 414 | |
| 415 | |
| 416 | void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) |
| 417 | { |
| 418 | for (const auto & command : commands) |
| 419 | { |
| 420 | if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN |
| 421 | && command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN) |
| 422 | throw Exception( |
| 423 | "Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(), |
| 424 | ErrorCodes::NOT_IMPLEMENTED); |
| 425 | } |
| 426 | } |
| 427 | |
| 428 | void StorageMerge::alter( |
| 429 | const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) |
| 430 | { |
| 431 | lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); |
| 432 | |
| 433 | StorageInMemoryMetadata storage_metadata = getInMemoryMetadata(); |
| 434 | params.apply(storage_metadata); |
| 435 | context.getDatabase(database_name)->alterTable(context, table_name, storage_metadata); |
| 436 | setColumns(storage_metadata.columns); |
| 437 | } |
| 438 | |
| 439 | Block StorageMerge::( |
| 440 | const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage) |
| 441 | { |
| 442 | switch (processed_stage) |
| 443 | { |
| 444 | case QueryProcessingStage::FetchColumns: |
| 445 | { |
| 446 | Block = getSampleBlockForColumns(column_names); |
| 447 | if (query_info.prewhere_info) |
| 448 | { |
| 449 | query_info.prewhere_info->prewhere_actions->execute(header); |
| 450 | header = materializeBlock(header); |
| 451 | if (query_info.prewhere_info->remove_prewhere_column) |
| 452 | header.erase(query_info.prewhere_info->prewhere_column_name); |
| 453 | } |
| 454 | return header; |
| 455 | } |
| 456 | case QueryProcessingStage::WithMergeableState: |
| 457 | case QueryProcessingStage::Complete: |
| 458 | return materializeBlock(InterpreterSelectQuery( |
| 459 | query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)), |
| 460 | SelectQueryOptions(processed_stage).analyze()).getSampleBlock()); |
| 461 | } |
| 462 | throw Exception("Logical Error: unknown processed stage." , ErrorCodes::LOGICAL_ERROR); |
| 463 | } |
| 464 | |
| 465 | void StorageMerge::convertingSourceStream(const Block & , const Context & context, ASTPtr & query, |
| 466 | BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage) |
| 467 | { |
| 468 | Block = source_stream->getHeader(); |
| 469 | source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name); |
| 470 | |
| 471 | auto where_expression = query->as<ASTSelectQuery>()->where(); |
| 472 | |
| 473 | if (!where_expression) |
| 474 | return; |
| 475 | |
| 476 | for (size_t column_index : ext::range(0, header.columns())) |
| 477 | { |
| 478 | ColumnWithTypeAndName header_column = header.getByPosition(column_index); |
| 479 | ColumnWithTypeAndName before_column = before_block_header.getByName(header_column.name); |
| 480 | /// If the processed_stage greater than FetchColumns and the block structure between streams is different. |
| 481 | /// the where expression maybe invalid because of convertingBlockInputStream. |
| 482 | /// So we need to throw exception. |
| 483 | if (!header_column.type->equals(*before_column.type.get()) && processed_stage > QueryProcessingStage::FetchColumns) |
| 484 | { |
| 485 | NamesAndTypesList source_columns = getSampleBlock().getNamesAndTypesList(); |
| 486 | NameAndTypePair virtual_column = getColumn("_table" ); |
| 487 | source_columns.insert(source_columns.end(), virtual_column); |
| 488 | auto syntax_result = SyntaxAnalyzer(context).analyze(where_expression, source_columns); |
| 489 | ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, syntax_result, context}.getActions(false, false); |
| 490 | Names required_columns = actions->getRequiredColumns(); |
| 491 | |
| 492 | for (const auto & required_column : required_columns) |
| 493 | { |
| 494 | if (required_column == header_column.name) |
| 495 | throw Exception("Block structure mismatch in Merge Storage: different types:\n" + before_block_header.dumpStructure() |
| 496 | + "\n" + header.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE); |
| 497 | } |
| 498 | } |
| 499 | |
| 500 | } |
| 501 | } |
| 502 | |
| 503 | |
| 504 | void registerStorageMerge(StorageFactory & factory) |
| 505 | { |
| 506 | factory.registerStorage("Merge" , [](const StorageFactory::Arguments & args) |
| 507 | { |
| 508 | /** In query, the name of database is specified as table engine argument which contains source tables, |
| 509 | * as well as regex for source-table names. |
| 510 | */ |
| 511 | |
| 512 | ASTs & engine_args = args.engine_args; |
| 513 | |
| 514 | if (engine_args.size() != 2) |
| 515 | throw Exception("Storage Merge requires exactly 2 parameters" |
| 516 | " - name of source database and regexp for table names." , |
| 517 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
| 518 | |
| 519 | engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context); |
| 520 | engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context); |
| 521 | |
| 522 | String source_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(); |
| 523 | String table_name_regexp = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(); |
| 524 | |
| 525 | return StorageMerge::create( |
| 526 | args.database_name, args.table_name, args.columns, |
| 527 | source_database, table_name_regexp, args.context); |
| 528 | }); |
| 529 | } |
| 530 | |
| 531 | } |
| 532 | |