1#include <Interpreters/InterpreterSelectWithUnionQuery.h>
2#include <Interpreters/InterpreterSelectQuery.h>
3#include <Parsers/ASTSelectWithUnionQuery.h>
4#include <Parsers/ASTSelectQuery.h>
5#include <DataStreams/UnionBlockInputStream.h>
6#include <DataStreams/NullBlockInputStream.h>
7#include <DataStreams/ConcatBlockInputStream.h>
8#include <DataStreams/ConvertingBlockInputStream.h>
9#include <Columns/getLeastSuperColumn.h>
10#include <Columns/ColumnConst.h>
11#include <Common/typeid_cast.h>
12#include <Parsers/queryToString.h>
13#include <Parsers/ASTExpressionList.h>
14
15#include <Processors/Sources/NullSource.h>
16#include <Processors/QueryPipeline.h>
17#include <Processors/Pipe.h>
18
19
20namespace DB
21{
22
/// Error codes thrown from this file. Only declared here; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH;
    extern const int LOGICAL_ERROR;
}
28
29
30InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
31 const ASTPtr & query_ptr_,
32 const Context & context_,
33 const SelectQueryOptions & options_,
34 const Names & required_result_column_names)
35 : options(options_),
36 query_ptr(query_ptr_),
37 context(std::make_shared<Context>(context_))
38{
39 const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();
40
41 size_t num_selects = ast.list_of_selects->children.size();
42
43 if (!num_selects)
44 throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);
45
46 /// Initialize interpreters for each SELECT query.
47 /// Note that we pass 'required_result_column_names' to first SELECT.
48 /// And for the rest, we pass names at the corresponding positions of 'required_result_column_names' in the result of first SELECT,
49 /// because names could be different.
50
51 nested_interpreters.reserve(num_selects);
52
53 std::vector<Names> required_result_column_names_for_other_selects(num_selects);
54 if (!required_result_column_names.empty() && num_selects > 1)
55 {
56 /// Result header if there are no filtering by 'required_result_column_names'.
57 /// We use it to determine positions of 'required_result_column_names' in SELECT clause.
58
59 Block full_result_header = InterpreterSelectQuery(
60 ast.list_of_selects->children.at(0), *context, options.copy().analyze().noModify()).getSampleBlock();
61
62 std::vector<size_t> positions_of_required_result_columns(required_result_column_names.size());
63 for (size_t required_result_num = 0, size = required_result_column_names.size(); required_result_num < size; ++required_result_num)
64 positions_of_required_result_columns[required_result_num] = full_result_header.getPositionByName(required_result_column_names[required_result_num]);
65
66 for (size_t query_num = 1; query_num < num_selects; ++query_num)
67 {
68 Block full_result_header_for_current_select = InterpreterSelectQuery(
69 ast.list_of_selects->children.at(query_num), *context, options.copy().analyze().noModify()).getSampleBlock();
70
71 if (full_result_header_for_current_select.columns() != full_result_header.columns())
72 throw Exception("Different number of columns in UNION ALL elements:\n"
73 + full_result_header.dumpNames()
74 + "\nand\n"
75 + full_result_header_for_current_select.dumpNames() + "\n",
76 ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
77
78 required_result_column_names_for_other_selects[query_num].reserve(required_result_column_names.size());
79 for (const auto & pos : positions_of_required_result_columns)
80 required_result_column_names_for_other_selects[query_num].push_back(full_result_header_for_current_select.getByPosition(pos).name);
81 }
82 }
83
84 for (size_t query_num = 0; query_num < num_selects; ++query_num)
85 {
86 const Names & current_required_result_column_names
87 = query_num == 0 ? required_result_column_names : required_result_column_names_for_other_selects[query_num];
88
89 nested_interpreters.emplace_back(std::make_unique<InterpreterSelectQuery>(
90 ast.list_of_selects->children.at(query_num),
91 *context,
92 options,
93 current_required_result_column_names));
94 }
95
96 /// Determine structure of the result.
97
98 if (num_selects == 1)
99 {
100 result_header = nested_interpreters.front()->getSampleBlock();
101 }
102 else
103 {
104 Blocks headers(num_selects);
105 for (size_t query_num = 0; query_num < num_selects; ++query_num)
106 headers[query_num] = nested_interpreters[query_num]->getSampleBlock();
107
108 result_header = getCommonHeaderForUnion(headers);
109 }
110
111 /// InterpreterSelectWithUnionQuery ignores limits if all nested interpreters ignore limits.
112 bool all_nested_ignore_limits = true;
113 bool all_nested_ignore_quota = true;
114 for (auto & interpreter : nested_interpreters)
115 {
116 if (!interpreter->ignoreLimits())
117 all_nested_ignore_limits = false;
118 if (!interpreter->ignoreQuota())
119 all_nested_ignore_quota = false;
120 }
121 options.ignore_limits |= all_nested_ignore_limits;
122 options.ignore_quota |= all_nested_ignore_quota;
123}
124
125
126Block InterpreterSelectWithUnionQuery::getCommonHeaderForUnion(const Blocks & headers)
127{
128 size_t num_selects = headers.size();
129 Block common_header = headers.front();
130 size_t num_columns = common_header.columns();
131
132 for (size_t query_num = 1; query_num < num_selects; ++query_num)
133 {
134 if (headers[query_num].columns() != num_columns)
135 throw Exception("Different number of columns in UNION ALL elements:\n"
136 + common_header.dumpNames()
137 + "\nand\n"
138 + headers[query_num].dumpNames() + "\n",
139 ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
140 }
141
142 std::vector<const ColumnWithTypeAndName *> columns(num_selects);
143
144 for (size_t column_num = 0; column_num < num_columns; ++column_num)
145 {
146 for (size_t i = 0; i < num_selects; ++i)
147 columns[i] = &headers[i].getByPosition(column_num);
148
149 ColumnWithTypeAndName & result_elem = common_header.getByPosition(column_num);
150 result_elem = getLeastSuperColumn(columns);
151 }
152
153 return common_header;
154}
155
156
157InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default;
158
159
160Block InterpreterSelectWithUnionQuery::getSampleBlock()
161{
162 return result_header;
163}
164
165Block InterpreterSelectWithUnionQuery::getSampleBlock(
166 const ASTPtr & query_ptr,
167 const Context & context)
168{
169 auto & cache = context.getSampleBlockCache();
170 /// Using query string because query_ptr changes for every internal SELECT
171 auto key = queryToString(query_ptr);
172 if (cache.find(key) != cache.end())
173 {
174 return cache[key];
175 }
176
177 return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock();
178}
179
180
181BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(QueryPipeline & parent_pipeline)
182{
183 BlockInputStreams nested_streams;
184
185 for (auto & interpreter : nested_interpreters)
186 {
187 BlockInputStreams streams = interpreter->executeWithMultipleStreams(parent_pipeline);
188 nested_streams.insert(nested_streams.end(), streams.begin(), streams.end());
189 }
190
191 /// Unify data structure.
192 if (nested_interpreters.size() > 1)
193 {
194 for (auto & stream : nested_streams)
195 stream = std::make_shared<ConvertingBlockInputStream>(*context, stream, result_header,ConvertingBlockInputStream::MatchColumnsMode::Position);
196 parent_pipeline.addInterpreterContext(context);
197 }
198
199 return nested_streams;
200}
201
202
203BlockIO InterpreterSelectWithUnionQuery::execute()
204{
205 const Settings & settings = context->getSettingsRef();
206
207 BlockIO res;
208 BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline);
209 BlockInputStreamPtr result_stream;
210
211 if (nested_streams.empty())
212 {
213 result_stream = std::make_shared<NullBlockInputStream>(getSampleBlock());
214 }
215 else if (nested_streams.size() == 1)
216 {
217 result_stream = nested_streams.front();
218 nested_streams.clear();
219 }
220 else
221 {
222 result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, settings.max_threads);
223 nested_streams.clear();
224 }
225
226 res.in = result_stream;
227 res.pipeline.addInterpreterContext(context);
228 return res;
229}
230
231
232QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
233{
234 QueryPipeline main_pipeline;
235 std::vector<QueryPipeline> pipelines;
236 bool has_main_pipeline = false;
237
238 Blocks headers;
239 headers.reserve(nested_interpreters.size());
240
241 for (auto & interpreter : nested_interpreters)
242 {
243 if (!has_main_pipeline)
244 {
245 has_main_pipeline = true;
246 main_pipeline = interpreter->executeWithProcessors();
247 headers.emplace_back(main_pipeline.getHeader());
248 }
249 else
250 {
251 pipelines.emplace_back(interpreter->executeWithProcessors());
252 headers.emplace_back(pipelines.back().getHeader());
253 }
254 }
255
256 if (!has_main_pipeline)
257 main_pipeline.init(Pipe(std::make_shared<NullSource>(getSampleBlock())));
258
259 if (!pipelines.empty())
260 {
261 auto common_header = getCommonHeaderForUnion(headers);
262 main_pipeline.unitePipelines(std::move(pipelines), common_header, *context);
263 }
264
265 main_pipeline.addInterpreterContext(context);
266
267 return main_pipeline;
268}
269
270
271void InterpreterSelectWithUnionQuery::ignoreWithTotals()
272{
273 for (auto & interpreter : nested_interpreters)
274 interpreter->ignoreWithTotals();
275}
276
277}
278