| 1 | #pragma once |
| 2 | |
| 3 | #include <memory> |
| 4 | |
| 5 | #include <Core/QueryProcessingStage.h> |
| 6 | #include <Parsers/ASTSelectQuery.h> |
| 7 | #include <DataStreams/IBlockStream_fwd.h> |
| 8 | #include <Interpreters/Context.h> |
| 9 | #include <Interpreters/ExpressionActions.h> |
| 10 | #include <Interpreters/ExpressionAnalyzer.h> |
| 11 | #include <Interpreters/IInterpreter.h> |
| 12 | #include <Interpreters/SelectQueryOptions.h> |
| 13 | #include <Storages/SelectQueryInfo.h> |
| 14 | #include <Storages/TableStructureLockHolder.h> |
| 15 | #include <Storages/ReadInOrderOptimizer.h> |
| 16 | |
| 17 | #include <Processors/QueryPipeline.h> |
| 18 | #include <Columns/FilterDescription.h> |
| 19 | |
/// Forward declaration only: this header stores a pointer to Poco::Logger and never needs its definition.
namespace Poco
{
class Logger;
}
| 21 | |
| 22 | namespace DB |
| 23 | { |
| 24 | |
| 25 | struct SubqueryForSet; |
| 26 | class InterpreterSelectWithUnionQuery; |
| 27 | |
| 28 | struct SyntaxAnalyzerResult; |
| 29 | using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>; |
| 30 | |
| 31 | |
| 32 | /** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage. |
| 33 | */ |
| 34 | class InterpreterSelectQuery : public IInterpreter |
| 35 | { |
| 36 | public: |
| 37 | /** |
| 38 | * query_ptr |
| 39 | * - A query AST to interpret. |
| 40 | * |
| 41 | * required_result_column_names |
| 42 | * - don't calculate all columns except the specified ones from the query |
| 43 | * - it is used to remove calculation (and reading) of unnecessary columns from subqueries. |
| 44 | * empty means - use all columns. |
| 45 | */ |
| 46 | |
| 47 | InterpreterSelectQuery( |
| 48 | const ASTPtr & query_ptr_, |
| 49 | const Context & context_, |
| 50 | const SelectQueryOptions &, |
| 51 | const Names & required_result_column_names_ = Names{}); |
| 52 | |
| 53 | /// Read data not from the table specified in the query, but from the prepared source `input`. |
| 54 | InterpreterSelectQuery( |
| 55 | const ASTPtr & query_ptr_, |
| 56 | const Context & context_, |
| 57 | const BlockInputStreamPtr & input_, |
| 58 | const SelectQueryOptions & = {}); |
| 59 | |
| 60 | /// Read data not from the table specified in the query, but from the specified `storage_`. |
| 61 | InterpreterSelectQuery( |
| 62 | const ASTPtr & query_ptr_, |
| 63 | const Context & context_, |
| 64 | const StoragePtr & storage_, |
| 65 | const SelectQueryOptions & = {}); |
| 66 | |
| 67 | ~InterpreterSelectQuery() override; |
| 68 | |
| 69 | /// Execute a query. Get the stream of blocks to read. |
| 70 | BlockIO execute() override; |
| 71 | |
| 72 | /// Execute the query and return multuple streams for parallel processing. |
| 73 | BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline); |
| 74 | |
| 75 | QueryPipeline executeWithProcessors() override; |
| 76 | bool canExecuteWithProcessors() const override { return true; } |
| 77 | |
| 78 | bool ignoreLimits() const override { return options.ignore_limits; } |
| 79 | bool ignoreQuota() const override { return options.ignore_quota; } |
| 80 | |
| 81 | Block getSampleBlock(); |
| 82 | |
| 83 | void ignoreWithTotals(); |
| 84 | |
| 85 | ASTPtr getQuery() const { return query_ptr; } |
| 86 | |
| 87 | private: |
| 88 | InterpreterSelectQuery( |
| 89 | const ASTPtr & query_ptr_, |
| 90 | const Context & context_, |
| 91 | const BlockInputStreamPtr & input_, |
| 92 | const StoragePtr & storage_, |
| 93 | const SelectQueryOptions &, |
| 94 | const Names & required_result_column_names = {}); |
| 95 | |
| 96 | ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); } |
| 97 | |
| 98 | Block getSampleBlockImpl(); |
| 99 | |
| 100 | struct Pipeline |
| 101 | { |
| 102 | /** Streams of data. |
| 103 | * The source data streams are produced in the executeFetchColumns function. |
| 104 | * Then they are converted (wrapped in other streams) using the `execute*` functions, |
| 105 | * to get the whole pipeline running the query. |
| 106 | */ |
| 107 | BlockInputStreams streams; |
| 108 | |
| 109 | /** When executing FULL or RIGHT JOIN, there will be a data stream from which you can read "not joined" rows. |
| 110 | * It has a special meaning, since reading from it should be done after reading from the main streams. |
| 111 | * It is appended to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream. |
| 112 | */ |
| 113 | BlockInputStreamPtr stream_with_non_joined_data; |
| 114 | bool union_stream = false; |
| 115 | |
| 116 | BlockInputStreamPtr & firstStream() { return streams.at(0); } |
| 117 | |
| 118 | template <typename Transform> |
| 119 | void transform(Transform && transformation) |
| 120 | { |
| 121 | for (auto & stream : streams) |
| 122 | transformation(stream); |
| 123 | |
| 124 | if (stream_with_non_joined_data) |
| 125 | transformation(stream_with_non_joined_data); |
| 126 | } |
| 127 | |
| 128 | bool hasMoreThanOneStream() const |
| 129 | { |
| 130 | return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1; |
| 131 | } |
| 132 | |
| 133 | /// Resulting stream is mix of other streams data. Distinct and/or order guaranties are broken. |
| 134 | bool hasMixedStreams() const |
| 135 | { |
| 136 | return hasMoreThanOneStream() || union_stream; |
| 137 | } |
| 138 | |
| 139 | bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; } |
| 140 | bool initialized() const { return !streams.empty(); } |
| 141 | }; |
| 142 | |
| 143 | template <typename TPipeline> |
| 144 | void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, QueryPipeline & save_context_and_storage); |
| 145 | |
| 146 | struct AnalysisResult |
| 147 | { |
| 148 | bool hasJoin() const { return before_join.get(); } |
| 149 | bool has_where = false; |
| 150 | bool need_aggregate = false; |
| 151 | bool has_having = false; |
| 152 | bool has_order_by = false; |
| 153 | bool has_limit_by = false; |
| 154 | |
| 155 | bool remove_where_filter = false; |
| 156 | bool optimize_read_in_order = false; |
| 157 | |
| 158 | ExpressionActionsPtr before_join; /// including JOIN |
| 159 | ExpressionActionsPtr before_where; |
| 160 | ExpressionActionsPtr before_aggregation; |
| 161 | ExpressionActionsPtr before_having; |
| 162 | ExpressionActionsPtr before_order_and_select; |
| 163 | ExpressionActionsPtr before_limit_by; |
| 164 | ExpressionActionsPtr final_projection; |
| 165 | |
| 166 | /// Columns from the SELECT list, before renaming them to aliases. |
| 167 | Names selected_columns; |
| 168 | |
| 169 | /// Columns will be removed after prewhere actions execution. |
| 170 | Names columns_to_remove_after_prewhere; |
| 171 | |
| 172 | /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. |
| 173 | bool first_stage = false; |
| 174 | /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. |
| 175 | bool second_stage = false; |
| 176 | |
| 177 | SubqueriesForSets subqueries_for_sets; |
| 178 | PrewhereInfoPtr prewhere_info; |
| 179 | FilterInfoPtr filter_info; |
| 180 | ConstantFilterDescription prewhere_constant_filter_description; |
| 181 | ConstantFilterDescription where_constant_filter_description; |
| 182 | }; |
| 183 | |
| 184 | static AnalysisResult analyzeExpressions( |
| 185 | const ASTSelectQuery & query, |
| 186 | SelectQueryExpressionAnalyzer & query_analyzer, |
| 187 | QueryProcessingStage::Enum from_stage, |
| 188 | QueryProcessingStage::Enum to_stage, |
| 189 | const Context & context, |
| 190 | const StoragePtr & storage, |
| 191 | bool only_types, |
| 192 | const FilterInfoPtr & filter_info, |
| 193 | const Block & ); |
| 194 | |
| 195 | /** From which table to read. With JOIN, the "left" table is returned. |
| 196 | */ |
| 197 | static void getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context); |
| 198 | |
| 199 | /// Different stages of query execution. |
| 200 | |
| 201 | /// dry_run - don't read from table, use empty header block instead. |
| 202 | void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run); |
| 203 | |
| 204 | template <typename TPipeline> |
| 205 | void executeFetchColumns(QueryProcessingStage::Enum processing_stage, TPipeline & pipeline, |
| 206 | const PrewhereInfoPtr & prewhere_info, |
| 207 | const Names & columns_to_remove_after_prewhere, |
| 208 | QueryPipeline & save_context_and_storage); |
| 209 | |
| 210 | void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter); |
| 211 | void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final); |
| 212 | void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final); |
| 213 | void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); |
| 214 | void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression); |
| 215 | void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression); |
| 216 | void executeOrder(Pipeline & pipeline, InputSortingInfoPtr sorting_info); |
| 217 | void executeWithFill(Pipeline & pipeline); |
| 218 | void executeMergeSorted(Pipeline & pipeline); |
| 219 | void executePreLimit(Pipeline & pipeline); |
| 220 | void executeUnion(Pipeline & pipeline, Block ); |
| 221 | void executeLimitBy(Pipeline & pipeline); |
| 222 | void executeLimit(Pipeline & pipeline); |
| 223 | void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression); |
| 224 | void executeDistinct(Pipeline & pipeline, bool before_order, Names columns); |
| 225 | void executeExtremes(Pipeline & pipeline); |
| 226 | void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets); |
| 227 | void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit); |
| 228 | |
| 229 | void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter); |
| 230 | void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final); |
| 231 | void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final); |
| 232 | void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); |
| 233 | void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); |
| 234 | void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); |
| 235 | void executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr sorting_info); |
| 236 | void executeWithFill(QueryPipeline & pipeline); |
| 237 | void executeMergeSorted(QueryPipeline & pipeline); |
| 238 | void executePreLimit(QueryPipeline & pipeline); |
| 239 | void executeLimitBy(QueryPipeline & pipeline); |
| 240 | void executeLimit(QueryPipeline & pipeline); |
| 241 | void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); |
| 242 | void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns); |
| 243 | void executeExtremes(QueryPipeline & pipeline); |
| 244 | void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets); |
| 245 | void executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit); |
| 246 | |
| 247 | /// Add ConvertingBlockInputStream to specified header. |
| 248 | void unifyStreams(Pipeline & pipeline, Block ); |
| 249 | |
| 250 | enum class Modificator |
| 251 | { |
| 252 | ROLLUP = 0, |
| 253 | CUBE = 1 |
| 254 | }; |
| 255 | |
| 256 | void executeRollupOrCube(Pipeline & pipeline, Modificator modificator); |
| 257 | |
| 258 | void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator); |
| 259 | |
| 260 | /** If there is a SETTINGS section in the SELECT query, then apply settings from it. |
| 261 | * |
| 262 | * Section SETTINGS - settings for a specific query. |
| 263 | * Normally, the settings can be passed in other ways, not inside the query. |
| 264 | * But the use of this section is justified if you need to set the settings for one subquery. |
| 265 | */ |
| 266 | void initSettings(); |
| 267 | |
| 268 | SelectQueryOptions options; |
| 269 | ASTPtr query_ptr; |
| 270 | std::shared_ptr<Context> context; |
| 271 | SyntaxAnalyzerResultPtr syntax_analyzer_result; |
| 272 | std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer; |
| 273 | SelectQueryInfo query_info; |
| 274 | |
| 275 | /// Is calculated in getSampleBlock. Is used later in readImpl. |
| 276 | AnalysisResult analysis_result; |
| 277 | FilterInfoPtr filter_info; |
| 278 | |
| 279 | QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; |
| 280 | |
| 281 | /// How many streams we ask for storage to produce, and in how many threads we will do further processing. |
| 282 | size_t max_streams = 1; |
| 283 | |
| 284 | /// List of columns to read to execute the query. |
| 285 | Names required_columns; |
| 286 | /// Structure of query source (table, subquery, etc). |
| 287 | Block ; |
| 288 | /// Structure of query result. |
| 289 | Block ; |
| 290 | |
| 291 | /// The subquery interpreter, if the subquery |
| 292 | std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter_subquery; |
| 293 | |
| 294 | /// Table from where to read data, if not subquery. |
| 295 | StoragePtr storage; |
| 296 | TableStructureReadLockHolder table_lock; |
| 297 | |
| 298 | /// Used when we read from prepared input, not table or subquery. |
| 299 | BlockInputStreamPtr input; |
| 300 | |
| 301 | Poco::Logger * log; |
| 302 | }; |
| 303 | |
| 304 | } |
| 305 | |