#include <DataStreams/AddingConstColumnBlockInputStream.h>
#include <DataStreams/narrowBlockInputStreams.h>
#include <DataStreams/LazyBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <Storages/StorageMerge.h>
#include <Storages/StorageFactory.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/AlterCommands.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Common/typeid_cast.h>
#include <Common/checkStackSize.h>
#include <Databases/IDatabase.h>
#include <ext/range.h>
#include <algorithm>
#include <Parsers/ASTFunction.h>
#include <Parsers/queryToString.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int ILLEGAL_PREWHERE;
    extern const int INCOMPATIBLE_SOURCE_TABLES;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
    extern const int SAMPLING_NOT_SUPPORTED;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}

StorageMerge::StorageMerge(
    const std::string & database_name_,
    const std::string & table_name_,
    const ColumnsDescription & columns_,
    const String & source_database_,
    const String & table_name_regexp_,
    const Context & context_)
    : IStorage(ColumnsDescription({{"_table", std::make_shared<DataTypeString>()}}, true))
    , table_name(table_name_)
    , database_name(database_name_)
    , source_database(source_database_)
    , table_name_regexp(table_name_regexp_)
    , global_context(context_)
{
    setColumns(columns_);
}

/// NOTE: structure of underlying tables as well as their set are not constant,
///       so the results of these methods may become obsolete after the call.

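/// Columns that are not declared in the Merge table itself are resolved through the first
/// underlying table, so getColumn() / hasColumn() also cover columns that exist only in the
/// source tables.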
NameAndTypePair StorageMerge::getColumn(const String & column_name) const
{
    if (!IStorage::hasColumn(column_name))
    {
        auto first_table = getFirstTable([](auto &&) { return true; });
        if (first_table)
            return first_table->getColumn(column_name);
    }

    return IStorage::getColumn(column_name);
}

bool StorageMerge::hasColumn(const String & column_name) const
{
    if (!IStorage::hasColumn(column_name))
    {
        auto first_table = getFirstTable([](auto &&) { return true; });
        if (first_table)
            return first_table->hasColumn(column_name);
    }

    return true;
}

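/// Returns the first underlying table (other than this Merge table itself) that satisfies
/// the predicate, or an empty pointer if there is none.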
template <typename F>
StoragePtr StorageMerge::getFirstTable(F && predicate) const
{
    auto iterator = getDatabaseIterator(global_context);

    while (iterator->isValid())
    {
        auto & table = iterator->table();
        if (table.get() != this && predicate(table))
            return table;

        iterator->next();
    }

    return {};
}

bool StorageMerge::isRemote() const
{
    auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table->isRemote(); });
    return first_remote_table != nullptr;
}

bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const
{
    /// It's beneficial if it is true for at least one table.
    StorageListWithLocks selected_tables = getSelectedTables(query_context.getCurrentQueryId());

    size_t i = 0;
    for (const auto & table : selected_tables)
    {
        if (table.first->mayBenefitFromIndexForIn(left_in_operand, query_context))
            return true;

        ++i;
        /// For simplicity, check only the first ten tables.
        if (i > 10)
            break;
    }

    return false;
}

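/// If there is exactly one source table, its processing stage can be used directly;
/// otherwise the per-table results still have to be merged, so the stage is capped
/// at WithMergeableState.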
QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const
{
    auto stage_in_source_tables = QueryProcessingStage::FetchColumns;

    DatabaseTablesIteratorPtr iterator = getDatabaseIterator(context);

    size_t selected_table_size = 0;

    while (iterator->isValid())
    {
        auto & table = iterator->table();
        if (table.get() != this)
        {
            ++selected_table_size;
            stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context));
        }

        iterator->next();
    }

    return selected_table_size == 1 ? stage_in_source_tables : std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
}

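/// Read from every selected source table, convert each stream to the structure of the
/// Merge table and add the virtual `_table` column where it was requested.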
BlockInputStreams StorageMerge::read(
    const Names & column_names,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    unsigned num_streams)
{
    BlockInputStreams res;

    bool has_table_virtual_column = false;
    Names real_column_names;
    real_column_names.reserve(column_names.size());

    for (const auto & column_name : column_names)
    {
        if (column_name == "_table" && isVirtualColumn(column_name))
            has_table_virtual_column = true;
        else
            real_column_names.push_back(column_name);
    }

    /** Just in case, turn off the "move to PREWHERE" optimization,
      * since there is no certainty that it works when one of the tables is MergeTree and the others are not.
      */
    Context modified_context = context;
    modified_context.getSettingsRef().optimize_move_to_prewhere = false;

    /// What will the result structure be, depending on the query processing stage in the source tables?
    Block header = getQueryHeader(column_names, query_info, context, processed_stage);

    /** First we make a list of selected tables to find out its size.
      * This is necessary to correctly pass the recommended number of threads to each table.
      */
    StorageListWithLocks selected_tables = getSelectedTables(
        query_info.query, has_table_virtual_column, true, context.getCurrentQueryId());

    if (selected_tables.empty())
        /// FIXME: do we support sampling in this case?
        return createSourceStreams(
            query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column);

    size_t tables_count = selected_tables.size();
    Float64 num_streams_multiplier = std::min(unsigned(tables_count), std::max(1U, unsigned(context.getSettingsRef().max_streams_multiplier_for_merge_tables)));
    num_streams *= num_streams_multiplier;
    size_t remaining_streams = num_streams;

    InputSortingInfoPtr input_sorting_info;
    if (query_info.order_by_optimizer)
    {
        for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
        {
            auto current_info = query_info.order_by_optimizer->getInputOrder(it->first);
            if (it == selected_tables.begin())
                input_sorting_info = current_info;
            else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info))
                input_sorting_info.reset();

            if (!input_sorting_info)
                break;
        }

        query_info.input_sorting_info = input_sorting_info;
    }

    for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
    {
        size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
        size_t current_streams = std::min(current_need_streams, remaining_streams);
        remaining_streams -= current_streams;
        current_streams = std::max(size_t(1), current_streams);

        StoragePtr storage = it->first;
        TableStructureReadLockHolder struct_lock = it->second;

        /// If sampling is requested, check that the table supports it.
        if (query_info.query->as<ASTSelectQuery>()->sample_size() && !storage->supportsSampling())
            throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);

        BlockInputStreams source_streams;

        if (current_streams)
        {
            source_streams = createSourceStreams(
                query_info, processed_stage, max_block_size, header, storage,
                struct_lock, real_column_names, modified_context, current_streams, has_table_virtual_column);
        }
        else
        {
            source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(
                header, [=, this]() mutable -> BlockInputStreamPtr
                {
                    BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size,
                                                                    header, storage, struct_lock, real_column_names,
                                                                    modified_context, current_streams, has_table_virtual_column, true);

                    if (!streams.empty() && streams.size() != 1)
                        throw Exception("Logical error: the lazy stream size must be one or empty.", ErrorCodes::LOGICAL_ERROR);

                    return streams.empty() ? std::make_shared<NullBlockInputStream>(header) : streams[0];
                }));
        }

        res.insert(res.end(), source_streams.begin(), source_streams.end());
    }

    if (res.empty())
        return res;

    res = narrowBlockInputStreams(res, num_streams);
    return res;
}

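/// Create the streams that read from a single source table. When `storage` is empty
/// (no tables were selected), a stream built over just the header block is returned,
/// so the caller still gets a result with the expected structure.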
BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
                                                    const UInt64 max_block_size, const Block & header, const StoragePtr & storage,
                                                    const TableStructureReadLockHolder & struct_lock, Names & real_column_names,
                                                    Context & modified_context, size_t streams_num, bool has_table_virtual_column,
                                                    bool concat_streams)
{
    SelectQueryInfo modified_query_info = query_info;
    modified_query_info.query = query_info.query->clone();

    VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", storage ? storage->getTableName() : "");

    if (!storage)
        return BlockInputStreams{
            InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
                                   SelectQueryOptions(processed_stage).analyze()).execute().in};

    BlockInputStreams source_streams;

    if (processed_stage <= storage->getQueryProcessingStage(modified_context))
    {
        /// If there are only virtual columns in the query, you must request at least one other column.
        if (real_column_names.empty())
            real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));

        source_streams = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size,
                                       UInt32(streams_num));
    }
    else if (processed_stage > storage->getQueryProcessingStage(modified_context))
    {
        modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, storage->getTableName());

        /// Maximum permissible parallelism is streams_num.
        modified_context.getSettingsRef().max_threads = UInt64(streams_num);
        modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1;

        InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)};
        BlockInputStreamPtr interpreter_stream = interpreter.execute().in;

        /** Materialization is needed, since from distributed storage the constants come materialized.
          * If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
          * and this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
          */
        source_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(interpreter_stream));
    }

    if (!source_streams.empty())
    {
        if (concat_streams)
        {
            BlockInputStreamPtr stream =
                source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0];

            source_streams.resize(1);
            source_streams[0] = stream;
        }

        for (BlockInputStreamPtr & source_stream : source_streams)
        {
            if (has_table_virtual_column)
                source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
                    source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table");

            /// Subordinate tables could have different but convertible types, like numeric types of different width.
            /// We must return streams with a structure equal to the structure of the Merge table.
            convertingSourceStream(header, modified_context, modified_query_info.query, source_stream, processed_stage);

            source_stream->addTableLock(struct_lock);
        }
    }

    return source_streams;
}

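/// Collect all matching source tables (excluding this Merge table itself), taking a shared
/// structure lock on each of them.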
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String & query_id) const
{
    StorageListWithLocks selected_tables;
    auto iterator = getDatabaseIterator(global_context);

    while (iterator->isValid())
    {
        auto & table = iterator->table();
        if (table.get() != this)
            selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id));

        iterator->next();
    }

    return selected_tables;
}

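/// Same as above, but the list can additionally be filtered by conditions on the virtual
/// `_table` column extracted from the query.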
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock, const String & query_id) const
{
    StorageListWithLocks selected_tables;
    DatabaseTablesIteratorPtr iterator = getDatabaseIterator(global_context);

    auto virtual_column = ColumnString::create();

    while (iterator->isValid())
    {
        StoragePtr storage = iterator->table();

        if (query && query->as<ASTSelectQuery>()->prewhere() && !storage->supportsPrewhere())
            throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE);

        if (storage.get() != this)
        {
            selected_tables.emplace_back(storage, get_lock ? storage->lockStructureForShare(false, query_id) : TableStructureReadLockHolder{});
            virtual_column->insert(storage->getTableName());
        }

        iterator->next();
    }

    if (has_virtual_column)
    {
        Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(virtual_column), std::make_shared<DataTypeString>(), "_table")};
        VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, global_context);
        auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");

        /// Remove unused tables from the list.
        selected_tables.remove_if([&](const auto & elem) { return values.find(elem.first->getTableName()) == values.end(); });
    }

    return selected_tables;
}

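/// Iterate over the tables of the source database whose names match the configured regexp.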
DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & context) const
{
    checkStackSize();
    auto database = context.getDatabase(source_database);
    auto table_name_match = [this](const String & table_name_) { return table_name_regexp.match(table_name_); };
    return database->getTablesIterator(global_context, table_name_match);
}

void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
    for (const auto & command : commands)
    {
        if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
            && command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN)
            throw Exception(
                "Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
                ErrorCodes::NOT_IMPLEMENTED);
    }
}

void StorageMerge::alter(
    const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
    lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());

    StorageInMemoryMetadata storage_metadata = getInMemoryMetadata();
    params.apply(storage_metadata);
    context.getDatabase(database_name)->alterTable(context, table_name, storage_metadata);
    setColumns(storage_metadata.columns);
}

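/// Build the header (result structure) that the Merge table returns for the given columns
/// and query processing stage.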
Block StorageMerge::getQueryHeader(
    const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage)
{
    switch (processed_stage)
    {
        case QueryProcessingStage::FetchColumns:
        {
            Block header = getSampleBlockForColumns(column_names);
            if (query_info.prewhere_info)
            {
                query_info.prewhere_info->prewhere_actions->execute(header);
                header = materializeBlock(header);
                if (query_info.prewhere_info->remove_prewhere_column)
                    header.erase(query_info.prewhere_info->prewhere_column_name);
            }
            return header;
        }
        case QueryProcessingStage::WithMergeableState:
        case QueryProcessingStage::Complete:
            return materializeBlock(InterpreterSelectQuery(
                query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
                SelectQueryOptions(processed_stage).analyze()).getSampleBlock());
    }
    throw Exception("Logical error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}

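/// Convert a source-table stream to the structure of the Merge table. If a column used in the
/// WHERE expression has a different type in the source table and the query has already been
/// processed beyond FetchColumns, the conversion would invalidate that expression, so an
/// exception is thrown instead.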
void StorageMerge::convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
                                          BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage)
{
    Block before_block_header = source_stream->getHeader();
    source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);

    auto where_expression = query->as<ASTSelectQuery>()->where();

    if (!where_expression)
        return;

    for (size_t column_index : ext::range(0, header.columns()))
    {
        ColumnWithTypeAndName header_column = header.getByPosition(column_index);
        ColumnWithTypeAndName before_column = before_block_header.getByName(header_column.name);
        /// If the processed_stage is greater than FetchColumns and the block structure differs between streams,
        /// the WHERE expression may be invalid because of the ConvertingBlockInputStream,
        /// so we need to throw an exception.
        if (!header_column.type->equals(*before_column.type.get()) && processed_stage > QueryProcessingStage::FetchColumns)
        {
            NamesAndTypesList source_columns = getSampleBlock().getNamesAndTypesList();
            NameAndTypePair virtual_column = getColumn("_table");
            source_columns.insert(source_columns.end(), virtual_column);
            auto syntax_result = SyntaxAnalyzer(context).analyze(where_expression, source_columns);
            ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, syntax_result, context}.getActions(false, false);
            Names required_columns = actions->getRequiredColumns();

            for (const auto & required_column : required_columns)
            {
                if (required_column == header_column.name)
                    throw Exception("Block structure mismatch in Merge Storage: different types:\n" + before_block_header.dumpStructure()
                                    + "\n" + header.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
            }
        }
    }
}

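/** Usage example (the table and database names below are illustrative only):
  *
  *     CREATE TABLE all_visits AS visits_2019 ENGINE = Merge(default, '^visits_')
  *
  * reads from every table of the `default` database whose name matches the regexp '^visits_'.
  */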
void registerStorageMerge(StorageFactory & factory)
{
    factory.registerStorage("Merge", [](const StorageFactory::Arguments & args)
    {
        /** In the query, the database that contains the source tables is specified as a table
          * engine argument, together with a regexp for the source-table names.
          */

        ASTs & engine_args = args.engine_args;

        if (engine_args.size() != 2)
            throw Exception("Storage Merge requires exactly 2 parameters"
                " - name of source database and regexp for table names.",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
        engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);

        String source_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
        String table_name_regexp = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();

        return StorageMerge::create(
            args.database_name, args.table_name, args.columns,
            source_database, table_name_regexp, args.context);
    });
}

}