1 | #pragma once |
2 | |
3 | #include <memory> |
4 | |
5 | #include <Core/QueryProcessingStage.h> |
6 | #include <Parsers/ASTSelectQuery.h> |
7 | #include <DataStreams/IBlockStream_fwd.h> |
8 | #include <Interpreters/Context.h> |
9 | #include <Interpreters/ExpressionActions.h> |
10 | #include <Interpreters/ExpressionAnalyzer.h> |
11 | #include <Interpreters/IInterpreter.h> |
12 | #include <Interpreters/SelectQueryOptions.h> |
13 | #include <Storages/SelectQueryInfo.h> |
14 | #include <Storages/TableStructureLockHolder.h> |
15 | #include <Storages/ReadInOrderOptimizer.h> |
16 | |
17 | #include <Processors/QueryPipeline.h> |
18 | #include <Columns/FilterDescription.h> |
19 | |
/// Forward declaration only: within this header the logger is held through a raw pointer
/// (`Poco::Logger * log`), so the full Poco logging header does not have to be included here.
namespace Poco
{
class Logger;
}
21 | |
22 | namespace DB |
23 | { |
24 | |
25 | struct SubqueryForSet; |
26 | class InterpreterSelectWithUnionQuery; |
27 | |
28 | struct SyntaxAnalyzerResult; |
29 | using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>; |
30 | |
31 | |
32 | /** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage. |
33 | */ |
34 | class InterpreterSelectQuery : public IInterpreter |
35 | { |
36 | public: |
37 | /** |
38 | * query_ptr |
39 | * - A query AST to interpret. |
40 | * |
41 | * required_result_column_names |
42 | * - don't calculate all columns except the specified ones from the query |
43 | * - it is used to remove calculation (and reading) of unnecessary columns from subqueries. |
44 | * empty means - use all columns. |
45 | */ |
46 | |
47 | InterpreterSelectQuery( |
48 | const ASTPtr & query_ptr_, |
49 | const Context & context_, |
50 | const SelectQueryOptions &, |
51 | const Names & required_result_column_names_ = Names{}); |
52 | |
53 | /// Read data not from the table specified in the query, but from the prepared source `input`. |
54 | InterpreterSelectQuery( |
55 | const ASTPtr & query_ptr_, |
56 | const Context & context_, |
57 | const BlockInputStreamPtr & input_, |
58 | const SelectQueryOptions & = {}); |
59 | |
60 | /// Read data not from the table specified in the query, but from the specified `storage_`. |
61 | InterpreterSelectQuery( |
62 | const ASTPtr & query_ptr_, |
63 | const Context & context_, |
64 | const StoragePtr & storage_, |
65 | const SelectQueryOptions & = {}); |
66 | |
67 | ~InterpreterSelectQuery() override; |
68 | |
69 | /// Execute a query. Get the stream of blocks to read. |
70 | BlockIO execute() override; |
71 | |
72 | /// Execute the query and return multuple streams for parallel processing. |
73 | BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline); |
74 | |
75 | QueryPipeline executeWithProcessors() override; |
76 | bool canExecuteWithProcessors() const override { return true; } |
77 | |
78 | bool ignoreLimits() const override { return options.ignore_limits; } |
79 | bool ignoreQuota() const override { return options.ignore_quota; } |
80 | |
81 | Block getSampleBlock(); |
82 | |
83 | void ignoreWithTotals(); |
84 | |
85 | ASTPtr getQuery() const { return query_ptr; } |
86 | |
87 | private: |
88 | InterpreterSelectQuery( |
89 | const ASTPtr & query_ptr_, |
90 | const Context & context_, |
91 | const BlockInputStreamPtr & input_, |
92 | const StoragePtr & storage_, |
93 | const SelectQueryOptions &, |
94 | const Names & required_result_column_names = {}); |
95 | |
96 | ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); } |
97 | |
98 | Block getSampleBlockImpl(); |
99 | |
100 | struct Pipeline |
101 | { |
102 | /** Streams of data. |
103 | * The source data streams are produced in the executeFetchColumns function. |
104 | * Then they are converted (wrapped in other streams) using the `execute*` functions, |
105 | * to get the whole pipeline running the query. |
106 | */ |
107 | BlockInputStreams streams; |
108 | |
109 | /** When executing FULL or RIGHT JOIN, there will be a data stream from which you can read "not joined" rows. |
110 | * It has a special meaning, since reading from it should be done after reading from the main streams. |
111 | * It is appended to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream. |
112 | */ |
113 | BlockInputStreamPtr stream_with_non_joined_data; |
114 | bool union_stream = false; |
115 | |
116 | BlockInputStreamPtr & firstStream() { return streams.at(0); } |
117 | |
118 | template <typename Transform> |
119 | void transform(Transform && transformation) |
120 | { |
121 | for (auto & stream : streams) |
122 | transformation(stream); |
123 | |
124 | if (stream_with_non_joined_data) |
125 | transformation(stream_with_non_joined_data); |
126 | } |
127 | |
128 | bool hasMoreThanOneStream() const |
129 | { |
130 | return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1; |
131 | } |
132 | |
133 | /// Resulting stream is mix of other streams data. Distinct and/or order guaranties are broken. |
134 | bool hasMixedStreams() const |
135 | { |
136 | return hasMoreThanOneStream() || union_stream; |
137 | } |
138 | |
139 | bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; } |
140 | bool initialized() const { return !streams.empty(); } |
141 | }; |
142 | |
143 | template <typename TPipeline> |
144 | void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, QueryPipeline & save_context_and_storage); |
145 | |
146 | struct AnalysisResult |
147 | { |
148 | bool hasJoin() const { return before_join.get(); } |
149 | bool has_where = false; |
150 | bool need_aggregate = false; |
151 | bool has_having = false; |
152 | bool has_order_by = false; |
153 | bool has_limit_by = false; |
154 | |
155 | bool remove_where_filter = false; |
156 | bool optimize_read_in_order = false; |
157 | |
158 | ExpressionActionsPtr before_join; /// including JOIN |
159 | ExpressionActionsPtr before_where; |
160 | ExpressionActionsPtr before_aggregation; |
161 | ExpressionActionsPtr before_having; |
162 | ExpressionActionsPtr before_order_and_select; |
163 | ExpressionActionsPtr before_limit_by; |
164 | ExpressionActionsPtr final_projection; |
165 | |
166 | /// Columns from the SELECT list, before renaming them to aliases. |
167 | Names selected_columns; |
168 | |
169 | /// Columns will be removed after prewhere actions execution. |
170 | Names columns_to_remove_after_prewhere; |
171 | |
172 | /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. |
173 | bool first_stage = false; |
174 | /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. |
175 | bool second_stage = false; |
176 | |
177 | SubqueriesForSets subqueries_for_sets; |
178 | PrewhereInfoPtr prewhere_info; |
179 | FilterInfoPtr filter_info; |
180 | ConstantFilterDescription prewhere_constant_filter_description; |
181 | ConstantFilterDescription where_constant_filter_description; |
182 | }; |
183 | |
184 | static AnalysisResult analyzeExpressions( |
185 | const ASTSelectQuery & query, |
186 | SelectQueryExpressionAnalyzer & query_analyzer, |
187 | QueryProcessingStage::Enum from_stage, |
188 | QueryProcessingStage::Enum to_stage, |
189 | const Context & context, |
190 | const StoragePtr & storage, |
191 | bool only_types, |
192 | const FilterInfoPtr & filter_info, |
193 | const Block & ); |
194 | |
195 | /** From which table to read. With JOIN, the "left" table is returned. |
196 | */ |
197 | static void getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context); |
198 | |
199 | /// Different stages of query execution. |
200 | |
201 | /// dry_run - don't read from table, use empty header block instead. |
202 | void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run); |
203 | |
204 | template <typename TPipeline> |
205 | void executeFetchColumns(QueryProcessingStage::Enum processing_stage, TPipeline & pipeline, |
206 | const PrewhereInfoPtr & prewhere_info, |
207 | const Names & columns_to_remove_after_prewhere, |
208 | QueryPipeline & save_context_and_storage); |
209 | |
210 | void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter); |
211 | void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final); |
212 | void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final); |
213 | void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); |
214 | void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression); |
215 | void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression); |
216 | void executeOrder(Pipeline & pipeline, InputSortingInfoPtr sorting_info); |
217 | void executeWithFill(Pipeline & pipeline); |
218 | void executeMergeSorted(Pipeline & pipeline); |
219 | void executePreLimit(Pipeline & pipeline); |
220 | void executeUnion(Pipeline & pipeline, Block ); |
221 | void executeLimitBy(Pipeline & pipeline); |
222 | void executeLimit(Pipeline & pipeline); |
223 | void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression); |
224 | void executeDistinct(Pipeline & pipeline, bool before_order, Names columns); |
225 | void executeExtremes(Pipeline & pipeline); |
226 | void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets); |
227 | void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit); |
228 | |
229 | void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter); |
230 | void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final); |
231 | void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final); |
232 | void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); |
233 | void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); |
234 | void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); |
235 | void executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr sorting_info); |
236 | void executeWithFill(QueryPipeline & pipeline); |
237 | void executeMergeSorted(QueryPipeline & pipeline); |
238 | void executePreLimit(QueryPipeline & pipeline); |
239 | void executeLimitBy(QueryPipeline & pipeline); |
240 | void executeLimit(QueryPipeline & pipeline); |
241 | void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); |
242 | void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns); |
243 | void executeExtremes(QueryPipeline & pipeline); |
244 | void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets); |
245 | void executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit); |
246 | |
247 | /// Add ConvertingBlockInputStream to specified header. |
248 | void unifyStreams(Pipeline & pipeline, Block ); |
249 | |
250 | enum class Modificator |
251 | { |
252 | ROLLUP = 0, |
253 | CUBE = 1 |
254 | }; |
255 | |
256 | void executeRollupOrCube(Pipeline & pipeline, Modificator modificator); |
257 | |
258 | void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator); |
259 | |
260 | /** If there is a SETTINGS section in the SELECT query, then apply settings from it. |
261 | * |
262 | * Section SETTINGS - settings for a specific query. |
263 | * Normally, the settings can be passed in other ways, not inside the query. |
264 | * But the use of this section is justified if you need to set the settings for one subquery. |
265 | */ |
266 | void initSettings(); |
267 | |
268 | SelectQueryOptions options; |
269 | ASTPtr query_ptr; |
270 | std::shared_ptr<Context> context; |
271 | SyntaxAnalyzerResultPtr syntax_analyzer_result; |
272 | std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer; |
273 | SelectQueryInfo query_info; |
274 | |
275 | /// Is calculated in getSampleBlock. Is used later in readImpl. |
276 | AnalysisResult analysis_result; |
277 | FilterInfoPtr filter_info; |
278 | |
279 | QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; |
280 | |
281 | /// How many streams we ask for storage to produce, and in how many threads we will do further processing. |
282 | size_t max_streams = 1; |
283 | |
284 | /// List of columns to read to execute the query. |
285 | Names required_columns; |
286 | /// Structure of query source (table, subquery, etc). |
287 | Block ; |
288 | /// Structure of query result. |
289 | Block ; |
290 | |
291 | /// The subquery interpreter, if the subquery |
292 | std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter_subquery; |
293 | |
294 | /// Table from where to read data, if not subquery. |
295 | StoragePtr storage; |
296 | TableStructureReadLockHolder table_lock; |
297 | |
298 | /// Used when we read from prepared input, not table or subquery. |
299 | BlockInputStreamPtr input; |
300 | |
301 | Poco::Logger * log; |
302 | }; |
303 | |
304 | } |
305 | |