1#include <DataStreams/ExpressionBlockInputStream.h>
2#include <DataStreams/FilterBlockInputStream.h>
3#include <DataStreams/FinishSortingBlockInputStream.h>
4#include <DataStreams/LimitBlockInputStream.h>
5#include <DataStreams/LimitByBlockInputStream.h>
6#include <DataStreams/PartialSortingBlockInputStream.h>
7#include <DataStreams/MergeSortingBlockInputStream.h>
8#include <DataStreams/MergingSortedBlockInputStream.h>
9#include <DataStreams/AggregatingBlockInputStream.h>
10#include <DataStreams/MergingAggregatedBlockInputStream.h>
11#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
12#include <DataStreams/AsynchronousBlockInputStream.h>
13#include <DataStreams/UnionBlockInputStream.h>
14#include <DataStreams/ParallelAggregatingBlockInputStream.h>
15#include <DataStreams/DistinctBlockInputStream.h>
16#include <DataStreams/NullBlockInputStream.h>
17#include <DataStreams/TotalsHavingBlockInputStream.h>
18#include <DataStreams/OneBlockInputStream.h>
19#include <DataStreams/copyData.h>
20#include <DataStreams/CreatingSetsBlockInputStream.h>
21#include <DataStreams/MaterializingBlockInputStream.h>
22#include <DataStreams/ConcatBlockInputStream.h>
23#include <DataStreams/RollupBlockInputStream.h>
24#include <DataStreams/CubeBlockInputStream.h>
25#include <DataStreams/ConvertColumnLowCardinalityToFullBlockInputStream.h>
26#include <DataStreams/ConvertingBlockInputStream.h>
27#include <DataStreams/ReverseBlockInputStream.h>
28#include <DataStreams/FillingBlockInputStream.h>
29#include <DataStreams/SquashingBlockInputStream.h>
30
31#include <Parsers/ASTFunction.h>
32#include <Parsers/ASTIdentifier.h>
33#include <Parsers/ASTLiteral.h>
34#include <Parsers/ASTOrderByElement.h>
35#include <Parsers/ASTSelectWithUnionQuery.h>
36#include <Parsers/ASTTablesInSelectQuery.h>
37#include <Parsers/ParserSelectQuery.h>
38#include <Parsers/ExpressionListParsers.h>
39#include <Parsers/parseQuery.h>
40
41#include <Access/RowPolicyContext.h>
42
43#include <Interpreters/InterpreterSelectQuery.h>
44#include <Interpreters/InterpreterSelectWithUnionQuery.h>
45#include <Interpreters/InterpreterSetQuery.h>
46#include <Interpreters/evaluateConstantExpression.h>
47#include <Interpreters/convertFieldToType.h>
48#include <Interpreters/ExpressionAnalyzer.h>
49#include <Interpreters/getTableExpressions.h>
50#include <Interpreters/JoinToSubqueryTransformVisitor.h>
51#include <Interpreters/CrossToInnerJoinVisitor.h>
52#include <Interpreters/AnalyzedJoin.h>
53
54#include <Storages/MergeTree/MergeTreeData.h>
55#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
56#include <Storages/IStorage.h>
57#include <Storages/StorageValues.h>
58
59#include <TableFunctions/ITableFunction.h>
60#include <TableFunctions/TableFunctionFactory.h>
61
62#include <Functions/IFunction.h>
63#include <Core/Field.h>
64#include <Core/Types.h>
65#include <Columns/Collator.h>
66#include <Common/FieldVisitors.h>
67#include <Common/typeid_cast.h>
68#include <Common/checkStackSize.h>
69#include <Parsers/queryToString.h>
70#include <ext/map.h>
71#include <ext/scope_guard.h>
72#include <memory>
73
74#include <Processors/Sources/NullSource.h>
75#include <Processors/Sources/SourceFromInputStream.h>
76#include <Processors/Transforms/FilterTransform.h>
77#include <Processors/Transforms/ExpressionTransform.h>
78#include <Processors/Transforms/AggregatingTransform.h>
79#include <Processors/Transforms/MergingAggregatedTransform.h>
80#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
81#include <Processors/Transforms/TotalsHavingTransform.h>
82#include <Processors/Transforms/PartialSortingTransform.h>
83#include <Processors/Transforms/LimitsCheckingTransform.h>
84#include <Processors/Transforms/MergeSortingTransform.h>
85#include <Processors/Transforms/MergingSortedTransform.h>
86#include <Processors/Transforms/DistinctTransform.h>
87#include <Processors/Transforms/LimitByTransform.h>
88#include <Processors/Transforms/ExtremesTransform.h>
89#include <Processors/Transforms/CreatingSetsTransform.h>
90#include <Processors/Transforms/RollupTransform.h>
91#include <Processors/Transforms/CubeTransform.h>
92#include <Processors/Transforms/FillingTransform.h>
93#include <Processors/LimitTransform.h>
94#include <Processors/Transforms/FinishSortingTransform.h>
95#include <DataTypes/DataTypeAggregateFunction.h>
96#include <DataStreams/materializeBlock.h>
97#include <Processors/Pipe.h>
98
99
100namespace DB
101{
102
103namespace ErrorCodes
104{
105 extern const int TOO_DEEP_SUBQUERIES;
106 extern const int THERE_IS_NO_COLUMN;
107 extern const int SAMPLING_NOT_SUPPORTED;
108 extern const int ILLEGAL_FINAL;
109 extern const int ILLEGAL_PREWHERE;
110 extern const int TOO_MANY_COLUMNS;
111 extern const int LOGICAL_ERROR;
112 extern const int NOT_IMPLEMENTED;
113 extern const int PARAMETER_OUT_OF_BOUND;
114 extern const int ARGUMENT_OUT_OF_BOUND;
115 extern const int INVALID_LIMIT_EXPRESSION;
116 extern const int INVALID_WITH_FILL_EXPRESSION;
117}
118
119namespace
120{
121
122/// Assumes `storage` is set and the table filter (row-level security) is not empty.
123String generateFilterActions(ExpressionActionsPtr & actions, const Context & context, const StoragePtr & storage, const ASTPtr & row_policy_filter, const Names & prerequisite_columns = {})
124{
125 const auto & db_name = storage->getDatabaseName();
126 const auto & table_name = storage->getTableName();
127
128 /// TODO: implement some AST builders for this kind of stuff
129 ASTPtr query_ast = std::make_shared<ASTSelectQuery>();
130 auto * select_ast = query_ast->as<ASTSelectQuery>();
131
132 select_ast->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
133 auto expr_list = select_ast->select();
134
135 // The first column is our filter expression.
136 expr_list->children.push_back(row_policy_filter);
137
138 /// Keep columns that are required after the filter actions.
139 for (const auto & column_str : prerequisite_columns)
140 {
141 ParserExpression expr_parser;
142 expr_list->children.push_back(parseQuery(expr_parser, column_str, 0));
143 }
144
145 select_ast->setExpression(ASTSelectQuery::Expression::TABLES, std::make_shared<ASTTablesInSelectQuery>());
146 auto tables = select_ast->tables();
147 auto tables_elem = std::make_shared<ASTTablesInSelectQueryElement>();
148 auto table_expr = std::make_shared<ASTTableExpression>();
149 tables->children.push_back(tables_elem);
150 tables_elem->table_expression = table_expr;
151 tables_elem->children.push_back(table_expr);
152 table_expr->database_and_table_name = createTableIdentifier(db_name, table_name);
153 table_expr->children.push_back(table_expr->database_and_table_name);
154
    /// Use a separate expression analyzer to prevent any possible alias injection.
156 auto syntax_result = SyntaxAnalyzer(context).analyze(query_ast, storage->getColumns().getAllPhysical());
157 SelectQueryExpressionAnalyzer analyzer(query_ast, syntax_result, context);
158 ExpressionActionsChain new_chain(context);
159 analyzer.appendSelect(new_chain, false);
160 actions = new_chain.getLastActions();
161
162 return expr_list->children.at(0)->getColumnName();
163}
164
165}
166
167InterpreterSelectQuery::InterpreterSelectQuery(
168 const ASTPtr & query_ptr_,
169 const Context & context_,
170 const SelectQueryOptions & options_,
171 const Names & required_result_column_names_)
172 : InterpreterSelectQuery(query_ptr_, context_, nullptr, nullptr, options_, required_result_column_names_)
173{
174}
175
176InterpreterSelectQuery::InterpreterSelectQuery(
177 const ASTPtr & query_ptr_,
178 const Context & context_,
179 const BlockInputStreamPtr & input_,
180 const SelectQueryOptions & options_)
181 : InterpreterSelectQuery(query_ptr_, context_, input_, nullptr, options_.copy().noSubquery())
182{}
183
184InterpreterSelectQuery::InterpreterSelectQuery(
185 const ASTPtr & query_ptr_,
186 const Context & context_,
187 const StoragePtr & storage_,
188 const SelectQueryOptions & options_)
189 : InterpreterSelectQuery(query_ptr_, context_, nullptr, storage_, options_.copy().noSubquery())
190{}
191
192InterpreterSelectQuery::~InterpreterSelectQuery() = default;
193
194
/** There are no limits on the maximum size of the result for the subquery,
  * because the result of the subquery is not the result of the entire query.
  */
198static Context getSubqueryContext(const Context & context)
199{
200 Context subquery_context = context;
201 Settings subquery_settings = context.getSettings();
202 subquery_settings.max_result_rows = 0;
203 subquery_settings.max_result_bytes = 0;
    /// The calculation of extremes does not make sense and is not necessary (if you do it, then the extremes of the subquery can be taken for the whole query).
205 subquery_settings.extremes = 0;
206 subquery_context.setSettings(subquery_settings);
207 return subquery_context;
208}
209
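/// Make the block usable as a sample block: create empty columns where they are missing
/// and shrink non-empty constant columns (left over from constant folding) to size 0.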
210static void sanitizeBlock(Block & block)
211{
212 for (auto & col : block)
213 {
214 if (!col.column)
215 col.column = col.type->createColumn();
216 else if (isColumnConst(*col.column) && !col.column->empty())
217 col.column = col.column->cloneEmpty();
218 }
219}
220
221InterpreterSelectQuery::InterpreterSelectQuery(
222 const ASTPtr & query_ptr_,
223 const Context & context_,
224 const BlockInputStreamPtr & input_,
225 const StoragePtr & storage_,
226 const SelectQueryOptions & options_,
227 const Names & required_result_column_names)
228 : options(options_)
    /// NOTE: the query should almost always be cloned because it will be modified during analysis.
230 , query_ptr(options.modify_inplace ? query_ptr_ : query_ptr_->clone())
231 , context(std::make_shared<Context>(context_))
232 , storage(storage_)
233 , input(input_)
234 , log(&Logger::get("InterpreterSelectQuery"))
235{
236 checkStackSize();
237
238 initSettings();
239 const Settings & settings = context->getSettingsRef();
240
241 if (settings.max_subquery_depth && options.subquery_depth > settings.max_subquery_depth)
242 throw Exception("Too deep subqueries. Maximum: " + settings.max_subquery_depth.toString(),
243 ErrorCodes::TOO_DEEP_SUBQUERIES);
244
245 CrossToInnerJoinVisitor::Data cross_to_inner;
246 CrossToInnerJoinVisitor(cross_to_inner).visit(query_ptr);
247
248 JoinToSubqueryTransformVisitor::Data join_to_subs_data{*context};
249 JoinToSubqueryTransformVisitor(join_to_subs_data).visit(query_ptr);
250
251 max_streams = settings.max_threads;
252 auto & query = getSelectQuery();
253
254 ASTPtr table_expression = extractTableExpression(query, 0);
255
256 bool is_table_func = false;
257 bool is_subquery = false;
258 if (table_expression)
259 {
260 is_table_func = table_expression->as<ASTFunction>();
261 is_subquery = table_expression->as<ASTSelectWithUnionQuery>();
262 }
263
264 if (input)
265 {
266 /// Read from prepared input.
267 source_header = input->getHeader();
268 }
269 else if (is_subquery)
270 {
271 /// Read from subquery.
272 interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
273 table_expression, getSubqueryContext(*context), options.subquery(), required_columns);
274
275 source_header = interpreter_subquery->getSampleBlock();
276 }
277 else if (!storage)
278 {
279 if (is_table_func)
280 {
            /// Read from a table function. Propagate all settings from initSettings();
            /// the alternative is to call it on the current `context`, but that could pollute it.
283 storage = getSubqueryContext(*context).executeTableFunction(table_expression);
284 }
285 else
286 {
287 String database_name;
288 String table_name;
289
290 getDatabaseAndTableNames(query, database_name, table_name, *context);
291
292 if (auto view_source = context->getViewSource())
293 {
294 auto & storage_values = static_cast<const StorageValues &>(*view_source);
295 if (storage_values.getDatabaseName() == database_name && storage_values.getTableName() == table_name)
296 {
297 /// Read from view source.
298 storage = context->getViewSource();
299 }
300 }
301
302 if (!storage)
303 {
304 /// Read from table. Even without table expression (implicit SELECT ... FROM system.one).
305 storage = context->getTable(database_name, table_name);
306 }
307 }
308 }
309
310 if (storage)
311 table_lock = storage->lockStructureForShare(false, context->getInitialQueryId());
312
313 auto analyze = [&] ()
314 {
315 syntax_analyzer_result = SyntaxAnalyzer(*context, options).analyze(
316 query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());
317
        /// Save the results of scalar subqueries in the query context
319 if (context->hasQueryContext())
320 for (const auto & it : syntax_analyzer_result->getScalars())
321 context->getQueryContext().addScalar(it.first, it.second);
322
323 query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
324 query_ptr, syntax_analyzer_result, *context,
325 NameSet(required_result_column_names.begin(), required_result_column_names.end()),
326 options.subquery_depth, !options.only_analyze);
327
328 if (!options.only_analyze)
329 {
330 if (query.sample_size() && (input || !storage || !storage->supportsSampling()))
331 throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
332
333 if (query.final() && (input || !storage || !storage->supportsFinal()))
334 throw Exception((!input && storage) ? "Storage " + storage->getName() + " doesn't support FINAL" : "Illegal FINAL", ErrorCodes::ILLEGAL_FINAL);
335
336 if (query.prewhere() && (input || !storage || !storage->supportsPrewhere()))
337 throw Exception((!input && storage) ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE", ErrorCodes::ILLEGAL_PREWHERE);
338
339 /// Save the new temporary tables in the query context
340 for (const auto & it : query_analyzer->getExternalTables())
341 if (!context->tryGetExternalTable(it.first))
342 context->addExternalTable(it.first, it.second);
343 }
344
345 if (!options.only_analyze || options.modify_inplace)
346 {
347 if (syntax_analyzer_result->rewrite_subqueries)
348 {
                /// Remake interpreter_subquery when the PredicateOptimizer rewrites subqueries and the main table is a subquery
350 if (is_subquery)
351 interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
352 table_expression,
353 getSubqueryContext(*context),
354 options.subquery(),
355 required_columns);
356 }
357 }
358
359 if (interpreter_subquery)
360 {
361 /// If there is an aggregation in the outer query, WITH TOTALS is ignored in the subquery.
362 if (query_analyzer->hasAggregation())
363 interpreter_subquery->ignoreWithTotals();
364 }
365
366 required_columns = syntax_analyzer_result->requiredSourceColumns();
367
368 if (storage)
369 {
370 source_header = storage->getSampleBlockForColumns(required_columns);
371
372 /// Fix source_header for filter actions.
373 auto row_policy_filter = context->getRowPolicy()->getCondition(storage->getDatabaseName(), storage->getTableName(), RowPolicy::SELECT_FILTER);
374 if (row_policy_filter)
375 {
376 filter_info = std::make_shared<FilterInfo>();
377 filter_info->column_name = generateFilterActions(filter_info->actions, *context, storage, row_policy_filter, required_columns);
378 source_header = storage->getSampleBlockForColumns(filter_info->actions->getRequiredColumns());
379 }
380 }
381
382 if (!options.only_analyze && storage && filter_info && query.prewhere())
383 throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
384
385 /// Calculate structure of the result.
386 result_header = getSampleBlockImpl();
387 };
388
389 analyze();
390
391 bool need_analyze_again = false;
392 if (analysis_result.prewhere_constant_filter_description.always_false || analysis_result.prewhere_constant_filter_description.always_true)
393 {
394 if (analysis_result.prewhere_constant_filter_description.always_true)
395 query.setExpression(ASTSelectQuery::Expression::PREWHERE, {});
396 else
397 query.setExpression(ASTSelectQuery::Expression::PREWHERE, std::make_shared<ASTLiteral>(0u));
398 need_analyze_again = true;
399 }
400 if (analysis_result.where_constant_filter_description.always_false || analysis_result.where_constant_filter_description.always_true)
401 {
402 if (analysis_result.where_constant_filter_description.always_true)
403 query.setExpression(ASTSelectQuery::Expression::WHERE, {});
404 else
405 query.setExpression(ASTSelectQuery::Expression::WHERE, std::make_shared<ASTLiteral>(0u));
406 need_analyze_again = true;
407 }
408 if (query.prewhere() && query.where())
409 {
        /// Filter the block in WHERE instead, to get better performance
411 query.setExpression(ASTSelectQuery::Expression::WHERE, makeASTFunction("and", query.prewhere()->clone(), query.where()->clone()));
412 need_analyze_again = true;
413 }
414 if (need_analyze_again)
415 analyze();
416
    /// If there is a PREWHERE but no WHERE, the PREWHERE step itself must filter the rows
418 if (query.prewhere() && !query.where())
419 analysis_result.prewhere_info->need_filter = true;
420
    /// Blocks used in expression analysis contain size-1 const columns for constant folding and
    /// null non-const columns to avoid useless memory allocations. However, a valid block sample
    /// requires all columns to be of size 0, so we need to sanitize the block here.
424 sanitizeBlock(result_header);
425
426 /// Remove limits for some tables in the `system` database.
427 if (storage && (storage->getDatabaseName() == "system"))
428 {
429 String table_name = storage->getTableName();
430 if ((table_name == "quotas") || (table_name == "quota_usage") || (table_name == "one"))
431 {
432 options.ignore_quota = true;
433 options.ignore_limits = true;
434 }
435 }
436}
437
438
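/// Determine which database and table the query reads from. If the database is omitted, the current
/// database is used; if no table is specified at all, the implicit `system.one` table is used.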
439void InterpreterSelectQuery::getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context)
440{
441 if (auto db_and_table = getDatabaseAndTable(query, 0))
442 {
443 table_name = db_and_table->table;
444 database_name = db_and_table->database;
445
446 /// If the database is not specified - use the current database.
447 if (database_name.empty() && !context.tryGetTable("", table_name))
448 database_name = context.getCurrentDatabase();
449 }
450 else /// If the table is not specified - use the table `system.one`.
451 {
452 database_name = "system";
453 table_name = "one";
454 }
455}
456
457
458Block InterpreterSelectQuery::getSampleBlock()
459{
460 return result_header;
461}
462
463
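/// Execute the query using the BlockInputStream-based pipeline and return a single output stream.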
464BlockIO InterpreterSelectQuery::execute()
465{
466 Pipeline pipeline;
467 BlockIO res;
468 executeImpl(pipeline, input, res.pipeline);
469 executeUnion(pipeline, getSampleBlock());
470
471 res.in = pipeline.firstStream();
472 res.pipeline.addInterpreterContext(context);
473 res.pipeline.addStorageHolder(storage);
474 return res;
475}
476
477BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams(QueryPipeline & parent_pipeline)
478{
    /// FIXME: pipeline must be alive until query is finished
480 Pipeline pipeline;
481 executeImpl(pipeline, input, parent_pipeline);
482 unifyStreams(pipeline, getSampleBlock());
483 parent_pipeline.addInterpreterContext(context);
484 parent_pipeline.addStorageHolder(storage);
485 return pipeline.streams;
486}
487
488QueryPipeline InterpreterSelectQuery::executeWithProcessors()
489{
490 QueryPipeline query_pipeline;
491 query_pipeline.setMaxThreads(context->getSettingsRef().max_threads);
492 executeImpl(query_pipeline, input, query_pipeline);
493 query_pipeline.addInterpreterContext(context);
494 query_pipeline.addStorageHolder(storage);
495 return query_pipeline;
496}
497
498
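/// Compute the header (structure) of the result for the requested processing stage. As a side effect,
/// this runs the PREWHERE optimization and expression analysis whose results are reused later during execution.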
499Block InterpreterSelectQuery::getSampleBlockImpl()
500{
501 auto & query = getSelectQuery();
502 const Settings & settings = context->getSettingsRef();
503
504 /// Do all AST changes here, because actions from analysis_result will be used later in readImpl.
505
506 /// PREWHERE optimization.
    /// Turn it off if the table filter (row-level security) is applied.
508 if (storage && !context->getRowPolicy()->getCondition(storage->getDatabaseName(), storage->getTableName(), RowPolicy::SELECT_FILTER))
509 {
510 query_analyzer->makeSetsForIndex(query.where());
511 query_analyzer->makeSetsForIndex(query.prewhere());
512
513 auto optimize_prewhere = [&](auto & merge_tree)
514 {
515 SelectQueryInfo current_info;
516 current_info.query = query_ptr;
517 current_info.syntax_analyzer_result = syntax_analyzer_result;
518 current_info.sets = query_analyzer->getPreparedSets();
519
520 /// Try transferring some condition from WHERE to PREWHERE if enabled and viable
521 if (settings.optimize_move_to_prewhere && query.where() && !query.prewhere() && !query.final())
522 MergeTreeWhereOptimizer{current_info, *context, merge_tree,
523 syntax_analyzer_result->requiredSourceColumns(), log};
524 };
525
526 if (const auto * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
527 optimize_prewhere(*merge_tree_data);
528 }
529
530 if (storage && !options.only_analyze)
531 from_stage = storage->getQueryProcessingStage(*context);
532
533 analysis_result = analyzeExpressions(
534 getSelectQuery(),
535 *query_analyzer,
536 from_stage,
537 options.to_stage,
538 *context,
539 storage,
540 options.only_analyze,
541 filter_info,
542 source_header
543 );
544
545 if (options.to_stage == QueryProcessingStage::Enum::FetchColumns)
546 {
547 auto header = source_header;
548
549 if (analysis_result.prewhere_info)
550 {
551 analysis_result.prewhere_info->prewhere_actions->execute(header);
552 header = materializeBlock(header);
553 if (analysis_result.prewhere_info->remove_prewhere_column)
554 header.erase(analysis_result.prewhere_info->prewhere_column_name);
555 }
556 return header;
557 }
558
559 if (options.to_stage == QueryProcessingStage::Enum::WithMergeableState)
560 {
561 if (!analysis_result.need_aggregate)
562 return analysis_result.before_order_and_select->getSampleBlock();
563
564 auto header = analysis_result.before_aggregation->getSampleBlock();
565
566 Names key_names;
567 AggregateDescriptions aggregates;
568 query_analyzer->getAggregateInfo(key_names, aggregates);
569
570 Block res;
571
572 for (auto & key : key_names)
573 res.insert({nullptr, header.getByName(key).type, key});
574
575 for (auto & aggregate : aggregates)
576 {
577 size_t arguments_size = aggregate.argument_names.size();
578 DataTypes argument_types(arguments_size);
579 for (size_t j = 0; j < arguments_size; ++j)
580 argument_types[j] = header.getByName(aggregate.argument_names[j]).type;
581
582 DataTypePtr type = std::make_shared<DataTypeAggregateFunction>(aggregate.function, argument_types, aggregate.parameters);
583
584 res.insert({nullptr, type, aggregate.column_name});
585 }
586
587 return res;
588 }
589
590 return analysis_result.final_projection->getSampleBlock();
591}
592
/// Check if there is an ignore() function. It's used to disable constant folding in query
/// predicates because some performance tests use ignore() as a non-optimize guard.
595static bool hasIgnore(const ExpressionActions & actions)
596{
597 for (auto & action : actions.getActions())
598 {
599 if (action.type == action.APPLY_FUNCTION && action.function_base)
600 {
601 auto name = action.function_base->getName();
602 if (name == "ignore")
603 return true;
604 }
605 }
606 return false;
607}
608
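/// Compose the chain of expression actions for the whole query (filter, PREWHERE, JOIN, WHERE, aggregation,
/// HAVING, SELECT, ORDER BY, LIMIT BY, projection) and remember the steps needed for each execution stage.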
609InterpreterSelectQuery::AnalysisResult
610InterpreterSelectQuery::analyzeExpressions(
611 const ASTSelectQuery & query,
612 SelectQueryExpressionAnalyzer & query_analyzer,
613 QueryProcessingStage::Enum from_stage,
614 QueryProcessingStage::Enum to_stage,
615 const Context & context,
616 const StoragePtr & storage,
617 bool only_types,
618 const FilterInfoPtr & filter_info,
619 const Block & source_header)
620{
621 AnalysisResult res;
622
623 /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
624 res.first_stage = from_stage < QueryProcessingStage::WithMergeableState
625 && to_stage >= QueryProcessingStage::WithMergeableState;
626 /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
627 res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState
628 && to_stage > QueryProcessingStage::WithMergeableState;
629
630 /** First we compose a chain of actions and remember the necessary steps from it.
631 * Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
632 * throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
633 */
634
635 bool has_filter = false;
636 bool has_prewhere = false;
637 bool has_where = false;
638 size_t where_step_num;
639
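    /// Finalize the chain and record, for the filter / PREWHERE / WHERE steps, whether their filter
    /// columns can be removed and which auxiliary columns should be dropped right after PREWHERE.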
640 auto finalizeChain = [&](ExpressionActionsChain & chain)
641 {
642 chain.finalize();
643
644 if (has_prewhere)
645 {
646 const ExpressionActionsChain::Step & step = chain.steps.at(0);
647 res.prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0);
648
649 Names columns_to_remove;
650 for (size_t i = 1; i < step.required_output.size(); ++i)
651 {
652 if (step.can_remove_required_output[i])
653 columns_to_remove.push_back(step.required_output[i]);
654 }
655
656 if (!columns_to_remove.empty())
657 {
658 auto columns = res.prewhere_info->prewhere_actions->getSampleBlock().getNamesAndTypesList();
659 ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(columns, context);
660 for (const auto & column : columns_to_remove)
661 actions->add(ExpressionAction::removeColumn(column));
662
663 res.prewhere_info->remove_columns_actions = std::move(actions);
664 }
665
666 res.columns_to_remove_after_prewhere = std::move(columns_to_remove);
667 }
668 else if (has_filter)
669 {
670 /// Can't have prewhere and filter set simultaneously
671 res.filter_info->do_remove_column = chain.steps.at(0).can_remove_required_output.at(0);
672 }
673 if (has_where)
674 res.remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0);
675
676 has_filter = has_prewhere = has_where = false;
677
678 chain.clear();
679 };
680
681 {
682 ExpressionActionsChain chain(context);
683 Names additional_required_columns_after_prewhere;
684
685 if (storage && (query.sample_size() || context.getSettingsRef().parallel_replicas_count > 1))
686 {
687 Names columns_for_sampling = storage->getColumnsRequiredForSampling();
688 additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
689 columns_for_sampling.begin(), columns_for_sampling.end());
690 }
691
692 if (storage && query.final())
693 {
694 Names columns_for_final = storage->getColumnsRequiredForFinal();
695 additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
696 columns_for_final.begin(), columns_for_final.end());
697 }
698
699 if (storage && filter_info)
700 {
701 has_filter = true;
702 res.filter_info = filter_info;
703 query_analyzer.appendPreliminaryFilter(chain, filter_info->actions, filter_info->column_name);
704 }
705
706 if (query_analyzer.appendPrewhere(chain, !res.first_stage, additional_required_columns_after_prewhere))
707 {
708 has_prewhere = true;
709
710 res.prewhere_info = std::make_shared<PrewhereInfo>(
711 chain.steps.front().actions, query.prewhere()->getColumnName());
712
713 if (!hasIgnore(*res.prewhere_info->prewhere_actions))
714 {
715 Block before_prewhere_sample = source_header;
716 sanitizeBlock(before_prewhere_sample);
717 res.prewhere_info->prewhere_actions->execute(before_prewhere_sample);
718 auto & column_elem = before_prewhere_sample.getByName(query.prewhere()->getColumnName());
719 /// If the filter column is a constant, record it.
720 if (column_elem.column)
721 res.prewhere_constant_filter_description = ConstantFilterDescription(*column_elem.column);
722 }
723 chain.addStep();
724 }
725
726 res.need_aggregate = query_analyzer.hasAggregation();
727
728 query_analyzer.appendArrayJoin(chain, only_types || !res.first_stage);
729
730 if (query_analyzer.appendJoin(chain, only_types || !res.first_stage))
731 {
732 res.before_join = chain.getLastActions();
733 if (!res.hasJoin())
734 throw Exception("No expected JOIN", ErrorCodes::LOGICAL_ERROR);
735 chain.addStep();
736 }
737
738 if (query_analyzer.appendWhere(chain, only_types || !res.first_stage))
739 {
740 where_step_num = chain.steps.size() - 1;
741 has_where = res.has_where = true;
742 res.before_where = chain.getLastActions();
743 if (!hasIgnore(*res.before_where))
744 {
745 Block before_where_sample;
746 if (chain.steps.size() > 1)
747 before_where_sample = chain.steps[chain.steps.size() - 2].actions->getSampleBlock();
748 else
749 before_where_sample = source_header;
750 sanitizeBlock(before_where_sample);
751 res.before_where->execute(before_where_sample);
752 auto & column_elem = before_where_sample.getByName(query.where()->getColumnName());
753 /// If the filter column is a constant, record it.
754 if (column_elem.column)
755 res.where_constant_filter_description = ConstantFilterDescription(*column_elem.column);
756 }
757 chain.addStep();
758 }
759
760 if (res.need_aggregate)
761 {
762 query_analyzer.appendGroupBy(chain, only_types || !res.first_stage);
763 query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !res.first_stage);
764 res.before_aggregation = chain.getLastActions();
765
766 finalizeChain(chain);
767
768 if (query_analyzer.appendHaving(chain, only_types || !res.second_stage))
769 {
770 res.has_having = true;
771 res.before_having = chain.getLastActions();
772 chain.addStep();
773 }
774 }
775
        bool has_stream_with_non_joined_rows = (res.before_join && res.before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
        res.optimize_read_in_order =
            context.getSettingsRef().optimize_read_in_order
            && storage && query.orderBy()
            && !query_analyzer.hasAggregation()
            && !query.final()
            && !has_stream_with_non_joined_rows;
783
784 /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
785 query_analyzer.appendSelect(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage));
786 res.selected_columns = chain.getLastStep().required_output;
787 res.has_order_by = query_analyzer.appendOrderBy(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage), res.optimize_read_in_order);
788 res.before_order_and_select = chain.getLastActions();
789 chain.addStep();
790
791 if (query_analyzer.appendLimitBy(chain, only_types || !res.second_stage))
792 {
793 res.has_limit_by = true;
794 res.before_limit_by = chain.getLastActions();
795 chain.addStep();
796 }
797
798 query_analyzer.appendProjectResult(chain);
799 res.final_projection = chain.getLastActions();
800
801 finalizeChain(chain);
802 }
803
804 /// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
805 if (res.filter_info)
806 res.filter_info->actions->prependProjectInput();
807 if (res.has_where)
808 res.before_where->prependProjectInput();
809 if (res.has_having)
810 res.before_having->prependProjectInput();
811
812 res.subqueries_for_sets = query_analyzer.getSubqueriesForSets();
813
    /// Check that PREWHERE doesn't contain unusual actions. Unusual actions are those that can change the number of rows.
815 if (res.prewhere_info)
816 {
817 auto check_actions = [](const ExpressionActionsPtr & actions)
818 {
819 if (actions)
820 for (const auto & action : actions->getActions())
821 if (action.type == ExpressionAction::Type::JOIN || action.type == ExpressionAction::Type::ARRAY_JOIN)
822 throw Exception("PREWHERE cannot contain ARRAY JOIN or JOIN action", ErrorCodes::ILLEGAL_PREWHERE);
823 };
824
825 check_actions(res.prewhere_info->prewhere_actions);
826 check_actions(res.prewhere_info->alias_actions);
827 check_actions(res.prewhere_info->remove_columns_actions);
828 }
829
830 return res;
831}
832
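/// Evaluate a WITH FILL FROM/TO/STEP expression to a constant and check that it is numeric.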
833static Field getWithFillFieldValue(const ASTPtr & node, const Context & context)
834{
835 const auto & [field, type] = evaluateConstantExpression(node, context);
836
837 if (!isColumnedAsNumber(type))
838 throw Exception("Illegal type " + type->getName() + " of WITH FILL expression, must be numeric type", ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
839
840 return field;
841}
842
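/// Build the fill description for one ORDER BY ... WITH FILL element and validate that the STEP
/// is non-zero and consistent with the sorting direction and the FROM/TO bounds.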
843static FillColumnDescription getWithFillDescription(const ASTOrderByElement & order_by_elem, const Context & context)
844{
845 FillColumnDescription descr;
846 if (order_by_elem.fill_from)
847 descr.fill_from = getWithFillFieldValue(order_by_elem.fill_from, context);
848 if (order_by_elem.fill_to)
849 descr.fill_to = getWithFillFieldValue(order_by_elem.fill_to, context);
850 if (order_by_elem.fill_step)
851 descr.fill_step = getWithFillFieldValue(order_by_elem.fill_step, context);
852 else
853 descr.fill_step = order_by_elem.direction;
854
855 if (applyVisitor(FieldVisitorAccurateEquals(), descr.fill_step, Field{0}))
856 throw Exception("WITH FILL STEP value cannot be zero", ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
857
858 if (order_by_elem.direction == 1)
859 {
860 if (applyVisitor(FieldVisitorAccurateLess(), descr.fill_step, Field{0}))
861 throw Exception("WITH FILL STEP value cannot be negative for sorting in ascending direction",
862 ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
863
864 if (!descr.fill_from.isNull() && !descr.fill_to.isNull() &&
865 applyVisitor(FieldVisitorAccurateLess(), descr.fill_to, descr.fill_from))
866 {
867 throw Exception("WITH FILL TO value cannot be less than FROM value for sorting in ascending direction",
868 ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
869 }
870 }
871 else
872 {
873 if (applyVisitor(FieldVisitorAccurateLess(), Field{0}, descr.fill_step))
874 throw Exception("WITH FILL STEP value cannot be positive for sorting in descending direction",
875 ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
876
877 if (!descr.fill_from.isNull() && !descr.fill_to.isNull() &&
878 applyVisitor(FieldVisitorAccurateLess(), descr.fill_from, descr.fill_to))
879 {
880 throw Exception("WITH FILL FROM value cannot be less than TO value for sorting in descending direction",
881 ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
882 }
883 }
884
885 return descr;
886}
887
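/// Build the sort description from the ORDER BY clause, including collations and WITH FILL settings.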
888static SortDescription getSortDescription(const ASTSelectQuery & query, const Context & context)
889{
890 SortDescription order_descr;
891 order_descr.reserve(query.orderBy()->children.size());
892 for (const auto & elem : query.orderBy()->children)
893 {
894 String name = elem->children.front()->getColumnName();
895 const auto & order_by_elem = elem->as<ASTOrderByElement &>();
896
897 std::shared_ptr<Collator> collator;
898 if (order_by_elem.collation)
899 collator = std::make_shared<Collator>(order_by_elem.collation->as<ASTLiteral &>().value.get<String>());
900
901 if (order_by_elem.with_fill)
902 {
903 FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context);
904 order_descr.emplace_back(name, order_by_elem.direction,
905 order_by_elem.nulls_direction, collator, true, fill_desc);
906 }
907 else
908 order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
909 }
910
911 return order_descr;
912}
913
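/// Evaluate a LIMIT/OFFSET expression to a constant and convert it to UInt64, validating the type and range.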
914static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context)
915{
916 const auto & [field, type] = evaluateConstantExpression(node, context);
917
918 if (!isNativeNumber(type))
919 throw Exception("Illegal type " + type->getName() + " of LIMIT expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION);
920
921 Field converted = convertFieldToType(field, DataTypeUInt64());
922 if (converted.isNull())
923 throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of LIMIT expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION);
924
925 return converted.safeGet<UInt64>();
926}
927
928
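/// Return the LIMIT length and OFFSET as a pair of UInt64 values (both zero if there is no LIMIT).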
929static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery & query, const Context & context)
930{
931 UInt64 length = 0;
932 UInt64 offset = 0;
933
934 if (query.limitLength())
935 {
936 length = getLimitUIntValue(query.limitLength(), context);
937 if (query.limitOffset() && length)
938 offset = getLimitUIntValue(query.limitOffset(), context);
939 }
940
941 return {length, offset};
942}
943
944
945static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & context)
946{
    /// Partial sort can be done if there is a LIMIT but no DISTINCT, LIMIT BY, or WITH TIES.
948 if (!query.distinct && !query.limitBy() && !query.limit_with_ties)
949 {
950 auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
951 return limit_length + limit_offset;
952 }
953 return 0;
954}
955
956
957template <typename TPipeline>
958void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, QueryPipeline & save_context_and_storage)
959{
960 /** Streams of data. When the query is executed in parallel, we have several data streams.
961 * If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then
962 * if there is an ORDER BY, then glue the streams using UnionBlockInputStream, and then MergeSortingBlockInputStream,
963 * if not, then glue it using UnionBlockInputStream,
964 * then apply LIMIT.
965 * If there is GROUP BY, then we will perform all operations up to GROUP BY, inclusive, in parallel;
966 * a parallel GROUP BY will glue streams into one,
967 * then perform the remaining operations with one resulting stream.
968 */
969
970 constexpr bool pipeline_with_processors = std::is_same<TPipeline, QueryPipeline>::value;
971
972 /// Now we will compose block streams that perform the necessary actions.
973 auto & query = getSelectQuery();
974 const Settings & settings = context->getSettingsRef();
975 auto & expressions = analysis_result;
976
977 if (options.only_analyze)
978 {
979 if constexpr (pipeline_with_processors)
980 pipeline.init(Pipe(std::make_shared<NullSource>(source_header)));
981 else
982 pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(source_header));
983
984 if (expressions.prewhere_info)
985 {
986 if constexpr (pipeline_with_processors)
987 pipeline.addSimpleTransform([&](const Block & header)
988 {
989 return std::make_shared<FilterTransform>(
990 header,
991 expressions.prewhere_info->prewhere_actions,
992 expressions.prewhere_info->prewhere_column_name,
993 expressions.prewhere_info->remove_prewhere_column);
994 });
995 else
996 pipeline.streams.back() = std::make_shared<FilterBlockInputStream>(
997 pipeline.streams.back(), expressions.prewhere_info->prewhere_actions,
998 expressions.prewhere_info->prewhere_column_name, expressions.prewhere_info->remove_prewhere_column);
999
            // To remove additional columns in dry run.
            // For example, the sample column, which can be removed at this stage.
1002 if (expressions.prewhere_info->remove_columns_actions)
1003 {
1004 if constexpr (pipeline_with_processors)
1005 {
1006 pipeline.addSimpleTransform([&](const Block & header)
1007 {
1008 return std::make_shared<ExpressionTransform>(header, expressions.prewhere_info->remove_columns_actions);
1009 });
1010 }
1011 else
1012 pipeline.streams.back() = std::make_shared<ExpressionBlockInputStream>(pipeline.streams.back(), expressions.prewhere_info->remove_columns_actions);
1013 }
1014 }
1015 }
1016 else
1017 {
1018 if (prepared_input)
1019 {
1020 if constexpr (pipeline_with_processors)
1021 pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(prepared_input)));
1022 else
1023 pipeline.streams.push_back(prepared_input);
1024 }
1025
1026 if (from_stage == QueryProcessingStage::WithMergeableState &&
1027 options.to_stage == QueryProcessingStage::WithMergeableState)
1028 throw Exception("Distributed on Distributed is not supported", ErrorCodes::NOT_IMPLEMENTED);
1029
1030 if (storage && expressions.filter_info && expressions.prewhere_info)
1031 throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
1032
        /** Read the data from Storage. from_stage is the stage up to which the query has been executed in Storage. */
1034 executeFetchColumns(from_stage, pipeline, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere, save_context_and_storage);
1035
1036 LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(options.to_stage));
1037 }
1038
1039 if (options.to_stage > QueryProcessingStage::FetchColumns)
1040 {
        /// Do I need to aggregate, into a separate row, the rows that have not passed max_rows_to_group_by?
1042 bool aggregate_overflow_row =
1043 expressions.need_aggregate &&
1044 query.group_by_with_totals &&
1045 settings.max_rows_to_group_by &&
1046 settings.group_by_overflow_mode == OverflowMode::ANY &&
1047 settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
1048
1049 /// Do I need to immediately finalize the aggregate functions after the aggregation?
1050 bool aggregate_final =
1051 expressions.need_aggregate &&
1052 options.to_stage > QueryProcessingStage::WithMergeableState &&
1053 !query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
1054
1055 if (expressions.first_stage)
1056 {
1057 if (expressions.filter_info)
1058 {
1059 if constexpr (pipeline_with_processors)
1060 {
1061 pipeline.addSimpleTransform([&](const Block & block, QueryPipeline::StreamType stream_type) -> ProcessorPtr
1062 {
1063 if (stream_type == QueryPipeline::StreamType::Totals)
1064 return nullptr;
1065
1066 return std::make_shared<FilterTransform>(
1067 block,
1068 expressions.filter_info->actions,
1069 expressions.filter_info->column_name,
1070 expressions.filter_info->do_remove_column);
1071 });
1072 }
1073 else
1074 {
1075 pipeline.transform([&](auto & stream)
1076 {
1077 stream = std::make_shared<FilterBlockInputStream>(
1078 stream,
1079 expressions.filter_info->actions,
1080 expressions.filter_info->column_name,
1081 expressions.filter_info->do_remove_column);
1082 });
1083 }
1084 }
1085
1086 if (expressions.hasJoin())
1087 {
1088 Block header_before_join;
1089
1090 if constexpr (pipeline_with_processors)
1091 {
1092 header_before_join = pipeline.getHeader();
1093
                    /// If the joined subquery has totals and we don't, add a default chunk to totals.
1095 bool default_totals = false;
1096 if (!pipeline.hasTotals())
1097 {
1098 pipeline.addDefaultTotals();
1099 default_totals = true;
1100 }
1101
1102 pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType type)
1103 {
1104 bool on_totals = type == QueryPipeline::StreamType::Totals;
1105 return std::make_shared<ExpressionTransform>(header, expressions.before_join, on_totals, default_totals);
1106 });
1107 }
1108 else
1109 {
1110 header_before_join = pipeline.firstStream()->getHeader();
1111 /// Applies to all sources except stream_with_non_joined_data.
1112 for (auto & stream : pipeline.streams)
1113 stream = std::make_shared<ExpressionBlockInputStream>(stream, expressions.before_join);
1114
1115 if (isMergeJoin(expressions.before_join->getTableJoinAlgo()) && settings.partial_merge_join_optimizations)
1116 {
1117 if (size_t rows_in_block = settings.partial_merge_join_rows_in_left_blocks)
1118 for (auto & stream : pipeline.streams)
1119 stream = std::make_shared<SquashingBlockInputStream>(stream, rows_in_block, 0, true);
1120 }
1121 }
1122
1123 if (JoinPtr join = expressions.before_join->getTableJoinAlgo())
1124 {
1125 Block join_result_sample = ExpressionBlockInputStream(
1126 std::make_shared<OneBlockInputStream>(header_before_join), expressions.before_join).getHeader();
1127
1128 if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size))
1129 {
1130 if constexpr (pipeline_with_processors)
1131 {
1132 auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
1133 pipeline.addDelayedStream(source);
1134 }
1135 else
1136 pipeline.stream_with_non_joined_data = std::move(stream);
1137 }
1138 }
1139 }
1140
1141 if (expressions.has_where)
1142 executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
1143
1144 if (expressions.need_aggregate)
1145 executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
1146 else
1147 {
1148 executeExpression(pipeline, expressions.before_order_and_select);
1149 executeDistinct(pipeline, true, expressions.selected_columns);
1150 }
1151
1152 /** For distributed query processing,
1153 * if no GROUP, HAVING set,
1154 * but there is an ORDER or LIMIT,
1155 * then we will perform the preliminary sorting and LIMIT on the remote server.
1156 */
1157 if (!expressions.second_stage && !expressions.need_aggregate && !expressions.has_having)
1158 {
1159 if (expressions.has_order_by)
1160 executeOrder(pipeline, query_info.input_sorting_info);
1161
1162 if (expressions.has_order_by && query.limitLength())
1163 executeDistinct(pipeline, false, expressions.selected_columns);
1164
1165 if (expressions.has_limit_by)
1166 {
1167 executeExpression(pipeline, expressions.before_limit_by);
1168 executeLimitBy(pipeline);
1169 }
1170
1171 if (query.limitLength())
1172 executePreLimit(pipeline);
1173 }
1174
            // If there are no global subqueries, we can run the subqueries only when we receive them on the server.
1176 if (!query_analyzer->hasGlobalSubqueries() && !expressions.subqueries_for_sets.empty())
1177 executeSubqueriesInSetsAndJoins(pipeline, expressions.subqueries_for_sets);
1178 }
1179
1180 if (expressions.second_stage)
1181 {
1182 bool need_second_distinct_pass = false;
1183 bool need_merge_streams = false;
1184
1185 if (expressions.need_aggregate)
1186 {
1187 /// If you need to combine aggregated results from multiple servers
1188 if (!expressions.first_stage)
1189 executeMergeAggregated(pipeline, aggregate_overflow_row, aggregate_final);
1190
1191 if (!aggregate_final)
1192 {
1193 if (query.group_by_with_totals)
1194 {
1195 bool final = !query.group_by_with_rollup && !query.group_by_with_cube;
1196 executeTotalsAndHaving(pipeline, expressions.has_having, expressions.before_having, aggregate_overflow_row, final);
1197 }
1198
1199 if (query.group_by_with_rollup)
1200 executeRollupOrCube(pipeline, Modificator::ROLLUP);
1201 else if (query.group_by_with_cube)
1202 executeRollupOrCube(pipeline, Modificator::CUBE);
1203
1204 if ((query.group_by_with_rollup || query.group_by_with_cube) && expressions.has_having)
1205 {
1206 if (query.group_by_with_totals)
1207 throw Exception("WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING", ErrorCodes::NOT_IMPLEMENTED);
1208 executeHaving(pipeline, expressions.before_having);
1209 }
1210 }
1211 else if (expressions.has_having)
1212 executeHaving(pipeline, expressions.before_having);
1213
1214 executeExpression(pipeline, expressions.before_order_and_select);
1215 executeDistinct(pipeline, true, expressions.selected_columns);
1216
1217 }
1218 else if (query.group_by_with_totals || query.group_by_with_rollup || query.group_by_with_cube)
1219 throw Exception("WITH TOTALS, ROLLUP or CUBE are not supported without aggregation", ErrorCodes::LOGICAL_ERROR);
1220
1221 need_second_distinct_pass = query.distinct && pipeline.hasMixedStreams();
1222
1223 if (expressions.has_order_by)
1224 {
1225 /** If there is an ORDER BY for distributed query processing,
1226 * but there is no aggregation, then on the remote servers ORDER BY was made
1227 * - therefore, we merge the sorted streams from remote servers.
1228 */
1229
1230 if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
1231 executeMergeSorted(pipeline);
1232 else /// Otherwise, just sort.
1233 executeOrder(pipeline, query_info.input_sorting_info);
1234 }
1235
1236 /** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
1237 * limiting the number of rows in each up to `offset + limit`.
1238 */
1239 if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.has_limit_by && !settings.extremes)
1240 {
1241 executePreLimit(pipeline);
1242 }
1243
1244 if (need_second_distinct_pass
1245 || query.limitLength()
1246 || query.limitBy()
1247 || pipeline.hasDelayedStream())
1248 {
1249 need_merge_streams = true;
1250 }
1251
1252 if (need_merge_streams)
1253 {
1254 if constexpr (pipeline_with_processors)
1255 pipeline.resize(1);
1256 else
1257 executeUnion(pipeline, {});
1258 }
1259
1260 /** If there was more than one stream,
1261 * then DISTINCT needs to be performed once again after merging all streams.
1262 */
1263 if (need_second_distinct_pass)
1264 executeDistinct(pipeline, false, expressions.selected_columns);
1265
1266 if (expressions.has_limit_by)
1267 {
1268 executeExpression(pipeline, expressions.before_limit_by);
1269 executeLimitBy(pipeline);
1270 }
1271
1272 executeWithFill(pipeline);
1273
1274 /** We must do projection after DISTINCT because projection may remove some columns.
1275 */
1276 executeProjection(pipeline, expressions.final_projection);
1277
1278 /** Extremes are calculated before LIMIT, but after LIMIT BY. This is Ok.
1279 */
1280 executeExtremes(pipeline);
1281
1282 executeLimit(pipeline);
1283 }
1284 }
1285
1286 if (query_analyzer->hasGlobalSubqueries() && !expressions.subqueries_for_sets.empty())
1287 executeSubqueriesInSetsAndJoins(pipeline, expressions.subqueries_for_sets);
1288}
1289
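/// Form the source of the pipeline: read the required columns from the prepared input, subquery or storage,
/// resolving ALIAS columns, row-level filter columns and PREWHERE requirements along the way.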
1290template <typename TPipeline>
1291void InterpreterSelectQuery::executeFetchColumns(
1292 QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
1293 const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere,
1294 QueryPipeline & save_context_and_storage)
1295{
1296 constexpr bool pipeline_with_processors = std::is_same<TPipeline, QueryPipeline>::value;
1297
1298 auto & query = getSelectQuery();
1299 const Settings & settings = context->getSettingsRef();
1300
    /// Optimization for trivial queries like SELECT count() FROM table.
1302 auto check_trivial_count_query = [&]() -> std::optional<AggregateDescription>
1303 {
1304 if (!settings.optimize_trivial_count_query || !syntax_analyzer_result->maybe_optimize_trivial_count || !storage
1305 || query.sample_size() || query.sample_offset() || query.final() || query.prewhere() || query.where()
1306 || !query_analyzer->hasAggregation() || processing_stage != QueryProcessingStage::FetchColumns)
1307 return {};
1308
1309 Names key_names;
1310 AggregateDescriptions aggregates;
1311 query_analyzer->getAggregateInfo(key_names, aggregates);
1312
1313 if (aggregates.size() != 1)
1314 return {};
1315
1316 const AggregateDescription & desc = aggregates[0];
1317 if (typeid_cast<AggregateFunctionCount *>(desc.function.get()))
1318 return desc;
1319
1320 return {};
1321 };
1322
1323 if (auto desc = check_trivial_count_query())
1324 {
1325 auto func = desc->function;
1326 std::optional<UInt64> num_rows = storage->totalRows();
1327 if (num_rows)
1328 {
1329 AggregateFunctionCount & agg_count = static_cast<AggregateFunctionCount &>(*func);
1330
1331 /// We will process it up to "WithMergeableState".
1332 std::vector<char> state(agg_count.sizeOfData());
1333 AggregateDataPtr place = state.data();
1334
1335 agg_count.create(place);
1336 SCOPE_EXIT(agg_count.destroy(place));
1337
1338 agg_count.set(place, *num_rows);
1339
1340 auto column = ColumnAggregateFunction::create(func);
1341 column->insertFrom(place);
1342
1343 auto header = analysis_result.before_aggregation->getSampleBlock();
1344 size_t arguments_size = desc->argument_names.size();
1345 DataTypes argument_types(arguments_size);
1346 for (size_t j = 0; j < arguments_size; ++j)
1347 argument_types[j] = header.getByName(desc->argument_names[j]).type;
1348
1349 Block block_with_count{
1350 {std::move(column), std::make_shared<DataTypeAggregateFunction>(func, argument_types, desc->parameters), desc->column_name}};
1351
1352 auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
1353 if constexpr (pipeline_with_processors)
1354 pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(istream)));
1355 else
1356 pipeline.streams.emplace_back(istream);
1357 from_stage = QueryProcessingStage::WithMergeableState;
1358 analysis_result.first_stage = false;
1359 return;
1360 }
1361 }
1362
1363 /// Actions to calculate ALIAS if required.
1364 ExpressionActionsPtr alias_actions;
1365
1366 if (storage)
1367 {
        /// Append the columns required by the table filter (row-level security) to the required columns
1369 auto row_policy_filter = context->getRowPolicy()->getCondition(storage->getDatabaseName(), storage->getTableName(), RowPolicy::SELECT_FILTER);
1370 if (row_policy_filter)
1371 {
1372 auto initial_required_columns = required_columns;
1373 ExpressionActionsPtr actions;
1374 generateFilterActions(actions, *context, storage, row_policy_filter, initial_required_columns);
1375 auto required_columns_from_filter = actions->getRequiredColumns();
1376
1377 for (const auto & column : required_columns_from_filter)
1378 {
1379 if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column))
1380 required_columns.push_back(column);
1381 }
1382 }
1383
        /// Detect if ALIAS columns are required for query execution
1385 auto alias_columns_required = false;
1386 const ColumnsDescription & storage_columns = storage->getColumns();
1387 for (const auto & column_name : required_columns)
1388 {
1389 auto column_default = storage_columns.getDefault(column_name);
1390 if (column_default && column_default->kind == ColumnDefaultKind::Alias)
1391 {
1392 alias_columns_required = true;
1393 break;
1394 }
1395 }
1396
1397 /// There are multiple sources of required columns:
1398 /// - raw required columns,
1399 /// - columns deduced from ALIAS columns,
1400 /// - raw required columns from PREWHERE,
1401 /// - columns deduced from ALIAS columns from PREWHERE.
1402 /// PREWHERE is a special case, since we need to resolve it and pass directly to `IStorage::read()`
1403 /// before any other executions.
1404 if (alias_columns_required)
1405 {
1406 NameSet required_columns_from_prewhere; /// Set of all (including ALIAS) required columns for PREWHERE
1407 NameSet required_aliases_from_prewhere; /// Set of ALIAS required columns for PREWHERE
1408
1409 if (prewhere_info)
1410 {
1411 /// Get some columns directly from PREWHERE expression actions
1412 auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns();
1413 required_columns_from_prewhere.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
1414 }
1415
            /// Expression that contains all raw required columns
1417 ASTPtr required_columns_all_expr = std::make_shared<ASTExpressionList>();
1418
            /// Expression that contains the raw required columns for PREWHERE
1420 ASTPtr required_columns_from_prewhere_expr = std::make_shared<ASTExpressionList>();
1421
1422 /// Sort out already known required columns between expressions,
1423 /// also populate `required_aliases_from_prewhere`.
1424 for (const auto & column : required_columns)
1425 {
1426 ASTPtr column_expr;
1427 const auto column_default = storage_columns.getDefault(column);
1428 bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias;
1429 if (is_alias)
1430 column_expr = setAlias(column_default->expression->clone(), column);
1431 else
1432 column_expr = std::make_shared<ASTIdentifier>(column);
1433
1434 if (required_columns_from_prewhere.count(column))
1435 {
1436 required_columns_from_prewhere_expr->children.emplace_back(std::move(column_expr));
1437
1438 if (is_alias)
1439 required_aliases_from_prewhere.insert(column);
1440 }
1441 else
1442 required_columns_all_expr->children.emplace_back(std::move(column_expr));
1443 }
1444
            /// Columns which we will get after the PREWHERE and filter steps.
1446 NamesAndTypesList required_columns_after_prewhere;
1447 NameSet required_columns_after_prewhere_set;
1448
1449 /// Collect required columns from prewhere expression actions.
1450 if (prewhere_info)
1451 {
1452 NameSet columns_to_remove(columns_to_remove_after_prewhere.begin(), columns_to_remove_after_prewhere.end());
1453 Block prewhere_actions_result = prewhere_info->prewhere_actions->getSampleBlock();
1454
                /// Populate required columns with the columns added by PREWHERE actions and not removed afterwards.
1456 /// XXX: looks hacky that we already know which columns after PREWHERE we won't need for sure.
1457 for (const auto & column : prewhere_actions_result)
1458 {
1459 if (prewhere_info->remove_prewhere_column && column.name == prewhere_info->prewhere_column_name)
1460 continue;
1461
1462 if (columns_to_remove.count(column.name))
1463 continue;
1464
1465 required_columns_all_expr->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
1466 required_columns_after_prewhere.emplace_back(column.name, column.type);
1467 }
1468
1469 required_columns_after_prewhere_set
1470 = ext::map<NameSet>(required_columns_after_prewhere, [](const auto & it) { return it.name; });
1471 }
1472
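        /// Build actions that compute the required ALIAS columns on top of the columns available after PREWHERE.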
1473 auto syntax_result = SyntaxAnalyzer(*context).analyze(required_columns_all_expr, required_columns_after_prewhere, {}, storage);
1474 alias_actions = ExpressionAnalyzer(required_columns_all_expr, syntax_result, *context).getActions(true);
1475
        /// The set of required columns may grow as a result of adding actions that calculate ALIAS columns.
1477 required_columns = alias_actions->getRequiredColumns();
1478
        /// Do not remove the prewhere filter column if it is still required (e.g. it is used in an ALIAS expression).
1480 if (prewhere_info && prewhere_info->remove_prewhere_column)
1481 if (required_columns.end()
1482 != std::find(required_columns.begin(), required_columns.end(), prewhere_info->prewhere_column_name))
1483 prewhere_info->remove_prewhere_column = false;
1484
1485 /// Remove columns which will be added by prewhere.
1486 required_columns.erase(std::remove_if(required_columns.begin(), required_columns.end(), [&](const String & name)
1487 {
1488 return !!required_columns_after_prewhere_set.count(name);
1489 }), required_columns.end());
1490
1491 if (prewhere_info)
1492 {
            /// Don't remove columns that are still required (e.g. for ALIAS calculation).
1494 auto new_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions->getRequiredColumnsWithTypes(), *context);
1495 for (const auto & action : prewhere_info->prewhere_actions->getActions())
1496 {
1497 if (action.type != ExpressionAction::REMOVE_COLUMN
1498 || required_columns.end() == std::find(required_columns.begin(), required_columns.end(), action.source_name))
1499 new_actions->add(action);
1500 }
1501 prewhere_info->prewhere_actions = std::move(new_actions);
1502
1503 auto analyzed_result
1504 = SyntaxAnalyzer(*context).analyze(required_columns_from_prewhere_expr, storage->getColumns().getAllPhysical());
1505 prewhere_info->alias_actions
1506 = ExpressionAnalyzer(required_columns_from_prewhere_expr, analyzed_result, *context).getActions(true, false);
1507
1508 /// Add (physical?) columns required by alias actions.
1509 auto required_columns_from_alias = prewhere_info->alias_actions->getRequiredColumns();
1510 Block prewhere_actions_result = prewhere_info->prewhere_actions->getSampleBlock();
1511 for (auto & column : required_columns_from_alias)
1512 if (!prewhere_actions_result.has(column))
1513 if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column))
1514 required_columns.push_back(column);
1515
1516 /// Add physical columns required by prewhere actions.
1517 for (const auto & column : required_columns_from_prewhere)
1518 if (required_aliases_from_prewhere.count(column) == 0)
1519 if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column))
1520 required_columns.push_back(column);
1521 }
1522 }
1523 }
1524
1525 /// Limitation on the number of columns to read.
1526 /// It's not applied in 'only_analyze' mode, because the query could be analyzed without removal of unnecessary columns.
1527 if (!options.only_analyze && settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
1528 throw Exception("Limit for number of columns to read exceeded. "
1529 "Requested: " + toString(required_columns.size())
1530 + ", maximum: " + settings.max_columns_to_read.toString(),
1531 ErrorCodes::TOO_MANY_COLUMNS);
1532
    /** With distributed query processing, the threads do almost no computation;
      * they mostly wait for and receive data from remote servers.
      * If we have 20 remote servers and max_threads = 8, it would not be very good
      * to connect to and query only 8 servers at a time.
      * To query more remote servers simultaneously,
      * max_distributed_connections is used instead of max_threads.
      */
1540 bool is_remote = false;
1541 if (storage && storage->isRemote())
1542 {
1543 is_remote = true;
1544 max_streams = settings.max_distributed_connections;
1545 }
1546
1547 UInt64 max_block_size = settings.max_block_size;
1548
1549 auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context);
1550
    /** Optimization: if DISTINCT, WHERE, GROUP BY, HAVING, ORDER BY, LIMIT BY and WITH TIES are not specified, but LIMIT is,
      * and limit + offset < max_block_size, then use limit + offset as the block size
      * (so that we do not read more from the table than requested), and also set the number of threads to 1.
      */
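    /// For example, `SELECT * FROM table LIMIT 10` would be read in a single thread with blocks of at most 10 rows.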
1555 if (!query.distinct
1556 && !query.limit_with_ties
1557 && !query.prewhere()
1558 && !query.where()
1559 && !query.groupBy()
1560 && !query.having()
1561 && !query.orderBy()
1562 && !query.limitBy()
1563 && query.limitLength()
1564 && !query_analyzer->hasAggregation()
1565 && limit_length + limit_offset < max_block_size)
1566 {
1567 max_block_size = std::max(UInt64(1), limit_length + limit_offset);
1568 max_streams = 1;
1569 }
1570
1571 if (!max_block_size)
1572 throw Exception("Setting 'max_block_size' cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND);
1573
    /// Initialize the initial data streams onto which the query transforms are superimposed: a table, a subquery, or prepared input?
1575 if (pipeline.initialized())
1576 {
1577 /// Prepared input.
1578 }
1579 else if (interpreter_subquery)
1580 {
1581 /// Subquery.
        /// If we need fewer columns than the subquery provides, update the interpreter.
1583 if (required_columns.size() < source_header.columns())
1584 {
1585 ASTPtr subquery = extractTableExpression(query, 0);
1586 if (!subquery)
1587 throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);
1588
1589 interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
1590 subquery, getSubqueryContext(*context),
1591 options.copy().subquery().noModify(), required_columns);
1592
1593 if (query_analyzer->hasAggregation())
1594 interpreter_subquery->ignoreWithTotals();
1595 }
1596
1597 if constexpr (pipeline_with_processors)
1598 /// Just use pipeline from subquery.
1599 pipeline = interpreter_subquery->executeWithProcessors();
1600 else
1601 pipeline.streams = interpreter_subquery->executeWithMultipleStreams(save_context_and_storage);
1602 }
1603 else if (storage)
1604 {
1605 /// Table.
1606
1607 if (max_streams == 0)
1608 throw Exception("Logical error: zero number of streams requested", ErrorCodes::LOGICAL_ERROR);
1609
1610 /// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
1611 if (max_streams > 1 && !is_remote)
1612 max_streams *= settings.max_streams_to_max_threads_ratio;
1613
1614 query_info.query = query_ptr;
1615 query_info.syntax_analyzer_result = syntax_analyzer_result;
1616 query_info.sets = query_analyzer->getPreparedSets();
1617 query_info.prewhere_info = prewhere_info;
1618
1619 /// Create optimizer with prepared actions.
        /// Maybe we will need to calculate input_sorting_info later, e.g. while reading from StorageMerge.
1621 if (analysis_result.optimize_read_in_order)
1622 {
1623 query_info.order_by_optimizer = std::make_shared<ReadInOrderOptimizer>(
1624 query_analyzer->getOrderByActions(),
1625 getSortDescription(query, *context),
1626 query_info.syntax_analyzer_result);
1627
1628 query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
1629 }
1630
1631
1632 BlockInputStreams streams;
1633 Pipes pipes;
1634
        /// Will work with pipes directly if the storage supports processors.
1636 /// Code is temporarily copy-pasted while moving to new pipeline.
1637 bool use_pipes = pipeline_with_processors && storage->supportProcessorsPipeline();
1638
1639 if (use_pipes)
1640 pipes = storage->readWithProcessors(required_columns, query_info, *context, processing_stage, max_block_size, max_streams);
1641 else
1642 streams = storage->read(required_columns, query_info, *context, processing_stage, max_block_size, max_streams);
1643
1644 if (streams.empty() && !use_pipes)
1645 {
1646 streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))};
1647
1648 if (query_info.prewhere_info)
1649 {
1650 if (query_info.prewhere_info->alias_actions)
1651 {
1652 streams.back() = std::make_shared<ExpressionBlockInputStream>(
1653 streams.back(),
1654 query_info.prewhere_info->alias_actions);
1655 }
1656
1657 streams.back() = std::make_shared<FilterBlockInputStream>(
1658 streams.back(),
1659 prewhere_info->prewhere_actions,
1660 prewhere_info->prewhere_column_name,
1661 prewhere_info->remove_prewhere_column);
1662
                // To remove additional columns.
                // In some cases we did not read any marks, so pipeline.streams is empty;
                // thus, some columns used in PREWHERE are not removed as expected,
                // which leads to a mismatched header in a distributed table.
1667 if (query_info.prewhere_info->remove_columns_actions)
1668 {
1669 streams.back() = std::make_shared<ExpressionBlockInputStream>(streams.back(), query_info.prewhere_info->remove_columns_actions);
1670 }
1671 }
1672 }
1673
        /// Copy-paste from the previous if.
1675 if (pipes.empty() && use_pipes)
1676 {
1677 Pipe pipe(std::make_shared<NullSource>(storage->getSampleBlockForColumns(required_columns)));
1678
1679 if (query_info.prewhere_info)
1680 {
1681 if (query_info.prewhere_info->alias_actions)
1682 pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
1683 pipe.getHeader(), query_info.prewhere_info->alias_actions));
1684
1685 pipe.addSimpleTransform(std::make_shared<FilterTransform>(
1686 pipe.getHeader(),
1687 prewhere_info->prewhere_actions,
1688 prewhere_info->prewhere_column_name,
1689 prewhere_info->remove_prewhere_column));
1690
1691 if (query_info.prewhere_info->remove_columns_actions)
1692 pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), query_info.prewhere_info->remove_columns_actions));
1693 }
1694
1695 pipes.emplace_back(std::move(pipe));
1696 }
1697
1698 for (auto & stream : streams)
1699 stream->addTableLock(table_lock);
1700
1701 if constexpr (pipeline_with_processors)
1702 {
1703 /// Table lock is stored inside pipeline here.
1704 if (use_pipes)
1705 pipeline.addTableLock(table_lock);
1706 }
1707
1708 /// Set the limits and quota for reading data, the speed and time of the query.
1709 {
1710 IBlockInputStream::LocalLimits limits;
1711 limits.mode = IBlockInputStream::LIMITS_TOTAL;
1712 limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
1713 limits.speed_limits.max_execution_time = settings.max_execution_time;
1714 limits.timeout_overflow_mode = settings.timeout_overflow_mode;
1715
1716 /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
1717 * because the initiating server has a summary of the execution of the request on all servers.
1718 *
              * But limits on the amount of data to read and the maximum execution time are reasonable to check both on the initiator and
              * additionally on each remote server, because these limits are checked per block of data processed,
              * and remote servers may process far more blocks of data than are received by the initiator.
1722 */
1723 if (options.to_stage == QueryProcessingStage::Complete)
1724 {
1725 limits.speed_limits.min_execution_rps = settings.min_execution_speed;
1726 limits.speed_limits.max_execution_rps = settings.max_execution_speed;
1727 limits.speed_limits.min_execution_bps = settings.min_execution_speed_bytes;
1728 limits.speed_limits.max_execution_bps = settings.max_execution_speed_bytes;
1729 limits.speed_limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
1730 }
1731
1732 auto quota = context->getQuota();
1733
1734 for (auto & stream : streams)
1735 {
1736 if (!options.ignore_limits)
1737 stream->setLimits(limits);
1738
1739 if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
1740 stream->setQuota(quota);
1741 }
1742
1743 /// Copy-paste
1744 for (auto & pipe : pipes)
1745 {
1746 if (!options.ignore_limits)
1747 pipe.setLimits(limits);
1748
1749 if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
1750 pipe.setQuota(quota);
1751 }
1752 }
1753
1754 if constexpr (pipeline_with_processors)
1755 {
1756 if (streams.size() == 1 || pipes.size() == 1)
1757 pipeline.setMaxThreads(streams.size());
1758
            /// Unify streams. They must have the same headers.
1760 if (streams.size() > 1)
1761 {
1762 /// Unify streams in case they have different headers.
1763 auto first_header = streams.at(0)->getHeader();
1764
1765 if (first_header.columns() > 1 && first_header.has("_dummy"))
1766 first_header.erase("_dummy");
1767
1768 for (auto & stream : streams)
1769 {
1770 auto header = stream->getHeader();
1771 auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
1772 if (!blocksHaveEqualStructure(first_header, header))
1773 stream = std::make_shared<ConvertingBlockInputStream>(*context, stream, first_header, mode);
1774 }
1775 }
1776
1777 for (auto & stream : streams)
1778 {
1779 bool force_add_agg_info = processing_stage == QueryProcessingStage::WithMergeableState;
1780 auto source = std::make_shared<SourceFromInputStream>(stream, force_add_agg_info);
1781
1782 if (processing_stage == QueryProcessingStage::Complete)
1783 source->addTotalsPort();
1784
1785 pipes.emplace_back(std::move(source));
1786 }
1787
1788 /// Pin sources for merge tree tables.
1789// bool pin_sources = dynamic_cast<const MergeTreeData *>(storage.get()) != nullptr;
1790// if (pin_sources)
1791// {
1792// for (size_t i = 0; i < pipes.size(); ++i)
1793// pipes[i].pinSources(i);
1794// }
1795
1796 pipeline.init(std::move(pipes));
1797 }
1798 else
1799 pipeline.streams = std::move(streams);
1800 }
1801 else
1802 throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);
1803
1804 /// Aliases in table declaration.
1805 if (processing_stage == QueryProcessingStage::FetchColumns && alias_actions)
1806 {
1807 if constexpr (pipeline_with_processors)
1808 {
1809 pipeline.addSimpleTransform([&](const Block & header)
1810 {
1811 return std::make_shared<ExpressionTransform>(header, alias_actions);
1812 });
1813 }
1814 else
1815 {
1816 pipeline.transform([&](auto & stream)
1817 {
1818 stream = std::make_shared<ExpressionBlockInputStream>(stream, alias_actions);
1819 });
1820 }
1821 }
1822}
1823
1824
void InterpreterSelectQuery::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter)
{
    pipeline.transform([&](auto & stream)
    {
        stream = std::make_shared<FilterBlockInputStream>(stream, expression, getSelectQuery().where()->getColumnName(), remove_filter);
    });
}
1832
void InterpreterSelectQuery::executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter)
{
    pipeline.addSimpleTransform([&](const Block & block)
    {
        return std::make_shared<FilterTransform>(block, expression, getSelectQuery().where()->getColumnName(), remove_filter);
    });
}
1840
1841void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
1842{
1843 pipeline.transform([&](auto & stream)
1844 {
1845 stream = std::make_shared<ExpressionBlockInputStream>(stream, expression);
1846 });
1847
1848 Names key_names;
1849 AggregateDescriptions aggregates;
1850 query_analyzer->getAggregateInfo(key_names, aggregates);
1851
1852 Block header = pipeline.firstStream()->getHeader();
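    /// Translate GROUP BY key names and aggregate function argument names into column positions in the header.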
1853 ColumnNumbers keys;
1854 for (const auto & name : key_names)
1855 keys.push_back(header.getPositionByName(name));
1856 for (auto & descr : aggregates)
1857 if (descr.arguments.empty())
1858 for (const auto & name : descr.argument_names)
1859 descr.arguments.push_back(header.getPositionByName(name));
1860
1861 const Settings & settings = context->getSettingsRef();
1862
    /** Two-level aggregation is useful in two cases:
      * 1. Parallel aggregation is done, and the results need to be merged in parallel.
      * 2. Aggregation is done with spilling temporary data to disk, and it needs to be merged in a memory-efficient way.
      */
1867 bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0;
1868
1869 Aggregator::Params params(header, keys, aggregates,
1870 overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
1871 allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
1872 allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
1873 settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
1874 context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
1875
1876 /// If there are several sources, then we perform parallel aggregation
1877 if (pipeline.streams.size() > 1)
1878 {
1879 pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
1880 pipeline.streams, pipeline.stream_with_non_joined_data, params, final,
1881 max_streams,
1882 settings.aggregation_memory_efficient_merge_threads
1883 ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
1884 : static_cast<size_t>(settings.max_threads));
1885
1886 pipeline.stream_with_non_joined_data = nullptr;
1887 pipeline.streams.resize(1);
1888 }
1889 else
1890 {
1891 BlockInputStreams inputs;
1892 if (!pipeline.streams.empty())
1893 inputs.push_back(pipeline.firstStream());
1894 else
1895 pipeline.streams.resize(1);
1896
1897 if (pipeline.stream_with_non_joined_data)
1898 inputs.push_back(pipeline.stream_with_non_joined_data);
1899
1900 pipeline.firstStream() = std::make_shared<AggregatingBlockInputStream>(std::make_shared<ConcatBlockInputStream>(inputs), params, final);
1901
1902 pipeline.stream_with_non_joined_data = nullptr;
1903 }
1904}
1905
1906
1907void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
1908{
1909 pipeline.addSimpleTransform([&](const Block & header)
1910 {
1911 return std::make_shared<ExpressionTransform>(header, expression);
1912 });
1913
1914 Names key_names;
1915 AggregateDescriptions aggregates;
1916 query_analyzer->getAggregateInfo(key_names, aggregates);
1917
1918 Block header_before_aggregation = pipeline.getHeader();
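    /// Translate GROUP BY key names and aggregate function argument names into column positions in the header.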
1919 ColumnNumbers keys;
1920 for (const auto & name : key_names)
1921 keys.push_back(header_before_aggregation.getPositionByName(name));
1922 for (auto & descr : aggregates)
1923 if (descr.arguments.empty())
1924 for (const auto & name : descr.argument_names)
1925 descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
1926
1927 const Settings & settings = context->getSettingsRef();
1928
    /** Two-level aggregation is useful in two cases:
      * 1. Parallel aggregation is done, and the results need to be merged in parallel.
      * 2. Aggregation is done with spilling temporary data to disk, and it needs to be merged in a memory-efficient way.
      */
1933 bool allow_to_use_two_level_group_by = pipeline.getNumMainStreams() > 1 || settings.max_bytes_before_external_group_by != 0;
1934
1935 Aggregator::Params params(header_before_aggregation, keys, aggregates,
1936 overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
1937 allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
1938 allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
1939 settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
1940 context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
1941
1942 auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
1943
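    /// Drop an existing totals port, if any; totals (WITH TOTALS), if needed, are computed after aggregation rather than carried through it.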
1944 pipeline.dropTotalsIfHas();
1945
1946 /// If there are several sources, then we perform parallel aggregation
1947 if (pipeline.getNumMainStreams() > 1)
1948 {
1949 /// Add resize transform to uniformly distribute data between aggregating streams.
1950 pipeline.resize(pipeline.getNumMainStreams(), true);
1951
1952 auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumMainStreams());
1953 auto merge_threads = settings.aggregation_memory_efficient_merge_threads
1954 ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
1955 : static_cast<size_t>(settings.max_threads);
1956
1957 size_t counter = 0;
1958 pipeline.addSimpleTransform([&](const Block & header)
1959 {
1960 return std::make_shared<AggregatingTransform>(header, transform_params, many_data, counter++, max_streams, merge_threads);
1961 });
1962
1963 pipeline.resize(1);
1964 }
1965 else
1966 {
1967 pipeline.resize(1);
1968
1969 pipeline.addSimpleTransform([&](const Block & header)
1970 {
1971 return std::make_shared<AggregatingTransform>(header, transform_params);
1972 });
1973 }
1974}
1975
1976
1977void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final)
1978{
1979 Names key_names;
1980 AggregateDescriptions aggregates;
1981 query_analyzer->getAggregateInfo(key_names, aggregates);
1982
1983 Block header = pipeline.firstStream()->getHeader();
1984
1985 ColumnNumbers keys;
1986 for (const auto & name : key_names)
1987 keys.push_back(header.getPositionByName(name));
1988
1989 /** There are two modes of distributed aggregation.
1990 *
      * 1. In different threads, read blocks from the remote servers.
      * Save all the blocks in RAM. Merge the blocks.
      * If the aggregation is two-level, parallelize by the number of buckets.
      *
      * 2. In one thread, read blocks from different servers in order.
      * RAM stores only one block from each server at a time.
      * If the aggregation is two-level, we sequentially merge the blocks of each next level.
1998 *
1999 * The second option consumes less memory (up to 256 times less)
2000 * in the case of two-level aggregation, which is used for large results after GROUP BY,
2001 * but it can work more slowly.
2002 */
2003
2004 const Settings & settings = context->getSettingsRef();
2005
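    /// Params for merging intermediate aggregation results (the input blocks already contain aggregate states).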
2006 Aggregator::Params params(header, keys, aggregates, overflow_row, settings.max_threads);
2007
2008 if (!settings.distributed_aggregation_memory_efficient)
2009 {
2010 /// We union several sources into one, parallelizing the work.
2011 executeUnion(pipeline, {});
2012
2013 /// Now merge the aggregated blocks
2014 pipeline.firstStream() = std::make_shared<MergingAggregatedBlockInputStream>(pipeline.firstStream(), params, final, settings.max_threads);
2015 }
2016 else
2017 {
2018 pipeline.firstStream() = std::make_shared<MergingAggregatedMemoryEfficientBlockInputStream>(pipeline.streams, params, final,
2019 max_streams,
2020 settings.aggregation_memory_efficient_merge_threads
2021 ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
2022 : static_cast<size_t>(settings.max_threads));
2023
2024 pipeline.streams.resize(1);
2025 }
2026}
2027
2028void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final)
2029{
2030 Names key_names;
2031 AggregateDescriptions aggregates;
2032 query_analyzer->getAggregateInfo(key_names, aggregates);
2033
2034 Block header_before_merge = pipeline.getHeader();
2035
2036 ColumnNumbers keys;
2037 for (const auto & name : key_names)
2038 keys.push_back(header_before_merge.getPositionByName(name));
2039
2040 /** There are two modes of distributed aggregation.
2041 *
      * 1. In different threads, read blocks from the remote servers.
      * Save all the blocks in RAM. Merge the blocks.
      * If the aggregation is two-level, parallelize by the number of buckets.
      *
      * 2. In one thread, read blocks from different servers in order.
      * RAM stores only one block from each server at a time.
      * If the aggregation is two-level, we sequentially merge the blocks of each next level.
2049 *
2050 * The second option consumes less memory (up to 256 times less)
2051 * in the case of two-level aggregation, which is used for large results after GROUP BY,
2052 * but it can work more slowly.
2053 */
2054
2055 const Settings & settings = context->getSettingsRef();
2056
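    /// Params for merging intermediate aggregation results (the input blocks already contain aggregate states).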
2057 Aggregator::Params params(header_before_merge, keys, aggregates, overflow_row, settings.max_threads);
2058
2059 auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
2060
2061 if (!settings.distributed_aggregation_memory_efficient)
2062 {
2063 /// We union several sources into one, parallelizing the work.
2064 pipeline.resize(1);
2065
2066 /// Now merge the aggregated blocks
2067 pipeline.addSimpleTransform([&](const Block & header)
2068 {
2069 return std::make_shared<MergingAggregatedTransform>(header, transform_params, settings.max_threads);
2070 });
2071 }
2072 else
2073 {
        /// pipeline.resize(max_streams); - It seems we don't need it.
2075 auto num_merge_threads = settings.aggregation_memory_efficient_merge_threads
2076 ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
2077 : static_cast<size_t>(settings.max_threads);
2078
2079 auto pipe = createMergingAggregatedMemoryEfficientPipe(
2080 pipeline.getHeader(),
2081 transform_params,
2082 pipeline.getNumStreams(),
2083 num_merge_threads);
2084
2085 pipeline.addPipe(std::move(pipe));
2086 }
2087}
2088
2089
2090void InterpreterSelectQuery::executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression)
2091{
2092 pipeline.transform([&](auto & stream)
2093 {
2094 stream = std::make_shared<FilterBlockInputStream>(stream, expression, getSelectQuery().having()->getColumnName());
2095 });
2096}
2097
2098void InterpreterSelectQuery::executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression)
2099{
2100 pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
2101 {
2102 if (stream_type == QueryPipeline::StreamType::Totals)
2103 return nullptr;
2104
2105 /// TODO: do we need to save filter there?
2106 return std::make_shared<FilterTransform>(header, expression, getSelectQuery().having()->getColumnName(), false);
2107 });
2108}
2109
2110
2111void InterpreterSelectQuery::executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
2112{
2113 executeUnion(pipeline, {});
2114
2115 const Settings & settings = context->getSettingsRef();
2116
2117 pipeline.firstStream() = std::make_shared<TotalsHavingBlockInputStream>(
2118 pipeline.firstStream(),
2119 overflow_row,
2120 expression,
2121 has_having ? getSelectQuery().having()->getColumnName() : "",
2122 settings.totals_mode,
2123 settings.totals_auto_threshold,
2124 final);
2125}
2126
2127void InterpreterSelectQuery::executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
2128{
2129 const Settings & settings = context->getSettingsRef();
2130
2131 auto totals_having = std::make_shared<TotalsHavingTransform>(
2132 pipeline.getHeader(), overflow_row, expression,
2133 has_having ? getSelectQuery().having()->getColumnName() : "",
2134 settings.totals_mode, settings.totals_auto_threshold, final);
2135
2136 pipeline.addTotalsHavingTransform(std::move(totals_having));
2137}
2138
2139
2140void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificator modificator)
2141{
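    /// ROLLUP / CUBE is calculated on a single stream, so merge the streams first.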
2142 executeUnion(pipeline, {});
2143
2144 Names key_names;
2145 AggregateDescriptions aggregates;
2146 query_analyzer->getAggregateInfo(key_names, aggregates);
2147
2148 Block header = pipeline.firstStream()->getHeader();
2149
2150 ColumnNumbers keys;
2151
2152 for (const auto & name : key_names)
2153 keys.push_back(header.getPositionByName(name));
2154
2155 const Settings & settings = context->getSettingsRef();
2156
2157 Aggregator::Params params(header, keys, aggregates,
2158 false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
2159 SettingUInt64(0), SettingUInt64(0),
2160 settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
2161 context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
2162
2163 if (modificator == Modificator::ROLLUP)
2164 pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
2165 else
2166 pipeline.firstStream() = std::make_shared<CubeBlockInputStream>(pipeline.firstStream(), params);
2167}
2168
2169void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator)
2170{
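    /// ROLLUP / CUBE is calculated on a single stream.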
2171 pipeline.resize(1);
2172
2173 Names key_names;
2174 AggregateDescriptions aggregates;
2175 query_analyzer->getAggregateInfo(key_names, aggregates);
2176
2177 Block header_before_transform = pipeline.getHeader();
2178
2179 ColumnNumbers keys;
2180
2181 for (const auto & name : key_names)
2182 keys.push_back(header_before_transform.getPositionByName(name));
2183
2184 const Settings & settings = context->getSettingsRef();
2185
2186 Aggregator::Params params(header_before_transform, keys, aggregates,
2187 false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
2188 SettingUInt64(0), SettingUInt64(0),
2189 settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
2190 context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
2191
2192 auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
2193
2194 pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
2195 {
2196 if (stream_type == QueryPipeline::StreamType::Totals)
2197 return nullptr;
2198
2199 if (modificator == Modificator::ROLLUP)
2200 return std::make_shared<RollupTransform>(header, std::move(transform_params));
2201 else
2202 return std::make_shared<CubeTransform>(header, std::move(transform_params));
2203 });
2204}
2205
2206
2207void InterpreterSelectQuery::executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression)
2208{
2209 pipeline.transform([&](auto & stream)
2210 {
2211 stream = std::make_shared<ExpressionBlockInputStream>(stream, expression);
2212 });
2213}
2214
2215void InterpreterSelectQuery::executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression)
2216{
2217 pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
2218 {
2219 return std::make_shared<ExpressionTransform>(header, expression);
2220 });
2221}
2222
2223void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, InputSortingInfoPtr input_sorting_info)
2224{
2225 auto & query = getSelectQuery();
2226 SortDescription output_order_descr = getSortDescription(query, *context);
2227 const Settings & settings = context->getSettingsRef();
2228 UInt64 limit = getLimitForSorting(query, *context);
2229
2230 if (input_sorting_info)
2231 {
2232 /* Case of sorting with optimization using sorting key.
         * We have several threads, each of them reads a batch of parts in direct
         * or reverse order of the sorting key, using one input stream per part,
         * and then merges them into one sorted stream.
2236 * At this stage we merge per-thread streams into one.
2237 * If the input is sorted by some prefix of the sorting key required for output,
2238 * we have to finish sorting after the merge.
2239 */
2240
2241 bool need_finish_sorting = (input_sorting_info->order_key_prefix_descr.size() < output_order_descr.size());
2242
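        /// If sorting still has to be finished afterwards, LIMIT cannot be applied while merging.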
2243 UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
2244 executeMergeSorted(pipeline, input_sorting_info->order_key_prefix_descr, limit_for_merging);
2245
2246 if (need_finish_sorting)
2247 {
2248 pipeline.transform([&](auto & stream)
2249 {
2250 stream = std::make_shared<PartialSortingBlockInputStream>(stream, output_order_descr, limit);
2251 });
2252
2253 pipeline.firstStream() = std::make_shared<FinishSortingBlockInputStream>(
2254 pipeline.firstStream(), input_sorting_info->order_key_prefix_descr,
2255 output_order_descr, settings.max_block_size, limit);
2256 }
2257 }
2258 else
2259 {
2260 pipeline.transform([&](auto & stream)
2261 {
2262 auto sorting_stream = std::make_shared<PartialSortingBlockInputStream>(stream, output_order_descr, limit);
2263
2264 /// Limits on sorting
2265 IBlockInputStream::LocalLimits limits;
2266 limits.mode = IBlockInputStream::LIMITS_TOTAL;
2267 limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
2268 sorting_stream->setLimits(limits);
2269
2270 stream = sorting_stream;
2271 });
2272
2273 /// If there are several streams, we merge them into one
2274 executeUnion(pipeline, {});
2275
2276 /// Merge the sorted blocks.
2277 pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
2278 pipeline.firstStream(), output_order_descr, settings.max_block_size, limit,
2279 settings.max_bytes_before_remerge_sort,
2280 settings.max_bytes_before_external_sort, context->getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
2281 }
2282}
2283
2284void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr input_sorting_info)
2285{
2286 auto & query = getSelectQuery();
2287 SortDescription output_order_descr = getSortDescription(query, *context);
2288 UInt64 limit = getLimitForSorting(query, *context);
2289
2290 const Settings & settings = context->getSettingsRef();
2291
2292 /// TODO: Limits on sorting
2293// IBlockInputStream::LocalLimits limits;
2294// limits.mode = IBlockInputStream::LIMITS_TOTAL;
2295// limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
2296
2297 if (input_sorting_info)
2298 {
2299 /* Case of sorting with optimization using sorting key.
         * We have several threads, each of them reads a batch of parts in direct
         * or reverse order of the sorting key, using one input stream per part,
         * and then merges them into one sorted stream.
2303 * At this stage we merge per-thread streams into one.
2304 */
2305
2306 bool need_finish_sorting = (input_sorting_info->order_key_prefix_descr.size() < output_order_descr.size());
2307
2308 if (pipeline.getNumStreams() > 1)
2309 {
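            /// If sorting still has to be finished afterwards, LIMIT cannot be applied while merging.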
2310 UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
2311 auto transform = std::make_shared<MergingSortedTransform>(
2312 pipeline.getHeader(),
2313 pipeline.getNumStreams(),
2314 input_sorting_info->order_key_prefix_descr,
2315 settings.max_block_size, limit_for_merging);
2316
2317 pipeline.addPipe({ std::move(transform) });
2318 }
2319
2320 if (need_finish_sorting)
2321 {
2322 pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
2323 {
2324 bool do_count_rows = stream_type == QueryPipeline::StreamType::Main;
2325 return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit, do_count_rows);
2326 });
2327
2328 pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
2329 {
2330 return std::make_shared<FinishSortingTransform>(
2331 header, input_sorting_info->order_key_prefix_descr,
2332 output_order_descr, settings.max_block_size, limit);
2333 });
2334 }
2335
2336 return;
2337 }
2338
2339 pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
2340 {
2341 bool do_count_rows = stream_type == QueryPipeline::StreamType::Main;
2342 return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit, do_count_rows);
2343 });
2344
2345 /// If there are several streams, we merge them into one
2346 pipeline.resize(1);
2347
2348 /// Merge the sorted blocks.
2349 pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
2350 {
2351 if (stream_type == QueryPipeline::StreamType::Totals)
2352 return nullptr;
2353
2354 return std::make_shared<MergeSortingTransform>(
2355 header, output_order_descr, settings.max_block_size, limit,
2356 settings.max_bytes_before_remerge_sort,
2357 settings.max_bytes_before_external_sort, context->getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
2358 });
2359}
2360
2361
2362void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
2363{
2364 auto & query = getSelectQuery();
2365 SortDescription order_descr = getSortDescription(query, *context);
2366 UInt64 limit = getLimitForSorting(query, *context);
2367
2368 /// If there are several streams, then we merge them into one
2369 if (pipeline.hasMoreThanOneStream())
2370 {
2371 unifyStreams(pipeline, pipeline.firstStream()->getHeader());
2372 executeMergeSorted(pipeline, order_descr, limit);
2373 }
2374}
2375
2376
2377void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit)
2378{
2379 if (pipeline.hasMoreThanOneStream())
2380 {
2381 const Settings & settings = context->getSettingsRef();
2382
2383 /** MergingSortedBlockInputStream reads the sources sequentially.
          * To have the remote servers prepare the data in parallel, we wrap each source in an AsynchronousBlockInputStream.
2385 */
2386 pipeline.transform([&](auto & stream)
2387 {
2388 stream = std::make_shared<AsynchronousBlockInputStream>(stream);
2389 });
2390
2391 pipeline.firstStream() = std::make_shared<MergingSortedBlockInputStream>(
2392 pipeline.streams, sort_description, settings.max_block_size, limit);
2393 pipeline.streams.resize(1);
2394 }
2395}
2396
2397void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline)
2398{
2399 auto & query = getSelectQuery();
2400 SortDescription order_descr = getSortDescription(query, *context);
2401 UInt64 limit = getLimitForSorting(query, *context);
2402
2403 executeMergeSorted(pipeline, order_descr, limit);
2404}
2405
2406void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit)
2407{
2408 /// If there are several streams, then we merge them into one
2409 if (pipeline.getNumStreams() > 1)
2410 {
2411 const Settings & settings = context->getSettingsRef();
2412
2413 auto transform = std::make_shared<MergingSortedTransform>(
2414 pipeline.getHeader(),
2415 pipeline.getNumStreams(),
2416 sort_description,
2417 settings.max_block_size, limit);
2418
2419 pipeline.addPipe({ std::move(transform) });
2420 }
2421}
2422
2423
2424void InterpreterSelectQuery::executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression)
2425{
2426 pipeline.transform([&](auto & stream)
2427 {
2428 stream = std::make_shared<ExpressionBlockInputStream>(stream, expression);
2429 });
2430}
2431
2432void InterpreterSelectQuery::executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression)
2433{
2434 pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
2435 {
2436 return std::make_shared<ExpressionTransform>(header, expression);
2437 });
2438}
2439
2440
2441void InterpreterSelectQuery::executeDistinct(Pipeline & pipeline, bool before_order, Names columns)
2442{
2443 auto & query = getSelectQuery();
2444 if (query.distinct)
2445 {
2446 const Settings & settings = context->getSettingsRef();
2447
2448 auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context);
2449 UInt64 limit_for_distinct = 0;
2450
        /// If ORDER BY is not executed after this stage of DISTINCT, then no more than limit_length + limit_offset distinct rows are needed.
2452 if ((!query.orderBy() || !before_order) && !query.limit_with_ties)
2453 limit_for_distinct = limit_length + limit_offset;
2454
2455 pipeline.transform([&](auto & stream)
2456 {
2457 SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
2458 stream = std::make_shared<DistinctBlockInputStream>(stream, limits, limit_for_distinct, columns);
2459 });
2460 }
2461}
2462
2463void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns)
2464{
2465 auto & query = getSelectQuery();
2466 if (query.distinct)
2467 {
2468 const Settings & settings = context->getSettingsRef();
2469
2470 auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context);
2471 UInt64 limit_for_distinct = 0;
2472
        /// If ORDER BY is not executed after this stage of DISTINCT, then no more than limit_length + limit_offset distinct rows are needed.
2474 if (!query.orderBy() || !before_order)
2475 limit_for_distinct = limit_length + limit_offset;
2476
2477 SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
2478
2479 pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
2480 {
2481 if (stream_type == QueryPipeline::StreamType::Totals)
2482 return nullptr;
2483
2484 return std::make_shared<DistinctTransform>(header, limits, limit_for_distinct, columns);
2485 });
2486 }
2487}
2488
2489
2490void InterpreterSelectQuery::executeUnion(Pipeline & pipeline, Block header)
2491{
2492 /// If there are still several streams, then we combine them into one
2493 if (pipeline.hasMoreThanOneStream())
2494 {
2495 if (!header)
2496 header = pipeline.firstStream()->getHeader();
2497
2498 unifyStreams(pipeline, std::move(header));
2499
2500 pipeline.firstStream() = std::make_shared<UnionBlockInputStream>(pipeline.streams, pipeline.stream_with_non_joined_data, max_streams);
2501 pipeline.stream_with_non_joined_data = nullptr;
2502 pipeline.streams.resize(1);
2503 pipeline.union_stream = true;
2504 }
2505 else if (pipeline.stream_with_non_joined_data)
2506 {
2507 pipeline.streams.push_back(pipeline.stream_with_non_joined_data);
2508 pipeline.stream_with_non_joined_data = nullptr;
2509 }
2510}
2511
2512
/// Preliminary LIMIT: applied in every source (if there are several sources) before they are combined.
2514void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
2515{
2516 auto & query = getSelectQuery();
2517 /// If there is LIMIT
2518 if (query.limitLength())
2519 {
2520 auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context);
2521 SortDescription sort_descr;
2522 if (query.limit_with_ties)
2523 {
2524 if (!query.orderBy())
2525 throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR);
2526 sort_descr = getSortDescription(query, *context);
2527 }
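        /// Each source keeps limit_length + limit_offset rows, because OFFSET can only be applied after the streams are merged.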
2528 pipeline.transform([&, limit = limit_length + limit_offset](auto & stream)
2529 {
2530 stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, false, false, query.limit_with_ties, sort_descr);
2531 });
2532 }
2533}
2534
/// Preliminary LIMIT: applied in every source (if there are several sources) before they are combined.
2536void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline)
2537{
2538 auto & query = getSelectQuery();
2539 /// If there is LIMIT
2540 if (query.limitLength())
2541 {
2542 auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context);
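        /// Each source keeps limit_length + limit_offset rows, because OFFSET can only be applied after the streams are merged.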
2543 pipeline.addSimpleTransform([&, limit = limit_length + limit_offset](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
2544 {
2545 if (stream_type == QueryPipeline::StreamType::Totals)
2546 return nullptr;
2547
2548 return std::make_shared<LimitTransform>(header, limit, 0);
2549 });
2550 }
2551}
2552
2553
2554void InterpreterSelectQuery::executeLimitBy(Pipeline & pipeline)
2555{
2556 auto & query = getSelectQuery();
2557 if (!query.limitByLength() || !query.limitBy())
2558 return;
2559
2560 Names columns;
2561 for (const auto & elem : query.limitBy()->children)
2562 columns.emplace_back(elem->getColumnName());
2563 UInt64 length = getLimitUIntValue(query.limitByLength(), *context);
2564 UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context) : 0);
2565
2566 pipeline.transform([&](auto & stream)
2567 {
2568 stream = std::make_shared<LimitByBlockInputStream>(stream, length, offset, columns);
2569 });
2570}
2571
2572void InterpreterSelectQuery::executeLimitBy(QueryPipeline & pipeline)
2573{
2574 auto & query = getSelectQuery();
2575 if (!query.limitByLength() || !query.limitBy())
2576 return;
2577
2578 Names columns;
2579 for (const auto & elem : query.limitBy()->children)
2580 columns.emplace_back(elem->getColumnName());
2581
2582 UInt64 length = getLimitUIntValue(query.limitByLength(), *context);
2583 UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context) : 0);
2584
2585 pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
2586 {
2587 if (stream_type == QueryPipeline::StreamType::Totals)
2588 return nullptr;
2589
2590 return std::make_shared<LimitByTransform>(header, length, offset, columns);
2591 });
2592}
2593
2594
2595namespace
2596{
2597 bool hasWithTotalsInAnySubqueryInFromClause(const ASTSelectQuery & query)
2598 {
2599 if (query.group_by_with_totals)
2600 return true;
2601
2602 /** NOTE You can also check that the table in the subquery is distributed, and that it only looks at one shard.
2603 * In other cases, totals will be computed on the initiating server of the query, and it is not necessary to read the data to the end.
2604 */
2605
2606 if (auto query_table = extractTableExpression(query, 0))
2607 {
2608 if (const auto * ast_union = query_table->as<ASTSelectWithUnionQuery>())
2609 {
2610 for (const auto & elem : ast_union->list_of_selects->children)
2611 if (hasWithTotalsInAnySubqueryInFromClause(elem->as<ASTSelectQuery &>()))
2612 return true;
2613 }
2614 }
2615
2616 return false;
2617 }
2618}
2619
2620void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
2621{
2622 auto & query = getSelectQuery();
2623 /// If there is LIMIT
2624 if (query.limitLength())
2625 {
2626 /** Rare case:
2627 * if there is no WITH TOTALS and there is a subquery in FROM, and there is WITH TOTALS on one of the levels,
2628 * then when using LIMIT, you should read the data to the end, rather than cancel the query earlier,
2629 * because if you cancel the query, we will not get `totals` data from the remote server.
2630 *
2631 * Another case:
2632 * if there is WITH TOTALS and there is no ORDER BY, then read the data to the end,
2633 * otherwise TOTALS is counted according to incomplete data.
2634 */
2635 bool always_read_till_end = false;
2636
2637 if (query.group_by_with_totals && !query.orderBy())
2638 always_read_till_end = true;
2639
2640 if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query))
2641 always_read_till_end = true;
2642
2643 SortDescription order_descr;
2644 if (query.limit_with_ties)
2645 {
2646 if (!query.orderBy())
2647 throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR);
2648 order_descr = getSortDescription(query, *context);
2649 }
2650
2651 UInt64 limit_length;
2652 UInt64 limit_offset;
2653 std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, *context);
2654
2655 pipeline.transform([&](auto & stream)
2656 {
2657 stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, always_read_till_end, false, query.limit_with_ties, order_descr);
2658 });
2659 }
2660}
2661
2662
2663void InterpreterSelectQuery::executeWithFill(Pipeline & pipeline)
2664{
2665 auto & query = getSelectQuery();
2666 if (query.orderBy())
2667 {
2668 SortDescription order_descr = getSortDescription(query, *context);
2669 SortDescription fill_descr;
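        /// Keep only the ORDER BY elements that have the WITH FILL modifier.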
2670 for (auto & desc : order_descr)
2671 {
2672 if (desc.with_fill)
2673 fill_descr.push_back(desc);
2674 }
2675
2676 if (fill_descr.empty())
2677 return;
2678
2679 pipeline.transform([&](auto & stream)
2680 {
2681 stream = std::make_shared<FillingBlockInputStream>(stream, fill_descr);
2682 });
2683 }
2684}
2685
2686void InterpreterSelectQuery::executeWithFill(QueryPipeline & pipeline)
2687{
2688 auto & query = getSelectQuery();
2689 if (query.orderBy())
2690 {
2691 SortDescription order_descr = getSortDescription(query, *context);
2692 SortDescription fill_descr;
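        /// Keep only the ORDER BY elements that have the WITH FILL modifier.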
2693 for (auto & desc : order_descr)
2694 {
2695 if (desc.with_fill)
2696 fill_descr.push_back(desc);
2697 }
2698
2699 if (fill_descr.empty())
2700 return;
2701
2702 pipeline.addSimpleTransform([&](const Block & header)
2703 {
2704 return std::make_shared<FillingTransform>(header, fill_descr);
2705 });
2706 }
2707}
2708
2709
2710void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
2711{
2712 auto & query = getSelectQuery();
2713 /// If there is LIMIT
2714 if (query.limitLength())
2715 {
2716 /** Rare case:
2717 * if there is no WITH TOTALS and there is a subquery in FROM, and there is WITH TOTALS on one of the levels,
2718 * then when using LIMIT, you should read the data to the end, rather than cancel the query earlier,
2719 * because if you cancel the query, we will not get `totals` data from the remote server.
2720 *
2721 * Another case:
2722 * if there is WITH TOTALS and there is no ORDER BY, then read the data to the end,
2723 * otherwise TOTALS is counted according to incomplete data.
2724 */
2725 bool always_read_till_end = false;
2726
2727 if (query.group_by_with_totals && !query.orderBy())
2728 always_read_till_end = true;
2729
2730 if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query))
2731 always_read_till_end = true;
2732
2733 UInt64 limit_length;
2734 UInt64 limit_offset;
2735 std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, *context);
2736
2737 SortDescription order_descr;
2738 if (query.limit_with_ties)
2739 {
2740 if (!query.orderBy())
2741 throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR);
2742 order_descr = getSortDescription(query, *context);
2743 }
2744
2745 pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
2746 {
2747 if (stream_type != QueryPipeline::StreamType::Main)
2748 return nullptr;
2749
2750 return std::make_shared<LimitTransform>(
2751 header, limit_length, limit_offset, always_read_till_end, query.limit_with_ties, order_descr);
2752 });
2753 }
2754}
2755
2756
2757void InterpreterSelectQuery::executeExtremes(Pipeline & pipeline)
2758{
2759 if (!context->getSettingsRef().extremes)
2760 return;
2761
2762 pipeline.transform([&](auto & stream)
2763 {
2764 stream->enableExtremes();
2765 });
2766}
2767
2768void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline)
2769{
2770 if (!context->getSettingsRef().extremes)
2771 return;
2772
2773 auto transform = std::make_shared<ExtremesTransform>(pipeline.getHeader());
2774 pipeline.addExtremesTransform(std::move(transform));
2775}
2776
2777
2778void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline, SubqueriesForSets & subqueries_for_sets)
2779{
    /// Merge streams into one. Use a merge of sorted streams if the data was read in sorted order, Union otherwise.
2781 if (query_info.input_sorting_info)
2782 {
2783 if (pipeline.stream_with_non_joined_data)
2784 throw Exception("Using read in order optimization, but has stream with non-joined data in pipeline", ErrorCodes::LOGICAL_ERROR);
2785 executeMergeSorted(pipeline, query_info.input_sorting_info->order_key_prefix_descr, 0);
2786 }
2787 else
2788 executeUnion(pipeline, {});
2789
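    /// CreatingSetsBlockInputStream executes the subqueries and fills the sets / joins before the first block is read from the main stream.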
2790 pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
2791 pipeline.firstStream(), subqueries_for_sets, *context);
2792}
2793
2794void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, SubqueriesForSets & subqueries_for_sets)
2795{
2796 if (query_info.input_sorting_info)
2797 {
2798 if (pipeline.hasDelayedStream())
2799 throw Exception("Using read in order optimization, but has delayed stream in pipeline", ErrorCodes::LOGICAL_ERROR);
2800 executeMergeSorted(pipeline, query_info.input_sorting_info->order_key_prefix_descr, 0);
2801 }
2802
2803 const Settings & settings = context->getSettingsRef();
2804
2805 auto creating_sets = std::make_shared<CreatingSetsTransform>(
2806 pipeline.getHeader(), subqueries_for_sets,
2807 SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
2808 *context);
2809
2810 pipeline.addCreatingSetsTransform(std::move(creating_sets));
2811}
2812
2813
2814void InterpreterSelectQuery::unifyStreams(Pipeline & pipeline, Block header)
2815{
2816 /// Unify streams in case they have different headers.
2817
    /// TODO: remove the previous addition of the _dummy column.
2819 if (header.columns() > 1 && header.has("_dummy"))
2820 header.erase("_dummy");
2821
2822 for (size_t i = 0; i < pipeline.streams.size(); ++i)
2823 {
2824 auto & stream = pipeline.streams[i];
2825 auto stream_header = stream->getHeader();
2826 auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
2827
2828 if (!blocksHaveEqualStructure(header, stream_header))
2829 stream = std::make_shared<ConvertingBlockInputStream>(*context, stream, header, mode);
2830 }
2831}
2832
2833
2834void InterpreterSelectQuery::ignoreWithTotals()
2835{
2836 getSelectQuery().group_by_with_totals = false;
2837}
2838
2839
2840void InterpreterSelectQuery::initSettings()
2841{
2842 auto & query = getSelectQuery();
2843 if (query.settings())
2844 InterpreterSetQuery(query.settings(), *context).executeForCurrentContext();
2845}
2846
2847}
2848