1 | #include <DataStreams/AddingConstColumnBlockInputStream.h> |
2 | #include <DataStreams/narrowBlockInputStreams.h> |
3 | #include <DataStreams/LazyBlockInputStream.h> |
4 | #include <DataStreams/NullBlockInputStream.h> |
5 | #include <DataStreams/ConvertingBlockInputStream.h> |
6 | #include <DataStreams/OneBlockInputStream.h> |
7 | #include <DataStreams/ConcatBlockInputStream.h> |
8 | #include <DataStreams/materializeBlock.h> |
9 | #include <DataStreams/MaterializingBlockInputStream.h> |
10 | #include <DataStreams/FilterBlockInputStream.h> |
11 | #include <Storages/StorageMerge.h> |
12 | #include <Storages/StorageFactory.h> |
13 | #include <Storages/VirtualColumnUtils.h> |
14 | #include <Storages/AlterCommands.h> |
15 | #include <Interpreters/InterpreterAlterQuery.h> |
16 | #include <Interpreters/SyntaxAnalyzer.h> |
17 | #include <Interpreters/ExpressionActions.h> |
18 | #include <Interpreters/evaluateConstantExpression.h> |
19 | #include <Interpreters/InterpreterSelectQuery.h> |
20 | #include <Parsers/ASTSelectQuery.h> |
21 | #include <Parsers/ASTLiteral.h> |
22 | #include <Parsers/ASTExpressionList.h> |
23 | #include <DataTypes/DataTypeString.h> |
24 | #include <Columns/ColumnString.h> |
25 | #include <Common/typeid_cast.h> |
26 | #include <Common/checkStackSize.h> |
27 | #include <Databases/IDatabase.h> |
28 | #include <ext/range.h> |
29 | #include <algorithm> |
30 | #include <Parsers/ASTFunction.h> |
31 | #include <Parsers/queryToString.h> |
32 | |
33 | |
34 | namespace DB |
35 | { |
36 | |
37 | namespace ErrorCodes |
38 | { |
39 | extern const int ILLEGAL_PREWHERE; |
40 | extern const int INCOMPATIBLE_SOURCE_TABLES; |
41 | extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; |
42 | extern const int NO_SUCH_COLUMN_IN_TABLE; |
43 | extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE; |
    extern const int SAMPLING_NOT_SUPPORTED;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
45 | } |
46 | |
47 | |
48 | StorageMerge::StorageMerge( |
49 | const std::string & database_name_, |
50 | const std::string & table_name_, |
51 | const ColumnsDescription & columns_, |
52 | const String & source_database_, |
53 | const String & table_name_regexp_, |
54 | const Context & context_) |
55 | : IStorage(ColumnsDescription({{"_table" , std::make_shared<DataTypeString>()}}, true)) |
56 | , table_name(table_name_) |
57 | , database_name(database_name_) |
58 | , source_database(source_database_) |
59 | , table_name_regexp(table_name_regexp_) |
60 | , global_context(context_) |
61 | { |
62 | setColumns(columns_); |
63 | } |
64 | |
65 | |
66 | /// NOTE: structure of underlying tables as well as their set are not constant, |
67 | /// so the results of these methods may become obsolete after the call. |
68 | |
69 | NameAndTypePair StorageMerge::getColumn(const String & column_name) const |
70 | { |
71 | if (!IStorage::hasColumn(column_name)) |
72 | { |
73 | auto first_table = getFirstTable([](auto &&) { return true; }); |
74 | if (first_table) |
75 | return first_table->getColumn(column_name); |
76 | } |
77 | |
78 | return IStorage::getColumn(column_name); |
79 | } |
80 | |
81 | |
82 | bool StorageMerge::hasColumn(const String & column_name) const |
83 | { |
84 | if (!IStorage::hasColumn(column_name)) |
85 | { |
86 | auto first_table = getFirstTable([](auto &&) { return true; }); |
87 | if (first_table) |
88 | return first_table->hasColumn(column_name); |
89 | } |
90 | |
91 | return true; |
92 | } |
93 | |
94 | |
95 | template <typename F> |
96 | StoragePtr StorageMerge::getFirstTable(F && predicate) const |
97 | { |
98 | auto iterator = getDatabaseIterator(global_context); |
99 | |
100 | while (iterator->isValid()) |
101 | { |
102 | auto & table = iterator->table(); |
103 | if (table.get() != this && predicate(table)) |
104 | return table; |
105 | |
106 | iterator->next(); |
107 | } |
108 | |
109 | return {}; |
110 | } |
111 | |
112 | |
113 | bool StorageMerge::isRemote() const |
114 | { |
115 | auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table->isRemote(); }); |
116 | return first_remote_table != nullptr; |
117 | } |
118 | |
119 | |
120 | bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const |
121 | { |
122 | /// It's beneficial if it is true for at least one table. |
123 | StorageListWithLocks selected_tables = getSelectedTables(query_context.getCurrentQueryId()); |
124 | |
125 | size_t i = 0; |
126 | for (const auto & table : selected_tables) |
127 | { |
128 | if (table.first->mayBenefitFromIndexForIn(left_in_operand, query_context)) |
129 | return true; |
130 | |
131 | ++i; |
        /// For simplicity, check only the first ten tables.
        if (i >= 10)
134 | break; |
135 | } |
136 | |
137 | return false; |
138 | } |
139 | |
140 | |
141 | QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const |
142 | { |
143 | auto stage_in_source_tables = QueryProcessingStage::FetchColumns; |
144 | |
145 | DatabaseTablesIteratorPtr iterator = getDatabaseIterator(context); |
146 | |
147 | size_t selected_table_size = 0; |
148 | |
149 | while (iterator->isValid()) |
150 | { |
151 | auto & table = iterator->table(); |
152 | if (table.get() != this) |
153 | { |
154 | ++selected_table_size; |
155 | stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context)); |
156 | } |
157 | |
158 | iterator->next(); |
159 | } |
160 | |
161 | return selected_table_size == 1 ? stage_in_source_tables : std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState); |
162 | } |
163 | |
164 | |
165 | BlockInputStreams StorageMerge::read( |
166 | const Names & column_names, |
167 | const SelectQueryInfo & query_info, |
168 | const Context & context, |
169 | QueryProcessingStage::Enum processed_stage, |
170 | const size_t max_block_size, |
171 | unsigned num_streams) |
172 | { |
173 | BlockInputStreams res; |
174 | |
175 | bool has_table_virtual_column = false; |
176 | Names real_column_names; |
177 | real_column_names.reserve(column_names.size()); |
178 | |
179 | for (const auto & column_name : column_names) |
180 | { |
181 | if (column_name == "_table" && isVirtualColumn(column_name)) |
182 | has_table_virtual_column = true; |
183 | else |
184 | real_column_names.push_back(column_name); |
185 | } |
186 | |
    /** Just in case, disable the "move to PREWHERE" optimization,
      * since there is no certainty that it works when one of the tables is MergeTree and the others are not.
      */
190 | Context modified_context = context; |
191 | modified_context.getSettingsRef().optimize_move_to_prewhere = false; |
192 | |
    /// The result structure depends on the processing stage of the query in the source tables.
    Block header = getQueryHeader(column_names, query_info, context, processed_stage);
195 | |
    /** First, build the list of selected tables to find out its size.
      * This is necessary to correctly pass the recommended number of threads to each table.
      */
199 | StorageListWithLocks selected_tables = getSelectedTables( |
200 | query_info.query, has_table_virtual_column, true, context.getCurrentQueryId()); |
201 | |
202 | if (selected_tables.empty()) |
203 | /// FIXME: do we support sampling in this case? |
204 | return createSourceStreams( |
205 | query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column); |
206 | |
207 | size_t tables_count = selected_tables.size(); |
208 | Float64 num_streams_multiplier = std::min(unsigned(tables_count), std::max(1U, unsigned(context.getSettingsRef().max_streams_multiplier_for_merge_tables))); |
209 | num_streams *= num_streams_multiplier; |
210 | size_t remaining_streams = num_streams; |
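
    /// Read-in-order can only be used if every selected table produces the same input
    /// order; if any table disagrees, the common sorting info is reset and the
    /// optimization is skipped.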
211 | |
212 | InputSortingInfoPtr input_sorting_info; |
213 | if (query_info.order_by_optimizer) |
214 | { |
215 | for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it) |
216 | { |
217 | auto current_info = query_info.order_by_optimizer->getInputOrder(it->first); |
218 | if (it == selected_tables.begin()) |
219 | input_sorting_info = current_info; |
220 | else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info)) |
221 | input_sorting_info.reset(); |
222 | |
223 | if (!input_sorting_info) |
224 | break; |
225 | } |
226 | |
227 | query_info.input_sorting_info = input_sorting_info; |
228 | } |
229 | |
230 | for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it) |
231 | { |
232 | size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count); |
233 | size_t current_streams = std::min(current_need_streams, remaining_streams); |
234 | remaining_streams -= current_streams; |
235 | current_streams = std::max(size_t(1), current_streams); |
236 | |
237 | StoragePtr storage = it->first; |
238 | TableStructureReadLockHolder struct_lock = it->second; |
239 | |
        /// If sampling is requested, check that the table supports it.
        if (query_info.query->as<ASTSelectQuery>()->sample_size() && !storage->supportsSampling())
            throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
243 | |
244 | BlockInputStreams source_streams; |
245 | |
246 | if (current_streams) |
247 | { |
248 | source_streams = createSourceStreams( |
249 | query_info, processed_stage, max_block_size, header, storage, |
250 | struct_lock, real_column_names, modified_context, current_streams, has_table_virtual_column); |
251 | } |
252 | else |
253 | { |
254 | source_streams.emplace_back(std::make_shared<LazyBlockInputStream>( |
255 | header, [=, this]() mutable -> BlockInputStreamPtr |
256 | { |
257 | BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size, |
258 | header, storage, struct_lock, real_column_names, |
259 | modified_context, current_streams, has_table_virtual_column, true); |
260 | |
261 | if (!streams.empty() && streams.size() != 1) |
262 | throw Exception("LogicalError: the lazy stream size must to be one or empty." , ErrorCodes::LOGICAL_ERROR); |
263 | |
264 | return streams.empty() ? std::make_shared<NullBlockInputStream>(header) : streams[0]; |
265 | })); |
266 | } |
267 | |
268 | res.insert(res.end(), source_streams.begin(), source_streams.end()); |
269 | } |
270 | |
271 | if (res.empty()) |
272 | return res; |
273 | |
274 | res = narrowBlockInputStreams(res, num_streams); |
275 | return res; |
276 | } |
277 | |
278 | BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage, |
                                                    const UInt64 max_block_size, const Block & header, const StoragePtr & storage,
280 | const TableStructureReadLockHolder & struct_lock, Names & real_column_names, |
281 | Context & modified_context, size_t streams_num, bool has_table_virtual_column, |
282 | bool concat_streams) |
283 | { |
284 | SelectQueryInfo modified_query_info = query_info; |
285 | modified_query_info.query = query_info.query->clone(); |
286 | |
    VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", storage ? storage->getTableName() : "");
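
    /// When there is no underlying table at all, interpret the query over a single empty
    /// block with the right header, so the caller still receives a stream with the
    /// expected result structure.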
288 | |
289 | if (!storage) |
290 | return BlockInputStreams{ |
291 | InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header), |
292 | SelectQueryOptions(processed_stage).analyze()).execute().in}; |
293 | |
294 | BlockInputStreams source_streams; |
295 | |
296 | if (processed_stage <= storage->getQueryProcessingStage(modified_context)) |
297 | { |
        /// If the query contains only virtual columns, we must request at least one other column.
        if (real_column_names.empty())
            real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));
301 | |
302 | source_streams = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size, |
303 | UInt32(streams_num)); |
304 | } |
305 | else if (processed_stage > storage->getQueryProcessingStage(modified_context)) |
306 | { |
307 | modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, storage->getTableName()); |
308 | |
309 | /// Maximum permissible parallelism is streams_num |
310 | modified_context.getSettingsRef().max_threads = UInt64(streams_num); |
311 | modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1; |
312 | |
313 | InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)}; |
314 | BlockInputStreamPtr interpreter_stream = interpreter.execute().in; |
315 | |
        /** Materialization is needed, since constants coming from a distributed storage arrive materialized.
          * Without it, columns of different kinds (Const and non-Const) would be produced in different threads,
          * which is not allowed, since all code assumes that every block in a stream has the same structure.
          */
320 | source_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(interpreter_stream)); |
321 | } |
322 | |
323 | if (!source_streams.empty()) |
324 | { |
325 | if (concat_streams) |
326 | { |
327 | BlockInputStreamPtr stream = |
328 | source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0]; |
329 | |
330 | source_streams.resize(1); |
331 | source_streams[0] = stream; |
332 | } |
333 | |
334 | for (BlockInputStreamPtr & source_stream : source_streams) |
335 | { |
336 | if (has_table_virtual_column) |
337 | source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>( |
                    source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table");
339 | |
            /// Underlying tables may have different but convertible types, e.g. numeric types of different widths.
            /// We must return streams whose structure matches the structure of the Merge table.
342 | convertingSourceStream(header, modified_context, modified_query_info.query, source_stream, processed_stage); |
343 | |
344 | source_stream->addTableLock(struct_lock); |
345 | } |
346 | } |
347 | |
348 | return source_streams; |
349 | } |
350 | |
351 | |
352 | StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String & query_id) const |
353 | { |
354 | StorageListWithLocks selected_tables; |
355 | auto iterator = getDatabaseIterator(global_context); |
356 | |
357 | while (iterator->isValid()) |
358 | { |
359 | auto & table = iterator->table(); |
360 | if (table.get() != this) |
361 | selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id)); |
362 | |
363 | iterator->next(); |
364 | } |
365 | |
366 | return selected_tables; |
367 | } |
368 | |
369 | |
370 | StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock, const String & query_id) const |
371 | { |
372 | StorageListWithLocks selected_tables; |
373 | DatabaseTablesIteratorPtr iterator = getDatabaseIterator(global_context); |
374 | |
375 | auto virtual_column = ColumnString::create(); |
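    /// Candidate table names are collected into this column so that a condition on
    /// `_table` in the query can prune tables before any data is read (see below).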
376 | |
377 | while (iterator->isValid()) |
378 | { |
379 | StoragePtr storage = iterator->table(); |
380 | |
381 | if (query && query->as<ASTSelectQuery>()->prewhere() && !storage->supportsPrewhere()) |
382 | throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE." , ErrorCodes::ILLEGAL_PREWHERE); |
383 | |
384 | if (storage.get() != this) |
385 | { |
386 | selected_tables.emplace_back(storage, get_lock ? storage->lockStructureForShare(false, query_id) : TableStructureReadLockHolder{}); |
387 | virtual_column->insert(storage->getTableName()); |
388 | } |
389 | |
390 | iterator->next(); |
391 | } |
392 | |
393 | if (has_virtual_column) |
394 | { |
        Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(virtual_column), std::make_shared<DataTypeString>(), "_table")};
        VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, global_context);
        auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
398 | |
399 | /// Remove unused tables from the list |
400 | selected_tables.remove_if([&] (const auto & elem) { return values.find(elem.first->getTableName()) == values.end(); }); |
401 | } |
402 | |
403 | return selected_tables; |
404 | } |
405 | |
406 | |
407 | DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & context) const |
408 | { |
409 | checkStackSize(); |
410 | auto database = context.getDatabase(source_database); |
411 | auto table_name_match = [this](const String & table_name_) { return table_name_regexp.match(table_name_); }; |
412 | return database->getTablesIterator(global_context, table_name_match); |
413 | } |
414 | |
415 | |
416 | void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) |
417 | { |
418 | for (const auto & command : commands) |
419 | { |
420 | if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN |
421 | && command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN) |
422 | throw Exception( |
423 | "Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(), |
424 | ErrorCodes::NOT_IMPLEMENTED); |
425 | } |
426 | } |
427 | |
428 | void StorageMerge::alter( |
429 | const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) |
430 | { |
431 | lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); |
432 | |
433 | StorageInMemoryMetadata storage_metadata = getInMemoryMetadata(); |
434 | params.apply(storage_metadata); |
435 | context.getDatabase(database_name)->alterTable(context, table_name, storage_metadata); |
436 | setColumns(storage_metadata.columns); |
437 | } |
438 | |
Block StorageMerge::getQueryHeader(
440 | const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage) |
441 | { |
442 | switch (processed_stage) |
443 | { |
444 | case QueryProcessingStage::FetchColumns: |
445 | { |
            Block header = getSampleBlockForColumns(column_names);
447 | if (query_info.prewhere_info) |
448 | { |
449 | query_info.prewhere_info->prewhere_actions->execute(header); |
450 | header = materializeBlock(header); |
451 | if (query_info.prewhere_info->remove_prewhere_column) |
452 | header.erase(query_info.prewhere_info->prewhere_column_name); |
453 | } |
454 | return header; |
455 | } |
456 | case QueryProcessingStage::WithMergeableState: |
457 | case QueryProcessingStage::Complete: |
458 | return materializeBlock(InterpreterSelectQuery( |
459 | query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)), |
460 | SelectQueryOptions(processed_stage).analyze()).getSampleBlock()); |
461 | } |
462 | throw Exception("Logical Error: unknown processed stage." , ErrorCodes::LOGICAL_ERROR); |
463 | } |
464 | |
void StorageMerge::convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
466 | BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage) |
467 | { |
    Block before_block_header = source_stream->getHeader();
469 | source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name); |
470 | |
471 | auto where_expression = query->as<ASTSelectQuery>()->where(); |
472 | |
473 | if (!where_expression) |
474 | return; |
475 | |
476 | for (size_t column_index : ext::range(0, header.columns())) |
477 | { |
478 | ColumnWithTypeAndName header_column = header.getByPosition(column_index); |
479 | ColumnWithTypeAndName before_column = before_block_header.getByName(header_column.name); |
        /// If processed_stage is greater than FetchColumns and the block structure differs between streams,
        /// the WHERE expression may become invalid after ConvertingBlockInputStream,
        /// so we have to throw an exception.
483 | if (!header_column.type->equals(*before_column.type.get()) && processed_stage > QueryProcessingStage::FetchColumns) |
484 | { |
485 | NamesAndTypesList source_columns = getSampleBlock().getNamesAndTypesList(); |
486 | NameAndTypePair virtual_column = getColumn("_table" ); |
487 | source_columns.insert(source_columns.end(), virtual_column); |
488 | auto syntax_result = SyntaxAnalyzer(context).analyze(where_expression, source_columns); |
489 | ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, syntax_result, context}.getActions(false, false); |
490 | Names required_columns = actions->getRequiredColumns(); |
491 | |
492 | for (const auto & required_column : required_columns) |
493 | { |
494 | if (required_column == header_column.name) |
495 | throw Exception("Block structure mismatch in Merge Storage: different types:\n" + before_block_header.dumpStructure() |
496 | + "\n" + header.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE); |
497 | } |
498 | } |
499 | |
500 | } |
501 | } |
502 | |
503 | |
504 | void registerStorageMerge(StorageFactory & factory) |
505 | { |
506 | factory.registerStorage("Merge" , [](const StorageFactory::Arguments & args) |
507 | { |
        /** The engine arguments specify the name of the database that contains the source tables,
          * as well as a regexp for the source table names.
          */
511 | |
512 | ASTs & engine_args = args.engine_args; |
513 | |
514 | if (engine_args.size() != 2) |
515 | throw Exception("Storage Merge requires exactly 2 parameters" |
516 | " - name of source database and regexp for table names." , |
517 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
518 | |
519 | engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context); |
520 | engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context); |
521 | |
522 | String source_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(); |
523 | String table_name_regexp = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(); |
524 | |
525 | return StorageMerge::create( |
526 | args.database_name, args.table_name, args.columns, |
527 | source_database, table_name_regexp, args.context); |
528 | }); |
529 | } |
530 | |
531 | } |
532 | |