1 | #include <Interpreters/InterpreterSelectWithUnionQuery.h> |
2 | #include <Interpreters/InterpreterSelectQuery.h> |
3 | #include <Parsers/ASTSelectWithUnionQuery.h> |
4 | #include <Parsers/ASTSelectQuery.h> |
5 | #include <DataStreams/UnionBlockInputStream.h> |
6 | #include <DataStreams/NullBlockInputStream.h> |
7 | #include <DataStreams/ConcatBlockInputStream.h> |
8 | #include <DataStreams/ConvertingBlockInputStream.h> |
9 | #include <Columns/getLeastSuperColumn.h> |
10 | #include <Columns/ColumnConst.h> |
11 | #include <Common/typeid_cast.h> |
12 | #include <Parsers/queryToString.h> |
13 | #include <Parsers/ASTExpressionList.h> |
14 | |
15 | #include <Processors/Sources/NullSource.h> |
16 | #include <Processors/QueryPipeline.h> |
17 | #include <Processors/Pipe.h> |
18 | |
19 | |
20 | namespace DB |
21 | { |
22 | |
/// Error codes thrown by this file. They are only declared here (extern);
/// the definitions live outside this translation unit.
namespace ErrorCodes
{
    /// Thrown by the constructor when the UNION AST has no SELECT children.
    extern const int LOGICAL_ERROR;
    /// Thrown when the SELECTs of a UNION ALL return a different number of columns.
    extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH;
}
28 | |
29 | |
30 | InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery( |
31 | const ASTPtr & query_ptr_, |
32 | const Context & context_, |
33 | const SelectQueryOptions & options_, |
34 | const Names & required_result_column_names) |
35 | : options(options_), |
36 | query_ptr(query_ptr_), |
37 | context(std::make_shared<Context>(context_)) |
38 | { |
39 | const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>(); |
40 | |
41 | size_t num_selects = ast.list_of_selects->children.size(); |
42 | |
43 | if (!num_selects) |
44 | throw Exception("Logical error: no children in ASTSelectWithUnionQuery" , ErrorCodes::LOGICAL_ERROR); |
45 | |
46 | /// Initialize interpreters for each SELECT query. |
47 | /// Note that we pass 'required_result_column_names' to first SELECT. |
48 | /// And for the rest, we pass names at the corresponding positions of 'required_result_column_names' in the result of first SELECT, |
49 | /// because names could be different. |
50 | |
51 | nested_interpreters.reserve(num_selects); |
52 | |
53 | std::vector<Names> required_result_column_names_for_other_selects(num_selects); |
54 | if (!required_result_column_names.empty() && num_selects > 1) |
55 | { |
56 | /// Result header if there are no filtering by 'required_result_column_names'. |
57 | /// We use it to determine positions of 'required_result_column_names' in SELECT clause. |
58 | |
59 | Block = InterpreterSelectQuery( |
60 | ast.list_of_selects->children.at(0), *context, options.copy().analyze().noModify()).getSampleBlock(); |
61 | |
62 | std::vector<size_t> positions_of_required_result_columns(required_result_column_names.size()); |
63 | for (size_t required_result_num = 0, size = required_result_column_names.size(); required_result_num < size; ++required_result_num) |
64 | positions_of_required_result_columns[required_result_num] = full_result_header.getPositionByName(required_result_column_names[required_result_num]); |
65 | |
66 | for (size_t query_num = 1; query_num < num_selects; ++query_num) |
67 | { |
68 | Block = InterpreterSelectQuery( |
69 | ast.list_of_selects->children.at(query_num), *context, options.copy().analyze().noModify()).getSampleBlock(); |
70 | |
71 | if (full_result_header_for_current_select.columns() != full_result_header.columns()) |
72 | throw Exception("Different number of columns in UNION ALL elements:\n" |
73 | + full_result_header.dumpNames() |
74 | + "\nand\n" |
75 | + full_result_header_for_current_select.dumpNames() + "\n" , |
76 | ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH); |
77 | |
78 | required_result_column_names_for_other_selects[query_num].reserve(required_result_column_names.size()); |
79 | for (const auto & pos : positions_of_required_result_columns) |
80 | required_result_column_names_for_other_selects[query_num].push_back(full_result_header_for_current_select.getByPosition(pos).name); |
81 | } |
82 | } |
83 | |
84 | for (size_t query_num = 0; query_num < num_selects; ++query_num) |
85 | { |
86 | const Names & current_required_result_column_names |
87 | = query_num == 0 ? required_result_column_names : required_result_column_names_for_other_selects[query_num]; |
88 | |
89 | nested_interpreters.emplace_back(std::make_unique<InterpreterSelectQuery>( |
90 | ast.list_of_selects->children.at(query_num), |
91 | *context, |
92 | options, |
93 | current_required_result_column_names)); |
94 | } |
95 | |
96 | /// Determine structure of the result. |
97 | |
98 | if (num_selects == 1) |
99 | { |
100 | result_header = nested_interpreters.front()->getSampleBlock(); |
101 | } |
102 | else |
103 | { |
104 | Blocks (num_selects); |
105 | for (size_t query_num = 0; query_num < num_selects; ++query_num) |
106 | headers[query_num] = nested_interpreters[query_num]->getSampleBlock(); |
107 | |
108 | result_header = getCommonHeaderForUnion(headers); |
109 | } |
110 | |
111 | /// InterpreterSelectWithUnionQuery ignores limits if all nested interpreters ignore limits. |
112 | bool all_nested_ignore_limits = true; |
113 | bool all_nested_ignore_quota = true; |
114 | for (auto & interpreter : nested_interpreters) |
115 | { |
116 | if (!interpreter->ignoreLimits()) |
117 | all_nested_ignore_limits = false; |
118 | if (!interpreter->ignoreQuota()) |
119 | all_nested_ignore_quota = false; |
120 | } |
121 | options.ignore_limits |= all_nested_ignore_limits; |
122 | options.ignore_quota |= all_nested_ignore_quota; |
123 | } |
124 | |
125 | |
126 | Block InterpreterSelectWithUnionQuery::(const Blocks & ) |
127 | { |
128 | size_t num_selects = headers.size(); |
129 | Block = headers.front(); |
130 | size_t num_columns = common_header.columns(); |
131 | |
132 | for (size_t query_num = 1; query_num < num_selects; ++query_num) |
133 | { |
134 | if (headers[query_num].columns() != num_columns) |
135 | throw Exception("Different number of columns in UNION ALL elements:\n" |
136 | + common_header.dumpNames() |
137 | + "\nand\n" |
138 | + headers[query_num].dumpNames() + "\n" , |
139 | ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH); |
140 | } |
141 | |
142 | std::vector<const ColumnWithTypeAndName *> columns(num_selects); |
143 | |
144 | for (size_t column_num = 0; column_num < num_columns; ++column_num) |
145 | { |
146 | for (size_t i = 0; i < num_selects; ++i) |
147 | columns[i] = &headers[i].getByPosition(column_num); |
148 | |
149 | ColumnWithTypeAndName & result_elem = common_header.getByPosition(column_num); |
150 | result_elem = getLeastSuperColumn(columns); |
151 | } |
152 | |
153 | return common_header; |
154 | } |
155 | |
156 | |
/// Defaulted destructor, defined out of line in this file.
/// NOTE(review): presumably kept here so that the header can hold nested_interpreters
///  with only a forward declaration of InterpreterSelectQuery - confirm against the header.
InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default;
158 | |
159 | |
/// Returns the result header that was computed in the constructor.
Block InterpreterSelectWithUnionQuery::getSampleBlock()
{
    return result_header;
}
164 | |
165 | Block InterpreterSelectWithUnionQuery::getSampleBlock( |
166 | const ASTPtr & query_ptr, |
167 | const Context & context) |
168 | { |
169 | auto & cache = context.getSampleBlockCache(); |
170 | /// Using query string because query_ptr changes for every internal SELECT |
171 | auto key = queryToString(query_ptr); |
172 | if (cache.find(key) != cache.end()) |
173 | { |
174 | return cache[key]; |
175 | } |
176 | |
177 | return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock(); |
178 | } |
179 | |
180 | |
181 | BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(QueryPipeline & parent_pipeline) |
182 | { |
183 | BlockInputStreams nested_streams; |
184 | |
185 | for (auto & interpreter : nested_interpreters) |
186 | { |
187 | BlockInputStreams streams = interpreter->executeWithMultipleStreams(parent_pipeline); |
188 | nested_streams.insert(nested_streams.end(), streams.begin(), streams.end()); |
189 | } |
190 | |
191 | /// Unify data structure. |
192 | if (nested_interpreters.size() > 1) |
193 | { |
194 | for (auto & stream : nested_streams) |
195 | stream = std::make_shared<ConvertingBlockInputStream>(*context, stream, result_header,ConvertingBlockInputStream::MatchColumnsMode::Position); |
196 | parent_pipeline.addInterpreterContext(context); |
197 | } |
198 | |
199 | return nested_streams; |
200 | } |
201 | |
202 | |
203 | BlockIO InterpreterSelectWithUnionQuery::execute() |
204 | { |
205 | const Settings & settings = context->getSettingsRef(); |
206 | |
207 | BlockIO res; |
208 | BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline); |
209 | BlockInputStreamPtr result_stream; |
210 | |
211 | if (nested_streams.empty()) |
212 | { |
213 | result_stream = std::make_shared<NullBlockInputStream>(getSampleBlock()); |
214 | } |
215 | else if (nested_streams.size() == 1) |
216 | { |
217 | result_stream = nested_streams.front(); |
218 | nested_streams.clear(); |
219 | } |
220 | else |
221 | { |
222 | result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, settings.max_threads); |
223 | nested_streams.clear(); |
224 | } |
225 | |
226 | res.in = result_stream; |
227 | res.pipeline.addInterpreterContext(context); |
228 | return res; |
229 | } |
230 | |
231 | |
232 | QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors() |
233 | { |
234 | QueryPipeline main_pipeline; |
235 | std::vector<QueryPipeline> pipelines; |
236 | bool has_main_pipeline = false; |
237 | |
238 | Blocks ; |
239 | headers.reserve(nested_interpreters.size()); |
240 | |
241 | for (auto & interpreter : nested_interpreters) |
242 | { |
243 | if (!has_main_pipeline) |
244 | { |
245 | has_main_pipeline = true; |
246 | main_pipeline = interpreter->executeWithProcessors(); |
247 | headers.emplace_back(main_pipeline.getHeader()); |
248 | } |
249 | else |
250 | { |
251 | pipelines.emplace_back(interpreter->executeWithProcessors()); |
252 | headers.emplace_back(pipelines.back().getHeader()); |
253 | } |
254 | } |
255 | |
256 | if (!has_main_pipeline) |
257 | main_pipeline.init(Pipe(std::make_shared<NullSource>(getSampleBlock()))); |
258 | |
259 | if (!pipelines.empty()) |
260 | { |
261 | auto = getCommonHeaderForUnion(headers); |
262 | main_pipeline.unitePipelines(std::move(pipelines), common_header, *context); |
263 | } |
264 | |
265 | main_pipeline.addInterpreterContext(context); |
266 | |
267 | return main_pipeline; |
268 | } |
269 | |
270 | |
271 | void InterpreterSelectWithUnionQuery::ignoreWithTotals() |
272 | { |
273 | for (auto & interpreter : nested_interpreters) |
274 | interpreter->ignoreWithTotals(); |
275 | } |
276 | |
277 | } |
278 | |