1#include <DataStreams/AddingConstColumnBlockInputStream.h>
2#include <DataStreams/narrowBlockInputStreams.h>
3#include <DataStreams/LazyBlockInputStream.h>
4#include <DataStreams/NullBlockInputStream.h>
5#include <DataStreams/ConvertingBlockInputStream.h>
6#include <DataStreams/OneBlockInputStream.h>
7#include <DataStreams/ConcatBlockInputStream.h>
8#include <DataStreams/materializeBlock.h>
9#include <DataStreams/MaterializingBlockInputStream.h>
10#include <DataStreams/FilterBlockInputStream.h>
11#include <Storages/StorageMerge.h>
12#include <Storages/StorageFactory.h>
13#include <Storages/VirtualColumnUtils.h>
14#include <Storages/AlterCommands.h>
15#include <Interpreters/InterpreterAlterQuery.h>
16#include <Interpreters/SyntaxAnalyzer.h>
17#include <Interpreters/ExpressionActions.h>
18#include <Interpreters/evaluateConstantExpression.h>
19#include <Interpreters/InterpreterSelectQuery.h>
20#include <Parsers/ASTSelectQuery.h>
21#include <Parsers/ASTLiteral.h>
22#include <Parsers/ASTExpressionList.h>
23#include <DataTypes/DataTypeString.h>
24#include <Columns/ColumnString.h>
25#include <Common/typeid_cast.h>
26#include <Common/checkStackSize.h>
27#include <Databases/IDatabase.h>
28#include <ext/range.h>
29#include <algorithm>
30#include <Parsers/ASTFunction.h>
31#include <Parsers/queryToString.h>
32
33
34namespace DB
35{
36
namespace ErrorCodes
{
    extern const int ILLEGAL_PREWHERE;
    extern const int INCOMPATIBLE_SOURCE_TABLES;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
    extern const int SAMPLING_NOT_SUPPORTED;
    /// Used by this file (lazy-stream sanity check, unknown processing stage, unsupported ALTER).
    /// Declared explicitly instead of relying on a declaration pulled in by some transitive include.
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}
46
47
48StorageMerge::StorageMerge(
49 const std::string & database_name_,
50 const std::string & table_name_,
51 const ColumnsDescription & columns_,
52 const String & source_database_,
53 const String & table_name_regexp_,
54 const Context & context_)
55 : IStorage(ColumnsDescription({{"_table", std::make_shared<DataTypeString>()}}, true))
56 , table_name(table_name_)
57 , database_name(database_name_)
58 , source_database(source_database_)
59 , table_name_regexp(table_name_regexp_)
60 , global_context(context_)
61{
62 setColumns(columns_);
63}
64
65
/// NOTE: neither the structure of the underlying tables nor the set of those tables is constant,
/// so the results of these methods may become obsolete right after the call.
68
69NameAndTypePair StorageMerge::getColumn(const String & column_name) const
70{
71 if (!IStorage::hasColumn(column_name))
72 {
73 auto first_table = getFirstTable([](auto &&) { return true; });
74 if (first_table)
75 return first_table->getColumn(column_name);
76 }
77
78 return IStorage::getColumn(column_name);
79}
80
81
82bool StorageMerge::hasColumn(const String & column_name) const
83{
84 if (!IStorage::hasColumn(column_name))
85 {
86 auto first_table = getFirstTable([](auto &&) { return true; });
87 if (first_table)
88 return first_table->hasColumn(column_name);
89 }
90
91 return true;
92}
93
94
95template <typename F>
96StoragePtr StorageMerge::getFirstTable(F && predicate) const
97{
98 auto iterator = getDatabaseIterator(global_context);
99
100 while (iterator->isValid())
101 {
102 auto & table = iterator->table();
103 if (table.get() != this && predicate(table))
104 return table;
105
106 iterator->next();
107 }
108
109 return {};
110}
111
112
113bool StorageMerge::isRemote() const
114{
115 auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table->isRemote(); });
116 return first_remote_table != nullptr;
117}
118
119
120bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const
121{
122 /// It's beneficial if it is true for at least one table.
123 StorageListWithLocks selected_tables = getSelectedTables(query_context.getCurrentQueryId());
124
125 size_t i = 0;
126 for (const auto & table : selected_tables)
127 {
128 if (table.first->mayBenefitFromIndexForIn(left_in_operand, query_context))
129 return true;
130
131 ++i;
132 /// For simplicity reasons, check only first ten tables.
133 if (i > 10)
134 break;
135 }
136
137 return false;
138}
139
140
141QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const
142{
143 auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
144
145 DatabaseTablesIteratorPtr iterator = getDatabaseIterator(context);
146
147 size_t selected_table_size = 0;
148
149 while (iterator->isValid())
150 {
151 auto & table = iterator->table();
152 if (table.get() != this)
153 {
154 ++selected_table_size;
155 stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context));
156 }
157
158 iterator->next();
159 }
160
161 return selected_table_size == 1 ? stage_in_source_tables : std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
162}
163
164
/// Reads from every underlying table selected for this query and returns the combined streams.
///
/// Outline of the steps below:
///  1. Split the requested columns into the virtual `_table` column and the real columns.
///  2. Disable the "move to PREWHERE" optimization in a copy of the context used for the source reads.
///  3. Compute the result header for `processed_stage` (getQueryHeader).
///  4. Select the underlying tables (with structure locks), filtered by `_table` conditions if `_table` is used.
///  5. Spread the (multiplied) stream budget over the tables and create each table's streams.
///  6. Narrow the combined stream list down to `num_streams`.
BlockInputStreams StorageMerge::read(
    const Names & column_names,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    unsigned num_streams)
{
    BlockInputStreams res;

    bool has_table_virtual_column = false;
    Names real_column_names;
    real_column_names.reserve(column_names.size());

    /// `_table` is filled in by this storage itself; everything else must be read from the tables.
    for (const auto & column_name : column_names)
    {
        if (column_name == "_table" && isVirtualColumn(column_name))
            has_table_virtual_column = true;
        else
            real_column_names.push_back(column_name);
    }

    /** Just in case, turn off optimization "transfer to PREWHERE",
      * since there is no certainty that it works when one of table is MergeTree and other is not.
      */
    Context modified_context = context;
    modified_context.getSettingsRef().optimize_move_to_prewhere = false;

    /// What will be result structure depending on query processed stage in source tables?
    Block header = getQueryHeader(column_names, query_info, context, processed_stage);

    /** First we make list of selected tables to find out its size.
      * This is necessary to correctly pass the recommended number of threads to each table.
      */
    StorageListWithLocks selected_tables = getSelectedTables(
        query_info.query, has_table_virtual_column, true, context.getCurrentQueryId());

    if (selected_tables.empty())
        /// FIXME: do we support sampling in this case?
        return createSourceStreams(
            query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column);

    /// The stream budget is multiplied by min(tables_count, max(1, max_streams_multiplier_for_merge_tables)).
    size_t tables_count = selected_tables.size();
    Float64 num_streams_multiplier = std::min(unsigned(tables_count), std::max(1U, unsigned(context.getSettingsRef().max_streams_multiplier_for_merge_tables)));
    num_streams *= num_streams_multiplier;
    size_t remaining_streams = num_streams;

    /// `input_sorting_info` survives only if every selected table reports a non-null input order
    /// and all of those orders compare equal; otherwise it ends up reset (null).
    InputSortingInfoPtr input_sorting_info;
    if (query_info.order_by_optimizer)
    {
        for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
        {
            auto current_info = query_info.order_by_optimizer->getInputOrder(it->first);
            if (it == selected_tables.begin())
                input_sorting_info = current_info;
            else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info))
                input_sorting_info.reset();

            if (!input_sorting_info)
                break;
        }

        query_info.input_sorting_info = input_sorting_info;
    }

    for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
    {
        /// Each table asks for an equal share of the budget (at least one stream), limited by what is left.
        size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
        size_t current_streams = std::min(current_need_streams, remaining_streams);
        remaining_streams -= current_streams;
        /// NOTE(review): this forces current_streams >= 1, so the lazy `else` branch below is never
        /// taken as the code stands. Confirm whether that is intended.
        current_streams = std::max(size_t(1), current_streams);

        StoragePtr storage = it->first;
        TableStructureReadLockHolder struct_lock = it->second;

        /// If sampling requested, then check that table supports it.
        if (query_info.query->as<ASTSelectQuery>()->sample_size() && !storage->supportsSampling())
            throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);

        BlockInputStreams source_streams;

        if (current_streams)
        {
            source_streams = createSourceStreams(
                query_info, processed_stage, max_block_size, header, storage,
                struct_lock, real_column_names, modified_context, current_streams, has_table_virtual_column);
        }
        else
        {
            /// Deferred variant: the table's streams are created (and concatenated into one)
            /// only when the lazy stream is first read.
            source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(
                header, [=, this]() mutable -> BlockInputStreamPtr
                {
                    BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size,
                        header, storage, struct_lock, real_column_names,
                        modified_context, current_streams, has_table_virtual_column, true);

                    if (!streams.empty() && streams.size() != 1)
                        throw Exception("LogicalError: the lazy stream size must to be one or empty.", ErrorCodes::LOGICAL_ERROR);

                    return streams.empty() ? std::make_shared<NullBlockInputStream>(header) : streams[0];
                }));
        }

        res.insert(res.end(), source_streams.begin(), source_streams.end());
    }

    if (res.empty())
        return res;

    /// Merge the per-table streams down to the (multiplied) stream budget.
    res = narrowBlockInputStreams(res, num_streams);
    return res;
}
277
278BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
279 const UInt64 max_block_size, const Block & header, const StoragePtr & storage,
280 const TableStructureReadLockHolder & struct_lock, Names & real_column_names,
281 Context & modified_context, size_t streams_num, bool has_table_virtual_column,
282 bool concat_streams)
283{
284 SelectQueryInfo modified_query_info = query_info;
285 modified_query_info.query = query_info.query->clone();
286
287 VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", storage ? storage->getTableName() : "");
288
289 if (!storage)
290 return BlockInputStreams{
291 InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
292 SelectQueryOptions(processed_stage).analyze()).execute().in};
293
294 BlockInputStreams source_streams;
295
296 if (processed_stage <= storage->getQueryProcessingStage(modified_context))
297 {
298 /// If there are only virtual columns in query, you must request at least one other column.
299 if (real_column_names.size() ==0)
300 real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));
301
302 source_streams = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size,
303 UInt32(streams_num));
304 }
305 else if (processed_stage > storage->getQueryProcessingStage(modified_context))
306 {
307 modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, storage->getTableName());
308
309 /// Maximum permissible parallelism is streams_num
310 modified_context.getSettingsRef().max_threads = UInt64(streams_num);
311 modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1;
312
313 InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)};
314 BlockInputStreamPtr interpreter_stream = interpreter.execute().in;
315
316 /** Materialization is needed, since from distributed storage the constants come materialized.
317 * If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
318 * And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
319 */
320 source_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(interpreter_stream));
321 }
322
323 if (!source_streams.empty())
324 {
325 if (concat_streams)
326 {
327 BlockInputStreamPtr stream =
328 source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0];
329
330 source_streams.resize(1);
331 source_streams[0] = stream;
332 }
333
334 for (BlockInputStreamPtr & source_stream : source_streams)
335 {
336 if (has_table_virtual_column)
337 source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
338 source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table");
339
340 /// Subordinary tables could have different but convertible types, like numeric types of different width.
341 /// We must return streams with structure equals to structure of Merge table.
342 convertingSourceStream(header, modified_context, modified_query_info.query, source_stream, processed_stage);
343
344 source_stream->addTableLock(struct_lock);
345 }
346 }
347
348 return source_streams;
349}
350
351
352StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String & query_id) const
353{
354 StorageListWithLocks selected_tables;
355 auto iterator = getDatabaseIterator(global_context);
356
357 while (iterator->isValid())
358 {
359 auto & table = iterator->table();
360 if (table.get() != this)
361 selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id));
362
363 iterator->next();
364 }
365
366 return selected_tables;
367}
368
369
370StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock, const String & query_id) const
371{
372 StorageListWithLocks selected_tables;
373 DatabaseTablesIteratorPtr iterator = getDatabaseIterator(global_context);
374
375 auto virtual_column = ColumnString::create();
376
377 while (iterator->isValid())
378 {
379 StoragePtr storage = iterator->table();
380
381 if (query && query->as<ASTSelectQuery>()->prewhere() && !storage->supportsPrewhere())
382 throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE);
383
384 if (storage.get() != this)
385 {
386 selected_tables.emplace_back(storage, get_lock ? storage->lockStructureForShare(false, query_id) : TableStructureReadLockHolder{});
387 virtual_column->insert(storage->getTableName());
388 }
389
390 iterator->next();
391 }
392
393 if (has_virtual_column)
394 {
395 Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(virtual_column), std::make_shared<DataTypeString>(), "_table")};
396 VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, global_context);
397 auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
398
399 /// Remove unused tables from the list
400 selected_tables.remove_if([&] (const auto & elem) { return values.find(elem.first->getTableName()) == values.end(); });
401 }
402
403 return selected_tables;
404}
405
406
407DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & context) const
408{
409 checkStackSize();
410 auto database = context.getDatabase(source_database);
411 auto table_name_match = [this](const String & table_name_) { return table_name_regexp.match(table_name_); };
412 return database->getTablesIterator(global_context, table_name_match);
413}
414
415
416void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
417{
418 for (const auto & command : commands)
419 {
420 if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
421 && command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN)
422 throw Exception(
423 "Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
424 ErrorCodes::NOT_IMPLEMENTED);
425 }
426}
427
428void StorageMerge::alter(
429 const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
430{
431 lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
432
433 StorageInMemoryMetadata storage_metadata = getInMemoryMetadata();
434 params.apply(storage_metadata);
435 context.getDatabase(database_name)->alterTable(context, table_name, storage_metadata);
436 setColumns(storage_metadata.columns);
437}
438
439Block StorageMerge::getQueryHeader(
440 const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage)
441{
442 switch (processed_stage)
443 {
444 case QueryProcessingStage::FetchColumns:
445 {
446 Block header = getSampleBlockForColumns(column_names);
447 if (query_info.prewhere_info)
448 {
449 query_info.prewhere_info->prewhere_actions->execute(header);
450 header = materializeBlock(header);
451 if (query_info.prewhere_info->remove_prewhere_column)
452 header.erase(query_info.prewhere_info->prewhere_column_name);
453 }
454 return header;
455 }
456 case QueryProcessingStage::WithMergeableState:
457 case QueryProcessingStage::Complete:
458 return materializeBlock(InterpreterSelectQuery(
459 query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
460 SelectQueryOptions(processed_stage).analyze()).getSampleBlock());
461 }
462 throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
463}
464
465void StorageMerge::convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
466 BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage)
467{
468 Block before_block_header = source_stream->getHeader();
469 source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
470
471 auto where_expression = query->as<ASTSelectQuery>()->where();
472
473 if (!where_expression)
474 return;
475
476 for (size_t column_index : ext::range(0, header.columns()))
477 {
478 ColumnWithTypeAndName header_column = header.getByPosition(column_index);
479 ColumnWithTypeAndName before_column = before_block_header.getByName(header_column.name);
480 /// If the processed_stage greater than FetchColumns and the block structure between streams is different.
481 /// the where expression maybe invalid because of convertingBlockInputStream.
482 /// So we need to throw exception.
483 if (!header_column.type->equals(*before_column.type.get()) && processed_stage > QueryProcessingStage::FetchColumns)
484 {
485 NamesAndTypesList source_columns = getSampleBlock().getNamesAndTypesList();
486 NameAndTypePair virtual_column = getColumn("_table");
487 source_columns.insert(source_columns.end(), virtual_column);
488 auto syntax_result = SyntaxAnalyzer(context).analyze(where_expression, source_columns);
489 ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, syntax_result, context}.getActions(false, false);
490 Names required_columns = actions->getRequiredColumns();
491
492 for (const auto & required_column : required_columns)
493 {
494 if (required_column == header_column.name)
495 throw Exception("Block structure mismatch in Merge Storage: different types:\n" + before_block_header.dumpStructure()
496 + "\n" + header.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
497 }
498 }
499
500 }
501}
502
503
504void registerStorageMerge(StorageFactory & factory)
505{
506 factory.registerStorage("Merge", [](const StorageFactory::Arguments & args)
507 {
508 /** In query, the name of database is specified as table engine argument which contains source tables,
509 * as well as regex for source-table names.
510 */
511
512 ASTs & engine_args = args.engine_args;
513
514 if (engine_args.size() != 2)
515 throw Exception("Storage Merge requires exactly 2 parameters"
516 " - name of source database and regexp for table names.",
517 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
518
519 engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
520 engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
521
522 String source_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
523 String table_name_regexp = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
524
525 return StorageMerge::create(
526 args.database_name, args.table_name, args.columns,
527 source_database, table_name_regexp, args.context);
528 });
529}
530
531}
532