| 1 | #include <Interpreters/InterpreterSelectWithUnionQuery.h> |
| 2 | #include <Interpreters/InterpreterSelectQuery.h> |
| 3 | #include <Parsers/ASTSelectWithUnionQuery.h> |
| 4 | #include <Parsers/ASTSelectQuery.h> |
| 5 | #include <DataStreams/UnionBlockInputStream.h> |
| 6 | #include <DataStreams/NullBlockInputStream.h> |
| 7 | #include <DataStreams/ConcatBlockInputStream.h> |
| 8 | #include <DataStreams/ConvertingBlockInputStream.h> |
| 9 | #include <Columns/getLeastSuperColumn.h> |
| 10 | #include <Columns/ColumnConst.h> |
| 11 | #include <Common/typeid_cast.h> |
| 12 | #include <Parsers/queryToString.h> |
| 13 | #include <Parsers/ASTExpressionList.h> |
| 14 | |
| 15 | #include <Processors/Sources/NullSource.h> |
| 16 | #include <Processors/QueryPipeline.h> |
| 17 | #include <Processors/Pipe.h> |
| 18 | |
| 19 | |
| 20 | namespace DB |
| 21 | { |
| 22 | |
/// Error codes thrown from this file. They are only declared here (`extern`);
/// their definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH;  /// SELECTs of a UNION ALL disagree on the number of columns.
    extern const int LOGICAL_ERROR;                         /// Internal invariant violated (e.g. a UNION with no SELECTs).
}
| 28 | |
| 29 | |
| 30 | InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery( |
| 31 | const ASTPtr & query_ptr_, |
| 32 | const Context & context_, |
| 33 | const SelectQueryOptions & options_, |
| 34 | const Names & required_result_column_names) |
| 35 | : options(options_), |
| 36 | query_ptr(query_ptr_), |
| 37 | context(std::make_shared<Context>(context_)) |
| 38 | { |
| 39 | const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>(); |
| 40 | |
| 41 | size_t num_selects = ast.list_of_selects->children.size(); |
| 42 | |
| 43 | if (!num_selects) |
| 44 | throw Exception("Logical error: no children in ASTSelectWithUnionQuery" , ErrorCodes::LOGICAL_ERROR); |
| 45 | |
| 46 | /// Initialize interpreters for each SELECT query. |
| 47 | /// Note that we pass 'required_result_column_names' to first SELECT. |
| 48 | /// And for the rest, we pass names at the corresponding positions of 'required_result_column_names' in the result of first SELECT, |
| 49 | /// because names could be different. |
| 50 | |
| 51 | nested_interpreters.reserve(num_selects); |
| 52 | |
| 53 | std::vector<Names> required_result_column_names_for_other_selects(num_selects); |
| 54 | if (!required_result_column_names.empty() && num_selects > 1) |
| 55 | { |
| 56 | /// Result header if there are no filtering by 'required_result_column_names'. |
| 57 | /// We use it to determine positions of 'required_result_column_names' in SELECT clause. |
| 58 | |
| 59 | Block = InterpreterSelectQuery( |
| 60 | ast.list_of_selects->children.at(0), *context, options.copy().analyze().noModify()).getSampleBlock(); |
| 61 | |
| 62 | std::vector<size_t> positions_of_required_result_columns(required_result_column_names.size()); |
| 63 | for (size_t required_result_num = 0, size = required_result_column_names.size(); required_result_num < size; ++required_result_num) |
| 64 | positions_of_required_result_columns[required_result_num] = full_result_header.getPositionByName(required_result_column_names[required_result_num]); |
| 65 | |
| 66 | for (size_t query_num = 1; query_num < num_selects; ++query_num) |
| 67 | { |
| 68 | Block = InterpreterSelectQuery( |
| 69 | ast.list_of_selects->children.at(query_num), *context, options.copy().analyze().noModify()).getSampleBlock(); |
| 70 | |
| 71 | if (full_result_header_for_current_select.columns() != full_result_header.columns()) |
| 72 | throw Exception("Different number of columns in UNION ALL elements:\n" |
| 73 | + full_result_header.dumpNames() |
| 74 | + "\nand\n" |
| 75 | + full_result_header_for_current_select.dumpNames() + "\n" , |
| 76 | ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH); |
| 77 | |
| 78 | required_result_column_names_for_other_selects[query_num].reserve(required_result_column_names.size()); |
| 79 | for (const auto & pos : positions_of_required_result_columns) |
| 80 | required_result_column_names_for_other_selects[query_num].push_back(full_result_header_for_current_select.getByPosition(pos).name); |
| 81 | } |
| 82 | } |
| 83 | |
| 84 | for (size_t query_num = 0; query_num < num_selects; ++query_num) |
| 85 | { |
| 86 | const Names & current_required_result_column_names |
| 87 | = query_num == 0 ? required_result_column_names : required_result_column_names_for_other_selects[query_num]; |
| 88 | |
| 89 | nested_interpreters.emplace_back(std::make_unique<InterpreterSelectQuery>( |
| 90 | ast.list_of_selects->children.at(query_num), |
| 91 | *context, |
| 92 | options, |
| 93 | current_required_result_column_names)); |
| 94 | } |
| 95 | |
| 96 | /// Determine structure of the result. |
| 97 | |
| 98 | if (num_selects == 1) |
| 99 | { |
| 100 | result_header = nested_interpreters.front()->getSampleBlock(); |
| 101 | } |
| 102 | else |
| 103 | { |
| 104 | Blocks (num_selects); |
| 105 | for (size_t query_num = 0; query_num < num_selects; ++query_num) |
| 106 | headers[query_num] = nested_interpreters[query_num]->getSampleBlock(); |
| 107 | |
| 108 | result_header = getCommonHeaderForUnion(headers); |
| 109 | } |
| 110 | |
| 111 | /// InterpreterSelectWithUnionQuery ignores limits if all nested interpreters ignore limits. |
| 112 | bool all_nested_ignore_limits = true; |
| 113 | bool all_nested_ignore_quota = true; |
| 114 | for (auto & interpreter : nested_interpreters) |
| 115 | { |
| 116 | if (!interpreter->ignoreLimits()) |
| 117 | all_nested_ignore_limits = false; |
| 118 | if (!interpreter->ignoreQuota()) |
| 119 | all_nested_ignore_quota = false; |
| 120 | } |
| 121 | options.ignore_limits |= all_nested_ignore_limits; |
| 122 | options.ignore_quota |= all_nested_ignore_quota; |
| 123 | } |
| 124 | |
| 125 | |
| 126 | Block InterpreterSelectWithUnionQuery::(const Blocks & ) |
| 127 | { |
| 128 | size_t num_selects = headers.size(); |
| 129 | Block = headers.front(); |
| 130 | size_t num_columns = common_header.columns(); |
| 131 | |
| 132 | for (size_t query_num = 1; query_num < num_selects; ++query_num) |
| 133 | { |
| 134 | if (headers[query_num].columns() != num_columns) |
| 135 | throw Exception("Different number of columns in UNION ALL elements:\n" |
| 136 | + common_header.dumpNames() |
| 137 | + "\nand\n" |
| 138 | + headers[query_num].dumpNames() + "\n" , |
| 139 | ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH); |
| 140 | } |
| 141 | |
| 142 | std::vector<const ColumnWithTypeAndName *> columns(num_selects); |
| 143 | |
| 144 | for (size_t column_num = 0; column_num < num_columns; ++column_num) |
| 145 | { |
| 146 | for (size_t i = 0; i < num_selects; ++i) |
| 147 | columns[i] = &headers[i].getByPosition(column_num); |
| 148 | |
| 149 | ColumnWithTypeAndName & result_elem = common_header.getByPosition(column_num); |
| 150 | result_elem = getLeastSuperColumn(columns); |
| 151 | } |
| 152 | |
| 153 | return common_header; |
| 154 | } |
| 155 | |
| 156 | |
| 157 | InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default; |
| 158 | |
| 159 | |
| 160 | Block InterpreterSelectWithUnionQuery::getSampleBlock() |
| 161 | { |
| 162 | return result_header; |
| 163 | } |
| 164 | |
| 165 | Block InterpreterSelectWithUnionQuery::getSampleBlock( |
| 166 | const ASTPtr & query_ptr, |
| 167 | const Context & context) |
| 168 | { |
| 169 | auto & cache = context.getSampleBlockCache(); |
| 170 | /// Using query string because query_ptr changes for every internal SELECT |
| 171 | auto key = queryToString(query_ptr); |
| 172 | if (cache.find(key) != cache.end()) |
| 173 | { |
| 174 | return cache[key]; |
| 175 | } |
| 176 | |
| 177 | return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock(); |
| 178 | } |
| 179 | |
| 180 | |
| 181 | BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(QueryPipeline & parent_pipeline) |
| 182 | { |
| 183 | BlockInputStreams nested_streams; |
| 184 | |
| 185 | for (auto & interpreter : nested_interpreters) |
| 186 | { |
| 187 | BlockInputStreams streams = interpreter->executeWithMultipleStreams(parent_pipeline); |
| 188 | nested_streams.insert(nested_streams.end(), streams.begin(), streams.end()); |
| 189 | } |
| 190 | |
| 191 | /// Unify data structure. |
| 192 | if (nested_interpreters.size() > 1) |
| 193 | { |
| 194 | for (auto & stream : nested_streams) |
| 195 | stream = std::make_shared<ConvertingBlockInputStream>(*context, stream, result_header,ConvertingBlockInputStream::MatchColumnsMode::Position); |
| 196 | parent_pipeline.addInterpreterContext(context); |
| 197 | } |
| 198 | |
| 199 | return nested_streams; |
| 200 | } |
| 201 | |
| 202 | |
| 203 | BlockIO InterpreterSelectWithUnionQuery::execute() |
| 204 | { |
| 205 | const Settings & settings = context->getSettingsRef(); |
| 206 | |
| 207 | BlockIO res; |
| 208 | BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline); |
| 209 | BlockInputStreamPtr result_stream; |
| 210 | |
| 211 | if (nested_streams.empty()) |
| 212 | { |
| 213 | result_stream = std::make_shared<NullBlockInputStream>(getSampleBlock()); |
| 214 | } |
| 215 | else if (nested_streams.size() == 1) |
| 216 | { |
| 217 | result_stream = nested_streams.front(); |
| 218 | nested_streams.clear(); |
| 219 | } |
| 220 | else |
| 221 | { |
| 222 | result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, settings.max_threads); |
| 223 | nested_streams.clear(); |
| 224 | } |
| 225 | |
| 226 | res.in = result_stream; |
| 227 | res.pipeline.addInterpreterContext(context); |
| 228 | return res; |
| 229 | } |
| 230 | |
| 231 | |
| 232 | QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors() |
| 233 | { |
| 234 | QueryPipeline main_pipeline; |
| 235 | std::vector<QueryPipeline> pipelines; |
| 236 | bool has_main_pipeline = false; |
| 237 | |
| 238 | Blocks ; |
| 239 | headers.reserve(nested_interpreters.size()); |
| 240 | |
| 241 | for (auto & interpreter : nested_interpreters) |
| 242 | { |
| 243 | if (!has_main_pipeline) |
| 244 | { |
| 245 | has_main_pipeline = true; |
| 246 | main_pipeline = interpreter->executeWithProcessors(); |
| 247 | headers.emplace_back(main_pipeline.getHeader()); |
| 248 | } |
| 249 | else |
| 250 | { |
| 251 | pipelines.emplace_back(interpreter->executeWithProcessors()); |
| 252 | headers.emplace_back(pipelines.back().getHeader()); |
| 253 | } |
| 254 | } |
| 255 | |
| 256 | if (!has_main_pipeline) |
| 257 | main_pipeline.init(Pipe(std::make_shared<NullSource>(getSampleBlock()))); |
| 258 | |
| 259 | if (!pipelines.empty()) |
| 260 | { |
| 261 | auto = getCommonHeaderForUnion(headers); |
| 262 | main_pipeline.unitePipelines(std::move(pipelines), common_header, *context); |
| 263 | } |
| 264 | |
| 265 | main_pipeline.addInterpreterContext(context); |
| 266 | |
| 267 | return main_pipeline; |
| 268 | } |
| 269 | |
| 270 | |
| 271 | void InterpreterSelectWithUnionQuery::ignoreWithTotals() |
| 272 | { |
| 273 | for (auto & interpreter : nested_interpreters) |
| 274 | interpreter->ignoreWithTotals(); |
| 275 | } |
| 276 | |
| 277 | } |
| 278 | |