| 1 | #pragma once | 
|---|
| 2 |  | 
|---|
| 3 | #include <memory> | 
|---|
| 4 |  | 
|---|
| 5 | #include <Core/QueryProcessingStage.h> | 
|---|
| 6 | #include <Parsers/ASTSelectQuery.h> | 
|---|
| 7 | #include <DataStreams/IBlockStream_fwd.h> | 
|---|
| 8 | #include <Interpreters/Context.h> | 
|---|
| 9 | #include <Interpreters/ExpressionActions.h> | 
|---|
| 10 | #include <Interpreters/ExpressionAnalyzer.h> | 
|---|
| 11 | #include <Interpreters/IInterpreter.h> | 
|---|
| 12 | #include <Interpreters/SelectQueryOptions.h> | 
|---|
| 13 | #include <Storages/SelectQueryInfo.h> | 
|---|
| 14 | #include <Storages/TableStructureLockHolder.h> | 
|---|
| 15 | #include <Storages/ReadInOrderOptimizer.h> | 
|---|
| 16 |  | 
|---|
| 17 | #include <Processors/QueryPipeline.h> | 
|---|
| 18 | #include <Columns/FilterDescription.h> | 
|---|
| 19 |  | 
|---|
/// Forward declaration: this header only stores a `Poco::Logger *`, so the full definition is not needed here.
namespace Poco { class Logger; }
| 21 |  | 
|---|
| 22 | namespace DB | 
|---|
| 23 | { | 
|---|
| 24 |  | 
|---|
/// Forward declarations: these types are only named (not defined) in this header.
struct SubqueryForSet;
class InterpreterSelectWithUnionQuery;

struct SyntaxAnalyzerResult;
/// Shared pointer to an immutable SyntaxAnalyzerResult.
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
| 30 |  | 
|---|
| 31 |  | 
|---|
| 32 | /** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage. | 
|---|
| 33 | */ | 
|---|
| 34 | class InterpreterSelectQuery : public IInterpreter | 
|---|
| 35 | { | 
|---|
| 36 | public: | 
|---|
| 37 | /** | 
|---|
| 38 | * query_ptr | 
|---|
| 39 | * - A query AST to interpret. | 
|---|
| 40 | * | 
|---|
| 41 | * required_result_column_names | 
|---|
| 42 | * - don't calculate all columns except the specified ones from the query | 
|---|
| 43 | *  - it is used to remove calculation (and reading) of unnecessary columns from subqueries. | 
|---|
| 44 | *   empty means - use all columns. | 
|---|
| 45 | */ | 
|---|
| 46 |  | 
|---|
| 47 | InterpreterSelectQuery( | 
|---|
| 48 | const ASTPtr & query_ptr_, | 
|---|
| 49 | const Context & context_, | 
|---|
| 50 | const SelectQueryOptions &, | 
|---|
| 51 | const Names & required_result_column_names_ = Names{}); | 
|---|
| 52 |  | 
|---|
| 53 | /// Read data not from the table specified in the query, but from the prepared source `input`. | 
|---|
| 54 | InterpreterSelectQuery( | 
|---|
| 55 | const ASTPtr & query_ptr_, | 
|---|
| 56 | const Context & context_, | 
|---|
| 57 | const BlockInputStreamPtr & input_, | 
|---|
| 58 | const SelectQueryOptions & = {}); | 
|---|
| 59 |  | 
|---|
| 60 | /// Read data not from the table specified in the query, but from the specified `storage_`. | 
|---|
| 61 | InterpreterSelectQuery( | 
|---|
| 62 | const ASTPtr & query_ptr_, | 
|---|
| 63 | const Context & context_, | 
|---|
| 64 | const StoragePtr & storage_, | 
|---|
| 65 | const SelectQueryOptions & = {}); | 
|---|
| 66 |  | 
|---|
| 67 | ~InterpreterSelectQuery() override; | 
|---|
| 68 |  | 
|---|
| 69 | /// Execute a query. Get the stream of blocks to read. | 
|---|
| 70 | BlockIO execute() override; | 
|---|
| 71 |  | 
|---|
| 72 | /// Execute the query and return multuple streams for parallel processing. | 
|---|
| 73 | BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline); | 
|---|
| 74 |  | 
|---|
| 75 | QueryPipeline executeWithProcessors() override; | 
|---|
| 76 | bool canExecuteWithProcessors() const override { return true; } | 
|---|
| 77 |  | 
|---|
| 78 | bool ignoreLimits() const override { return options.ignore_limits; } | 
|---|
| 79 | bool ignoreQuota() const override { return options.ignore_quota; } | 
|---|
| 80 |  | 
|---|
| 81 | Block getSampleBlock(); | 
|---|
| 82 |  | 
|---|
| 83 | void ignoreWithTotals(); | 
|---|
| 84 |  | 
|---|
| 85 | ASTPtr getQuery() const { return query_ptr; } | 
|---|
| 86 |  | 
|---|
| 87 | private: | 
|---|
| 88 | InterpreterSelectQuery( | 
|---|
| 89 | const ASTPtr & query_ptr_, | 
|---|
| 90 | const Context & context_, | 
|---|
| 91 | const BlockInputStreamPtr & input_, | 
|---|
| 92 | const StoragePtr & storage_, | 
|---|
| 93 | const SelectQueryOptions &, | 
|---|
| 94 | const Names & required_result_column_names = {}); | 
|---|
| 95 |  | 
|---|
| 96 | ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); } | 
|---|
| 97 |  | 
|---|
| 98 | Block getSampleBlockImpl(); | 
|---|
| 99 |  | 
|---|
| 100 | struct Pipeline | 
|---|
| 101 | { | 
|---|
| 102 | /** Streams of data. | 
|---|
| 103 | * The source data streams are produced in the executeFetchColumns function. | 
|---|
| 104 | * Then they are converted (wrapped in other streams) using the `execute*` functions, | 
|---|
| 105 | *  to get the whole pipeline running the query. | 
|---|
| 106 | */ | 
|---|
| 107 | BlockInputStreams streams; | 
|---|
| 108 |  | 
|---|
| 109 | /** When executing FULL or RIGHT JOIN, there will be a data stream from which you can read "not joined" rows. | 
|---|
| 110 | * It has a special meaning, since reading from it should be done after reading from the main streams. | 
|---|
| 111 | * It is appended to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream. | 
|---|
| 112 | */ | 
|---|
| 113 | BlockInputStreamPtr stream_with_non_joined_data; | 
|---|
| 114 | bool union_stream = false; | 
|---|
| 115 |  | 
|---|
| 116 | BlockInputStreamPtr & firstStream() { return streams.at(0); } | 
|---|
| 117 |  | 
|---|
| 118 | template <typename Transform> | 
|---|
| 119 | void transform(Transform && transformation) | 
|---|
| 120 | { | 
|---|
| 121 | for (auto & stream : streams) | 
|---|
| 122 | transformation(stream); | 
|---|
| 123 |  | 
|---|
| 124 | if (stream_with_non_joined_data) | 
|---|
| 125 | transformation(stream_with_non_joined_data); | 
|---|
| 126 | } | 
|---|
| 127 |  | 
|---|
| 128 | bool hasMoreThanOneStream() const | 
|---|
| 129 | { | 
|---|
| 130 | return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1; | 
|---|
| 131 | } | 
|---|
| 132 |  | 
|---|
| 133 | /// Resulting stream is mix of other streams data. Distinct and/or order guaranties are broken. | 
|---|
| 134 | bool hasMixedStreams() const | 
|---|
| 135 | { | 
|---|
| 136 | return hasMoreThanOneStream() || union_stream; | 
|---|
| 137 | } | 
|---|
| 138 |  | 
|---|
| 139 | bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; } | 
|---|
| 140 | bool initialized() const { return !streams.empty(); } | 
|---|
| 141 | }; | 
|---|
| 142 |  | 
|---|
| 143 | template <typename TPipeline> | 
|---|
| 144 | void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, QueryPipeline & save_context_and_storage); | 
|---|
| 145 |  | 
|---|
| 146 | struct AnalysisResult | 
|---|
| 147 | { | 
|---|
| 148 | bool hasJoin() const { return before_join.get(); } | 
|---|
| 149 | bool has_where      = false; | 
|---|
| 150 | bool need_aggregate = false; | 
|---|
| 151 | bool has_having     = false; | 
|---|
| 152 | bool has_order_by   = false; | 
|---|
| 153 | bool has_limit_by   = false; | 
|---|
| 154 |  | 
|---|
| 155 | bool remove_where_filter = false; | 
|---|
| 156 | bool optimize_read_in_order = false; | 
|---|
| 157 |  | 
|---|
| 158 | ExpressionActionsPtr before_join;   /// including JOIN | 
|---|
| 159 | ExpressionActionsPtr before_where; | 
|---|
| 160 | ExpressionActionsPtr before_aggregation; | 
|---|
| 161 | ExpressionActionsPtr before_having; | 
|---|
| 162 | ExpressionActionsPtr before_order_and_select; | 
|---|
| 163 | ExpressionActionsPtr before_limit_by; | 
|---|
| 164 | ExpressionActionsPtr final_projection; | 
|---|
| 165 |  | 
|---|
| 166 | /// Columns from the SELECT list, before renaming them to aliases. | 
|---|
| 167 | Names selected_columns; | 
|---|
| 168 |  | 
|---|
| 169 | /// Columns will be removed after prewhere actions execution. | 
|---|
| 170 | Names columns_to_remove_after_prewhere; | 
|---|
| 171 |  | 
|---|
| 172 | /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. | 
|---|
| 173 | bool first_stage = false; | 
|---|
| 174 | /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. | 
|---|
| 175 | bool second_stage = false; | 
|---|
| 176 |  | 
|---|
| 177 | SubqueriesForSets subqueries_for_sets; | 
|---|
| 178 | PrewhereInfoPtr prewhere_info; | 
|---|
| 179 | FilterInfoPtr filter_info; | 
|---|
| 180 | ConstantFilterDescription prewhere_constant_filter_description; | 
|---|
| 181 | ConstantFilterDescription where_constant_filter_description; | 
|---|
| 182 | }; | 
|---|
| 183 |  | 
|---|
| 184 | static AnalysisResult analyzeExpressions( | 
|---|
| 185 | const ASTSelectQuery & query, | 
|---|
| 186 | SelectQueryExpressionAnalyzer & query_analyzer, | 
|---|
| 187 | QueryProcessingStage::Enum from_stage, | 
|---|
| 188 | QueryProcessingStage::Enum to_stage, | 
|---|
| 189 | const Context & context, | 
|---|
| 190 | const StoragePtr & storage, | 
|---|
| 191 | bool only_types, | 
|---|
| 192 | const FilterInfoPtr & filter_info, | 
|---|
| 193 | const Block & ); | 
|---|
| 194 |  | 
|---|
| 195 | /** From which table to read. With JOIN, the "left" table is returned. | 
|---|
| 196 | */ | 
|---|
| 197 | static void getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context); | 
|---|
| 198 |  | 
|---|
| 199 | /// Different stages of query execution. | 
|---|
| 200 |  | 
|---|
| 201 | /// dry_run - don't read from table, use empty header block instead. | 
|---|
| 202 | void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run); | 
|---|
| 203 |  | 
|---|
| 204 | template <typename TPipeline> | 
|---|
| 205 | void executeFetchColumns(QueryProcessingStage::Enum processing_stage, TPipeline & pipeline, | 
|---|
| 206 | const PrewhereInfoPtr & prewhere_info, | 
|---|
| 207 | const Names & columns_to_remove_after_prewhere, | 
|---|
| 208 | QueryPipeline & save_context_and_storage); | 
|---|
| 209 |  | 
|---|
| 210 | void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter); | 
|---|
| 211 | void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final); | 
|---|
| 212 | void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final); | 
|---|
| 213 | void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); | 
|---|
| 214 | void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression); | 
|---|
| 215 | void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression); | 
|---|
| 216 | void executeOrder(Pipeline & pipeline, InputSortingInfoPtr sorting_info); | 
|---|
| 217 | void executeWithFill(Pipeline & pipeline); | 
|---|
| 218 | void executeMergeSorted(Pipeline & pipeline); | 
|---|
| 219 | void executePreLimit(Pipeline & pipeline); | 
|---|
| 220 | void executeUnion(Pipeline & pipeline, Block ); | 
|---|
| 221 | void executeLimitBy(Pipeline & pipeline); | 
|---|
| 222 | void executeLimit(Pipeline & pipeline); | 
|---|
| 223 | void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression); | 
|---|
| 224 | void executeDistinct(Pipeline & pipeline, bool before_order, Names columns); | 
|---|
| 225 | void executeExtremes(Pipeline & pipeline); | 
|---|
| 226 | void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets); | 
|---|
| 227 | void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit); | 
|---|
| 228 |  | 
|---|
| 229 | void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter); | 
|---|
| 230 | void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final); | 
|---|
| 231 | void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final); | 
|---|
| 232 | void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); | 
|---|
| 233 | void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); | 
|---|
| 234 | void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); | 
|---|
| 235 | void executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr sorting_info); | 
|---|
| 236 | void executeWithFill(QueryPipeline & pipeline); | 
|---|
| 237 | void executeMergeSorted(QueryPipeline & pipeline); | 
|---|
| 238 | void executePreLimit(QueryPipeline & pipeline); | 
|---|
| 239 | void executeLimitBy(QueryPipeline & pipeline); | 
|---|
| 240 | void executeLimit(QueryPipeline & pipeline); | 
|---|
| 241 | void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); | 
|---|
| 242 | void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns); | 
|---|
| 243 | void executeExtremes(QueryPipeline & pipeline); | 
|---|
| 244 | void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets); | 
|---|
| 245 | void executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit); | 
|---|
| 246 |  | 
|---|
| 247 | /// Add ConvertingBlockInputStream to specified header. | 
|---|
| 248 | void unifyStreams(Pipeline & pipeline, Block ); | 
|---|
| 249 |  | 
|---|
| 250 | enum class Modificator | 
|---|
| 251 | { | 
|---|
| 252 | ROLLUP = 0, | 
|---|
| 253 | CUBE = 1 | 
|---|
| 254 | }; | 
|---|
| 255 |  | 
|---|
| 256 | void executeRollupOrCube(Pipeline & pipeline, Modificator modificator); | 
|---|
| 257 |  | 
|---|
| 258 | void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator); | 
|---|
| 259 |  | 
|---|
| 260 | /** If there is a SETTINGS section in the SELECT query, then apply settings from it. | 
|---|
| 261 | * | 
|---|
| 262 | * Section SETTINGS - settings for a specific query. | 
|---|
| 263 | * Normally, the settings can be passed in other ways, not inside the query. | 
|---|
| 264 | * But the use of this section is justified if you need to set the settings for one subquery. | 
|---|
| 265 | */ | 
|---|
| 266 | void initSettings(); | 
|---|
| 267 |  | 
|---|
| 268 | SelectQueryOptions options; | 
|---|
| 269 | ASTPtr query_ptr; | 
|---|
| 270 | std::shared_ptr<Context> context; | 
|---|
| 271 | SyntaxAnalyzerResultPtr syntax_analyzer_result; | 
|---|
| 272 | std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer; | 
|---|
| 273 | SelectQueryInfo query_info; | 
|---|
| 274 |  | 
|---|
| 275 | /// Is calculated in getSampleBlock. Is used later in readImpl. | 
|---|
| 276 | AnalysisResult analysis_result; | 
|---|
| 277 | FilterInfoPtr filter_info; | 
|---|
| 278 |  | 
|---|
| 279 | QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; | 
|---|
| 280 |  | 
|---|
| 281 | /// How many streams we ask for storage to produce, and in how many threads we will do further processing. | 
|---|
| 282 | size_t max_streams = 1; | 
|---|
| 283 |  | 
|---|
| 284 | /// List of columns to read to execute the query. | 
|---|
| 285 | Names required_columns; | 
|---|
| 286 | /// Structure of query source (table, subquery, etc). | 
|---|
| 287 | Block ; | 
|---|
| 288 | /// Structure of query result. | 
|---|
| 289 | Block ; | 
|---|
| 290 |  | 
|---|
| 291 | /// The subquery interpreter, if the subquery | 
|---|
| 292 | std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter_subquery; | 
|---|
| 293 |  | 
|---|
| 294 | /// Table from where to read data, if not subquery. | 
|---|
| 295 | StoragePtr storage; | 
|---|
| 296 | TableStructureReadLockHolder table_lock; | 
|---|
| 297 |  | 
|---|
| 298 | /// Used when we read from prepared input, not table or subquery. | 
|---|
| 299 | BlockInputStreamPtr input; | 
|---|
| 300 |  | 
|---|
| 301 | Poco::Logger * log; | 
|---|
| 302 | }; | 
|---|
| 303 |  | 
|---|
| 304 | } | 
|---|
| 305 |  | 
|---|