1#pragma once
2
3#include <memory>
4
5#include <Core/QueryProcessingStage.h>
6#include <Parsers/ASTSelectQuery.h>
7#include <DataStreams/IBlockStream_fwd.h>
8#include <Interpreters/Context.h>
9#include <Interpreters/ExpressionActions.h>
10#include <Interpreters/ExpressionAnalyzer.h>
11#include <Interpreters/IInterpreter.h>
12#include <Interpreters/SelectQueryOptions.h>
13#include <Storages/SelectQueryInfo.h>
14#include <Storages/TableStructureLockHolder.h>
15#include <Storages/ReadInOrderOptimizer.h>
16
17#include <Processors/QueryPipeline.h>
18#include <Columns/FilterDescription.h>
19
20namespace Poco { class Logger; }
21
22namespace DB
23{
24
25struct SubqueryForSet;
26class InterpreterSelectWithUnionQuery;
27
28struct SyntaxAnalyzerResult;
29using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
30
31
32/** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage.
33 */
34class InterpreterSelectQuery : public IInterpreter
35{
36public:
37 /**
38 * query_ptr
39 * - A query AST to interpret.
40 *
41 * required_result_column_names
42 * - don't calculate all columns except the specified ones from the query
43 * - it is used to remove calculation (and reading) of unnecessary columns from subqueries.
44 * empty means - use all columns.
45 */
46
47 InterpreterSelectQuery(
48 const ASTPtr & query_ptr_,
49 const Context & context_,
50 const SelectQueryOptions &,
51 const Names & required_result_column_names_ = Names{});
52
53 /// Read data not from the table specified in the query, but from the prepared source `input`.
54 InterpreterSelectQuery(
55 const ASTPtr & query_ptr_,
56 const Context & context_,
57 const BlockInputStreamPtr & input_,
58 const SelectQueryOptions & = {});
59
60 /// Read data not from the table specified in the query, but from the specified `storage_`.
61 InterpreterSelectQuery(
62 const ASTPtr & query_ptr_,
63 const Context & context_,
64 const StoragePtr & storage_,
65 const SelectQueryOptions & = {});
66
67 ~InterpreterSelectQuery() override;
68
69 /// Execute a query. Get the stream of blocks to read.
70 BlockIO execute() override;
71
72 /// Execute the query and return multuple streams for parallel processing.
73 BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline);
74
75 QueryPipeline executeWithProcessors() override;
76 bool canExecuteWithProcessors() const override { return true; }
77
78 bool ignoreLimits() const override { return options.ignore_limits; }
79 bool ignoreQuota() const override { return options.ignore_quota; }
80
81 Block getSampleBlock();
82
83 void ignoreWithTotals();
84
85 ASTPtr getQuery() const { return query_ptr; }
86
87private:
88 InterpreterSelectQuery(
89 const ASTPtr & query_ptr_,
90 const Context & context_,
91 const BlockInputStreamPtr & input_,
92 const StoragePtr & storage_,
93 const SelectQueryOptions &,
94 const Names & required_result_column_names = {});
95
96 ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); }
97
98 Block getSampleBlockImpl();
99
100 struct Pipeline
101 {
102 /** Streams of data.
103 * The source data streams are produced in the executeFetchColumns function.
104 * Then they are converted (wrapped in other streams) using the `execute*` functions,
105 * to get the whole pipeline running the query.
106 */
107 BlockInputStreams streams;
108
109 /** When executing FULL or RIGHT JOIN, there will be a data stream from which you can read "not joined" rows.
110 * It has a special meaning, since reading from it should be done after reading from the main streams.
111 * It is appended to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream.
112 */
113 BlockInputStreamPtr stream_with_non_joined_data;
114 bool union_stream = false;
115
116 BlockInputStreamPtr & firstStream() { return streams.at(0); }
117
118 template <typename Transform>
119 void transform(Transform && transformation)
120 {
121 for (auto & stream : streams)
122 transformation(stream);
123
124 if (stream_with_non_joined_data)
125 transformation(stream_with_non_joined_data);
126 }
127
128 bool hasMoreThanOneStream() const
129 {
130 return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1;
131 }
132
133 /// Resulting stream is mix of other streams data. Distinct and/or order guaranties are broken.
134 bool hasMixedStreams() const
135 {
136 return hasMoreThanOneStream() || union_stream;
137 }
138
139 bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; }
140 bool initialized() const { return !streams.empty(); }
141 };
142
143 template <typename TPipeline>
144 void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, QueryPipeline & save_context_and_storage);
145
146 struct AnalysisResult
147 {
148 bool hasJoin() const { return before_join.get(); }
149 bool has_where = false;
150 bool need_aggregate = false;
151 bool has_having = false;
152 bool has_order_by = false;
153 bool has_limit_by = false;
154
155 bool remove_where_filter = false;
156 bool optimize_read_in_order = false;
157
158 ExpressionActionsPtr before_join; /// including JOIN
159 ExpressionActionsPtr before_where;
160 ExpressionActionsPtr before_aggregation;
161 ExpressionActionsPtr before_having;
162 ExpressionActionsPtr before_order_and_select;
163 ExpressionActionsPtr before_limit_by;
164 ExpressionActionsPtr final_projection;
165
166 /// Columns from the SELECT list, before renaming them to aliases.
167 Names selected_columns;
168
169 /// Columns will be removed after prewhere actions execution.
170 Names columns_to_remove_after_prewhere;
171
172 /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
173 bool first_stage = false;
174 /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
175 bool second_stage = false;
176
177 SubqueriesForSets subqueries_for_sets;
178 PrewhereInfoPtr prewhere_info;
179 FilterInfoPtr filter_info;
180 ConstantFilterDescription prewhere_constant_filter_description;
181 ConstantFilterDescription where_constant_filter_description;
182 };
183
184 static AnalysisResult analyzeExpressions(
185 const ASTSelectQuery & query,
186 SelectQueryExpressionAnalyzer & query_analyzer,
187 QueryProcessingStage::Enum from_stage,
188 QueryProcessingStage::Enum to_stage,
189 const Context & context,
190 const StoragePtr & storage,
191 bool only_types,
192 const FilterInfoPtr & filter_info,
193 const Block & source_header);
194
195 /** From which table to read. With JOIN, the "left" table is returned.
196 */
197 static void getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context);
198
199 /// Different stages of query execution.
200
201 /// dry_run - don't read from table, use empty header block instead.
202 void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);
203
204 template <typename TPipeline>
205 void executeFetchColumns(QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
206 const PrewhereInfoPtr & prewhere_info,
207 const Names & columns_to_remove_after_prewhere,
208 QueryPipeline & save_context_and_storage);
209
210 void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
211 void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
212 void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
213 void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
214 void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
215 void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
216 void executeOrder(Pipeline & pipeline, InputSortingInfoPtr sorting_info);
217 void executeWithFill(Pipeline & pipeline);
218 void executeMergeSorted(Pipeline & pipeline);
219 void executePreLimit(Pipeline & pipeline);
220 void executeUnion(Pipeline & pipeline, Block header);
221 void executeLimitBy(Pipeline & pipeline);
222 void executeLimit(Pipeline & pipeline);
223 void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression);
224 void executeDistinct(Pipeline & pipeline, bool before_order, Names columns);
225 void executeExtremes(Pipeline & pipeline);
226 void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
227 void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
228
229 void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter);
230 void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
231 void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final);
232 void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
233 void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
234 void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
235 void executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr sorting_info);
236 void executeWithFill(QueryPipeline & pipeline);
237 void executeMergeSorted(QueryPipeline & pipeline);
238 void executePreLimit(QueryPipeline & pipeline);
239 void executeLimitBy(QueryPipeline & pipeline);
240 void executeLimit(QueryPipeline & pipeline);
241 void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
242 void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns);
243 void executeExtremes(QueryPipeline & pipeline);
244 void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
245 void executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
246
247 /// Add ConvertingBlockInputStream to specified header.
248 void unifyStreams(Pipeline & pipeline, Block header);
249
250 enum class Modificator
251 {
252 ROLLUP = 0,
253 CUBE = 1
254 };
255
256 void executeRollupOrCube(Pipeline & pipeline, Modificator modificator);
257
258 void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator);
259
260 /** If there is a SETTINGS section in the SELECT query, then apply settings from it.
261 *
262 * Section SETTINGS - settings for a specific query.
263 * Normally, the settings can be passed in other ways, not inside the query.
264 * But the use of this section is justified if you need to set the settings for one subquery.
265 */
266 void initSettings();
267
268 SelectQueryOptions options;
269 ASTPtr query_ptr;
270 std::shared_ptr<Context> context;
271 SyntaxAnalyzerResultPtr syntax_analyzer_result;
272 std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer;
273 SelectQueryInfo query_info;
274
275 /// Is calculated in getSampleBlock. Is used later in readImpl.
276 AnalysisResult analysis_result;
277 FilterInfoPtr filter_info;
278
279 QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
280
281 /// How many streams we ask for storage to produce, and in how many threads we will do further processing.
282 size_t max_streams = 1;
283
284 /// List of columns to read to execute the query.
285 Names required_columns;
286 /// Structure of query source (table, subquery, etc).
287 Block source_header;
288 /// Structure of query result.
289 Block result_header;
290
291 /// The subquery interpreter, if the subquery
292 std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter_subquery;
293
294 /// Table from where to read data, if not subquery.
295 StoragePtr storage;
296 TableStructureReadLockHolder table_lock;
297
298 /// Used when we read from prepared input, not table or subquery.
299 BlockInputStreamPtr input;
300
301 Poco::Logger * log;
302};
303
304}
305