| 1 | #include "MutationsInterpreter.h" |
| 2 | |
| 3 | #include <Functions/FunctionFactory.h> |
| 4 | #include <Functions/IFunction.h> |
| 5 | #include <Interpreters/InDepthNodeVisitor.h> |
| 6 | #include <Interpreters/InterpreterSelectQuery.h> |
| 7 | #include <Interpreters/MutationsInterpreter.h> |
| 8 | #include <Interpreters/SyntaxAnalyzer.h> |
| 9 | #include <Storages/MergeTree/MergeTreeData.h> |
| 10 | #include <DataStreams/FilterBlockInputStream.h> |
| 11 | #include <DataStreams/ExpressionBlockInputStream.h> |
| 12 | #include <DataStreams/CreatingSetsBlockInputStream.h> |
| 13 | #include <DataStreams/MaterializingBlockInputStream.h> |
| 14 | #include <DataStreams/NullBlockInputStream.h> |
| 15 | #include <Parsers/ASTIdentifier.h> |
| 16 | #include <Parsers/ASTFunction.h> |
| 17 | #include <Parsers/ASTLiteral.h> |
| 18 | #include <Parsers/ASTExpressionList.h> |
| 19 | #include <Parsers/ASTSelectQuery.h> |
| 20 | #include <Parsers/formatAST.h> |
| 21 | #include <IO/WriteHelpers.h> |
| 22 | |
| 23 | |
| 24 | namespace DB |
| 25 | { |
| 26 | |
/// Error codes referenced in this translation unit.
/// They are only declared here (`extern`); repeating a declaration that a header
/// already provides is harmless.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_MUTATION_COMMAND;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int CANNOT_UPDATE_COLUMN;
}
| 33 | |
| 34 | namespace |
| 35 | { |
| 36 | struct FirstNonDeterministicFuncData |
| 37 | { |
| 38 | using TypeToVisit = ASTFunction; |
| 39 | |
| 40 | explicit FirstNonDeterministicFuncData(const Context & context_) |
| 41 | : context{context_} |
| 42 | {} |
| 43 | |
| 44 | const Context & context; |
| 45 | std::optional<String> nondeterministic_function_name; |
| 46 | |
| 47 | void visit(ASTFunction & function, ASTPtr &) |
| 48 | { |
| 49 | if (nondeterministic_function_name) |
| 50 | return; |
| 51 | |
| 52 | const auto func = FunctionFactory::instance().get(function.name, context); |
| 53 | if (!func->isDeterministic()) |
| 54 | nondeterministic_function_name = func->getName(); |
| 55 | } |
| 56 | }; |
| 57 | |
| 58 | using FirstNonDeterministicFuncFinder = |
| 59 | InDepthNodeVisitor<OneTypeMatcher<FirstNonDeterministicFuncData>, true>; |
| 60 | |
| 61 | std::optional<String> findFirstNonDeterministicFuncName(const MutationCommand & command, const Context & context) |
| 62 | { |
| 63 | FirstNonDeterministicFuncData finder_data(context); |
| 64 | |
| 65 | switch (command.type) |
| 66 | { |
| 67 | case MutationCommand::UPDATE: |
| 68 | { |
| 69 | auto update_assignments_ast = command.ast->as<const ASTAlterCommand &>().update_assignments->clone(); |
| 70 | FirstNonDeterministicFuncFinder(finder_data).visit(update_assignments_ast); |
| 71 | |
| 72 | if (finder_data.nondeterministic_function_name) |
| 73 | return finder_data.nondeterministic_function_name; |
| 74 | |
| 75 | [[fallthrough]]; |
| 76 | } |
| 77 | |
| 78 | case MutationCommand::DELETE: |
| 79 | { |
| 80 | auto predicate_ast = command.predicate->clone(); |
| 81 | FirstNonDeterministicFuncFinder(finder_data).visit(predicate_ast); |
| 82 | |
| 83 | return finder_data.nondeterministic_function_name; |
| 84 | } |
| 85 | |
| 86 | default: |
| 87 | break; |
| 88 | } |
| 89 | |
| 90 | return {}; |
| 91 | } |
| 92 | |
| 93 | ASTPtr prepareQueryAffectedAST(const std::vector<MutationCommand> & commands) |
| 94 | { |
| 95 | /// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query. |
| 96 | /// The result can differ from tne number of affected rows (e.g. if there is an UPDATE command that |
| 97 | /// changes how many rows satisfy the predicates of the subsequent commands). |
| 98 | /// But we can be sure that if count = 0, then no rows will be touched. |
| 99 | |
| 100 | auto select = std::make_shared<ASTSelectQuery>(); |
| 101 | |
| 102 | select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>()); |
| 103 | auto count_func = std::make_shared<ASTFunction>(); |
| 104 | count_func->name = "count" ; |
| 105 | count_func->arguments = std::make_shared<ASTExpressionList>(); |
| 106 | select->select()->children.push_back(count_func); |
| 107 | |
| 108 | if (commands.size() == 1) |
| 109 | select->setExpression(ASTSelectQuery::Expression::WHERE, commands[0].predicate->clone()); |
| 110 | else |
| 111 | { |
| 112 | auto coalesced_predicates = std::make_shared<ASTFunction>(); |
| 113 | coalesced_predicates->name = "or" ; |
| 114 | coalesced_predicates->arguments = std::make_shared<ASTExpressionList>(); |
| 115 | coalesced_predicates->children.push_back(coalesced_predicates->arguments); |
| 116 | |
| 117 | for (const MutationCommand & command : commands) |
| 118 | coalesced_predicates->arguments->children.push_back(command.predicate->clone()); |
| 119 | |
| 120 | select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(coalesced_predicates)); |
| 121 | } |
| 122 | |
| 123 | return select; |
| 124 | } |
| 125 | |
| 126 | }; |
| 127 | |
| 128 | bool isStorageTouchedByMutations( |
| 129 | StoragePtr storage, |
| 130 | const std::vector<MutationCommand> & commands, |
| 131 | Context context_copy) |
| 132 | { |
| 133 | if (commands.empty()) |
| 134 | return false; |
| 135 | |
| 136 | for (const MutationCommand & command : commands) |
| 137 | { |
| 138 | if (!command.predicate) /// The command touches all rows. |
| 139 | return true; |
| 140 | } |
| 141 | |
| 142 | context_copy.getSettingsRef().max_streams_to_max_threads_ratio = 1; |
| 143 | context_copy.getSettingsRef().max_threads = 1; |
| 144 | |
| 145 | ASTPtr select_query = prepareQueryAffectedAST(commands); |
| 146 | |
| 147 | /// Interpreter must be alive, when we use result of execute() method. |
| 148 | /// For some reason it may copy context and and give it into ExpressionBlockInputStream |
| 149 | /// after that we will use context from destroyed stack frame in our stream. |
| 150 | InterpreterSelectQuery interpreter(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits()); |
| 151 | BlockInputStreamPtr in = interpreter.execute().in; |
| 152 | |
| 153 | Block block = in->read(); |
| 154 | if (!block.rows()) |
| 155 | return false; |
| 156 | else if (block.rows() != 1) |
| 157 | throw Exception("count() expression returned " + toString(block.rows()) + " rows, not 1" , |
| 158 | ErrorCodes::LOGICAL_ERROR); |
| 159 | |
| 160 | auto count = (*block.getByName("count()" ).column)[0].get<UInt64>(); |
| 161 | return count != 0; |
| 162 | |
| 163 | } |
| 164 | |
| 165 | MutationsInterpreter::MutationsInterpreter( |
| 166 | StoragePtr storage_, |
| 167 | std::vector<MutationCommand> commands_, |
| 168 | const Context & context_, |
| 169 | bool can_execute_) |
| 170 | : storage(std::move(storage_)) |
| 171 | , commands(std::move(commands_)) |
| 172 | , context(context_) |
| 173 | , can_execute(can_execute_) |
| 174 | { |
| 175 | mutation_ast = prepare(!can_execute); |
| 176 | SelectQueryOptions limits = SelectQueryOptions().analyze(!can_execute).ignoreLimits(); |
| 177 | select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, limits); |
| 178 | } |
| 179 | |
| 180 | static NameSet getKeyColumns(const StoragePtr & storage) |
| 181 | { |
| 182 | const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()); |
| 183 | if (!merge_tree_data) |
| 184 | return {}; |
| 185 | |
| 186 | NameSet key_columns; |
| 187 | |
| 188 | if (merge_tree_data->partition_key_expr) |
| 189 | for (const String & col : merge_tree_data->partition_key_expr->getRequiredColumns()) |
| 190 | key_columns.insert(col); |
| 191 | |
| 192 | auto sorting_key_expr = merge_tree_data->sorting_key_expr; |
| 193 | if (sorting_key_expr) |
| 194 | for (const String & col : sorting_key_expr->getRequiredColumns()) |
| 195 | key_columns.insert(col); |
| 196 | /// We don't process sample_by_ast separately because it must be among the primary key columns. |
| 197 | |
| 198 | if (!merge_tree_data->merging_params.sign_column.empty()) |
| 199 | key_columns.insert(merge_tree_data->merging_params.sign_column); |
| 200 | |
| 201 | if (!merge_tree_data->merging_params.version_column.empty()) |
| 202 | key_columns.insert(merge_tree_data->merging_params.version_column); |
| 203 | |
| 204 | return key_columns; |
| 205 | } |
| 206 | |
| 207 | static void validateUpdateColumns( |
| 208 | const StoragePtr & storage, const NameSet & updated_columns, |
| 209 | const std::unordered_map<String, Names> & column_to_affected_materialized) |
| 210 | { |
| 211 | NameSet key_columns = getKeyColumns(storage); |
| 212 | |
| 213 | for (const String & column_name : updated_columns) |
| 214 | { |
| 215 | auto found = false; |
| 216 | for (const auto & col : storage->getColumns().getOrdinary()) |
| 217 | { |
| 218 | if (col.name == column_name) |
| 219 | { |
| 220 | found = true; |
| 221 | break; |
| 222 | } |
| 223 | } |
| 224 | |
| 225 | if (!found) |
| 226 | { |
| 227 | for (const auto & col : storage->getColumns().getMaterialized()) |
| 228 | { |
| 229 | if (col.name == column_name) |
| 230 | throw Exception("Cannot UPDATE materialized column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN); |
| 231 | } |
| 232 | |
| 233 | throw Exception("There is no column " + backQuote(column_name) + " in table" , ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); |
| 234 | } |
| 235 | |
| 236 | if (key_columns.count(column_name)) |
| 237 | throw Exception("Cannot UPDATE key column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN); |
| 238 | |
| 239 | auto materialized_it = column_to_affected_materialized.find(column_name); |
| 240 | if (materialized_it != column_to_affected_materialized.end()) |
| 241 | { |
| 242 | for (const String & materialized : materialized_it->second) |
| 243 | { |
| 244 | if (key_columns.count(materialized)) |
| 245 | throw Exception("Updated column " + backQuote(column_name) + " affects MATERIALIZED column " |
| 246 | + backQuote(materialized) + ", which is a key column. Cannot UPDATE it." , |
| 247 | ErrorCodes::CANNOT_UPDATE_COLUMN); |
| 248 | } |
| 249 | } |
| 250 | } |
| 251 | } |
| 252 | |
| 253 | |
| 254 | ASTPtr MutationsInterpreter::prepare(bool dry_run) |
| 255 | { |
| 256 | if (is_prepared) |
| 257 | throw Exception("MutationsInterpreter is already prepared. It is a bug." , ErrorCodes::LOGICAL_ERROR); |
| 258 | |
| 259 | if (commands.empty()) |
| 260 | throw Exception("Empty mutation commands list" , ErrorCodes::LOGICAL_ERROR); |
| 261 | |
| 262 | const ColumnsDescription & columns_desc = storage->getColumns(); |
| 263 | const IndicesDescription & indices_desc = storage->getIndices(); |
| 264 | NamesAndTypesList all_columns = columns_desc.getAllPhysical(); |
| 265 | |
| 266 | NameSet updated_columns; |
| 267 | for (const MutationCommand & command : commands) |
| 268 | { |
| 269 | for (const auto & kv : command.column_to_update_expression) |
| 270 | updated_columns.insert(kv.first); |
| 271 | } |
| 272 | |
| 273 | /// We need to know which columns affect which MATERIALIZED columns and data skipping indices |
| 274 | /// to recalculate them if dependencies are updated. |
| 275 | std::unordered_map<String, Names> column_to_affected_materialized; |
| 276 | NameSet affected_indices_columns; |
| 277 | if (!updated_columns.empty()) |
| 278 | { |
| 279 | for (const auto & column : columns_desc) |
| 280 | { |
| 281 | if (column.default_desc.kind == ColumnDefaultKind::Materialized) |
| 282 | { |
| 283 | auto query = column.default_desc.expression->clone(); |
| 284 | auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns); |
| 285 | for (const String & dependency : syntax_result->requiredSourceColumns()) |
| 286 | { |
| 287 | if (updated_columns.count(dependency)) |
| 288 | column_to_affected_materialized[dependency].push_back(column.name); |
| 289 | } |
| 290 | } |
| 291 | } |
| 292 | for (const auto & index : indices_desc.indices) |
| 293 | { |
| 294 | auto query = index->expr->clone(); |
| 295 | auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns); |
| 296 | const auto required_columns = syntax_result->requiredSourceColumns(); |
| 297 | |
| 298 | for (const String & dependency : required_columns) |
| 299 | { |
| 300 | if (updated_columns.count(dependency)) |
| 301 | { |
| 302 | affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns)); |
| 303 | break; |
| 304 | } |
| 305 | } |
| 306 | } |
| 307 | |
| 308 | validateUpdateColumns(storage, updated_columns, column_to_affected_materialized); |
| 309 | } |
| 310 | |
| 311 | /// First, break a sequence of commands into stages. |
| 312 | for (const auto & command : commands) |
| 313 | { |
| 314 | if (command.type == MutationCommand::DELETE) |
| 315 | { |
| 316 | if (stages.empty() || !stages.back().column_to_updated.empty()) |
| 317 | stages.emplace_back(context); |
| 318 | |
| 319 | auto negated_predicate = makeASTFunction("not" , command.predicate->clone()); |
| 320 | stages.back().filters.push_back(negated_predicate); |
| 321 | } |
| 322 | else if (command.type == MutationCommand::UPDATE) |
| 323 | { |
| 324 | if (stages.empty() || !stages.back().column_to_updated.empty()) |
| 325 | stages.emplace_back(context); |
| 326 | if (stages.size() == 1) /// First stage only supports filtering and can't update columns. |
| 327 | stages.emplace_back(context); |
| 328 | |
| 329 | NameSet affected_materialized; |
| 330 | |
| 331 | for (const auto & kv : command.column_to_update_expression) |
| 332 | { |
| 333 | const String & column = kv.first; |
| 334 | |
| 335 | auto materialized_it = column_to_affected_materialized.find(column); |
| 336 | if (materialized_it != column_to_affected_materialized.end()) |
| 337 | { |
| 338 | for (const String & mat_column : materialized_it->second) |
| 339 | affected_materialized.emplace(mat_column); |
| 340 | } |
| 341 | |
| 342 | /// Just to be sure, that we don't change type |
| 343 | /// after update expression execution. |
| 344 | const auto & update_expr = kv.second; |
| 345 | auto updated_column = makeASTFunction("CAST" , |
| 346 | makeASTFunction("if" , |
| 347 | command.predicate->clone(), |
| 348 | update_expr->clone(), |
| 349 | std::make_shared<ASTIdentifier>(column)), |
| 350 | std::make_shared<ASTLiteral>(columns_desc.getPhysical(column).type->getName())); |
| 351 | stages.back().column_to_updated.emplace(column, updated_column); |
| 352 | } |
| 353 | |
| 354 | if (!affected_materialized.empty()) |
| 355 | { |
| 356 | stages.emplace_back(context); |
| 357 | for (const auto & column : columns_desc) |
| 358 | { |
| 359 | if (column.default_desc.kind == ColumnDefaultKind::Materialized) |
| 360 | { |
| 361 | stages.back().column_to_updated.emplace( |
| 362 | column.name, |
| 363 | column.default_desc.expression->clone()); |
| 364 | } |
| 365 | } |
| 366 | } |
| 367 | } |
| 368 | else if (command.type == MutationCommand::MATERIALIZE_INDEX) |
| 369 | { |
| 370 | auto it = std::find_if( |
| 371 | std::cbegin(indices_desc.indices), std::end(indices_desc.indices), |
| 372 | [&](const std::shared_ptr<ASTIndexDeclaration> & index) |
| 373 | { |
| 374 | return index->name == command.index_name; |
| 375 | }); |
| 376 | if (it == std::cend(indices_desc.indices)) |
| 377 | throw Exception("Unknown index: " + command.index_name, ErrorCodes::BAD_ARGUMENTS); |
| 378 | |
| 379 | auto query = (*it)->expr->clone(); |
| 380 | auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns); |
| 381 | const auto required_columns = syntax_result->requiredSourceColumns(); |
| 382 | affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns)); |
| 383 | } |
| 384 | else |
| 385 | throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND); |
| 386 | } |
| 387 | |
| 388 | /// We cares about affected indices because we also need to rewrite them |
| 389 | /// when one of index columns updated or filtered with delete |
| 390 | if (!affected_indices_columns.empty()) |
| 391 | { |
| 392 | if (!stages.empty()) |
| 393 | { |
| 394 | std::vector<Stage> stages_copy; |
| 395 | /// Copy all filled stages except index calculation stage. |
| 396 | for (const auto & stage : stages) |
| 397 | { |
| 398 | stages_copy.emplace_back(context); |
| 399 | stages_copy.back().column_to_updated = stage.column_to_updated; |
| 400 | stages_copy.back().output_columns = stage.output_columns; |
| 401 | stages_copy.back().filters = stage.filters; |
| 402 | } |
| 403 | |
| 404 | const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true); |
| 405 | InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()}; |
| 406 | |
| 407 | auto = interpreter.getSampleBlock(); |
| 408 | auto in = std::make_shared<NullBlockInputStream>(first_stage_header); |
| 409 | updated_header = std::make_unique<Block>(addStreamsForLaterStages(stages_copy, in)->getHeader()); |
| 410 | } |
| 411 | /// Special step to recalculate affected indices. |
| 412 | stages.emplace_back(context); |
| 413 | for (const auto & column : affected_indices_columns) |
| 414 | stages.back().column_to_updated.emplace( |
| 415 | column, std::make_shared<ASTIdentifier>(column)); |
| 416 | } |
| 417 | |
| 418 | is_prepared = true; |
| 419 | |
| 420 | return prepareInterpreterSelectQuery(stages, dry_run); |
| 421 | } |
| 422 | |
| 423 | ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run) |
| 424 | { |
| 425 | NamesAndTypesList all_columns = storage->getColumns().getAllPhysical(); |
| 426 | |
| 427 | /// Next, for each stage calculate columns changed by this and previous stages. |
| 428 | for (size_t i = 0; i < prepared_stages.size(); ++i) |
| 429 | { |
| 430 | if (!prepared_stages[i].filters.empty()) |
| 431 | { |
| 432 | for (const auto & column : all_columns) |
| 433 | prepared_stages[i].output_columns.insert(column.name); |
| 434 | continue; |
| 435 | } |
| 436 | |
| 437 | if (i > 0) |
| 438 | prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns; |
| 439 | |
| 440 | if (prepared_stages[i].output_columns.size() < all_columns.size()) |
| 441 | { |
| 442 | for (const auto & kv : prepared_stages[i].column_to_updated) |
| 443 | prepared_stages[i].output_columns.insert(kv.first); |
| 444 | } |
| 445 | } |
| 446 | |
| 447 | /// Now, calculate `expressions_chain` for each stage except the first. |
| 448 | /// Do it backwards to propagate information about columns required as input for a stage to the previous stage. |
| 449 | for (size_t i = prepared_stages.size() - 1; i > 0; --i) |
| 450 | { |
| 451 | auto & stage = prepared_stages[i]; |
| 452 | |
| 453 | ASTPtr all_asts = std::make_shared<ASTExpressionList>(); |
| 454 | |
| 455 | for (const auto & ast : stage.filters) |
| 456 | all_asts->children.push_back(ast); |
| 457 | |
| 458 | for (const auto & kv : stage.column_to_updated) |
| 459 | all_asts->children.push_back(kv.second); |
| 460 | |
| 461 | /// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns. |
| 462 | for (const String & column : stage.output_columns) |
| 463 | all_asts->children.push_back(std::make_shared<ASTIdentifier>(column)); |
| 464 | |
| 465 | auto syntax_result = SyntaxAnalyzer(context).analyze(all_asts, all_columns); |
| 466 | stage.analyzer = std::make_unique<ExpressionAnalyzer>(all_asts, syntax_result, context); |
| 467 | |
| 468 | ExpressionActionsChain & actions_chain = stage.expressions_chain; |
| 469 | |
| 470 | for (const auto & ast : stage.filters) |
| 471 | { |
| 472 | if (!actions_chain.steps.empty()) |
| 473 | actions_chain.addStep(); |
| 474 | stage.analyzer->appendExpression(actions_chain, ast, dry_run); |
| 475 | stage.filter_column_names.push_back(ast->getColumnName()); |
| 476 | } |
| 477 | |
| 478 | if (!stage.column_to_updated.empty()) |
| 479 | { |
| 480 | if (!actions_chain.steps.empty()) |
| 481 | actions_chain.addStep(); |
| 482 | |
| 483 | for (const auto & kv : stage.column_to_updated) |
| 484 | stage.analyzer->appendExpression(actions_chain, kv.second, dry_run); |
| 485 | |
| 486 | for (const auto & kv : stage.column_to_updated) |
| 487 | { |
| 488 | actions_chain.getLastActions()->add(ExpressionAction::copyColumn( |
| 489 | kv.second->getColumnName(), kv.first, /* can_replace = */ true)); |
| 490 | } |
| 491 | } |
| 492 | |
| 493 | /// Remove all intermediate columns. |
| 494 | actions_chain.addStep(); |
| 495 | actions_chain.getLastStep().required_output.assign(stage.output_columns.begin(), stage.output_columns.end()); |
| 496 | |
| 497 | actions_chain.finalize(); |
| 498 | |
| 499 | /// Propagate information about columns needed as input. |
| 500 | for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes()) |
| 501 | prepared_stages[i - 1].output_columns.insert(column.name); |
| 502 | } |
| 503 | |
| 504 | /// Execute first stage as a SELECT statement. |
| 505 | |
| 506 | auto select = std::make_shared<ASTSelectQuery>(); |
| 507 | |
| 508 | select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>()); |
| 509 | for (const auto & column_name : prepared_stages[0].output_columns) |
| 510 | select->select()->children.push_back(std::make_shared<ASTIdentifier>(column_name)); |
| 511 | |
| 512 | if (!prepared_stages[0].filters.empty()) |
| 513 | { |
| 514 | ASTPtr where_expression; |
| 515 | if (prepared_stages[0].filters.size() == 1) |
| 516 | where_expression = prepared_stages[0].filters[0]; |
| 517 | else |
| 518 | { |
| 519 | auto coalesced_predicates = std::make_shared<ASTFunction>(); |
| 520 | coalesced_predicates->name = "and" ; |
| 521 | coalesced_predicates->arguments = std::make_shared<ASTExpressionList>(); |
| 522 | coalesced_predicates->children.push_back(coalesced_predicates->arguments); |
| 523 | coalesced_predicates->arguments->children = prepared_stages[0].filters; |
| 524 | where_expression = std::move(coalesced_predicates); |
| 525 | } |
| 526 | select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); |
| 527 | } |
| 528 | |
| 529 | return select; |
| 530 | } |
| 531 | |
| 532 | BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const |
| 533 | { |
| 534 | for (size_t i_stage = 1; i_stage < prepared_stages.size(); ++i_stage) |
| 535 | { |
| 536 | const Stage & stage = prepared_stages[i_stage]; |
| 537 | |
| 538 | for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i) |
| 539 | { |
| 540 | const auto & step = stage.expressions_chain.steps[i]; |
| 541 | if (i < stage.filter_column_names.size()) |
| 542 | { |
| 543 | /// Execute DELETEs. |
| 544 | in = std::make_shared<FilterBlockInputStream>(in, step.actions, stage.filter_column_names[i]); |
| 545 | } |
| 546 | else |
| 547 | { |
| 548 | /// Execute UPDATE or final projection. |
| 549 | in = std::make_shared<ExpressionBlockInputStream>(in, step.actions); |
| 550 | } |
| 551 | } |
| 552 | |
| 553 | const SubqueriesForSets & subqueries_for_sets = stage.analyzer->getSubqueriesForSets(); |
| 554 | if (!subqueries_for_sets.empty()) |
| 555 | in = std::make_shared<CreatingSetsBlockInputStream>(in, subqueries_for_sets, context); |
| 556 | } |
| 557 | |
| 558 | in = std::make_shared<MaterializingBlockInputStream>(in); |
| 559 | |
| 560 | return in; |
| 561 | } |
| 562 | |
| 563 | void MutationsInterpreter::validate(TableStructureReadLockHolder &) |
| 564 | { |
| 565 | /// For Replicated* storages mutations cannot employ non-deterministic functions |
| 566 | /// because that produces inconsistencies between replicas |
| 567 | if (startsWith(storage->getName(), "Replicated" )) |
| 568 | { |
| 569 | for (const auto & command : commands) |
| 570 | { |
| 571 | const auto nondeterministic_func_name = findFirstNonDeterministicFuncName(command, context); |
| 572 | if (nondeterministic_func_name) |
| 573 | throw Exception( |
| 574 | "ALTER UPDATE/ALTER DELETE statements must use only deterministic functions! " |
| 575 | "Function '" + *nondeterministic_func_name + "' is non-deterministic" , |
| 576 | ErrorCodes::BAD_ARGUMENTS); |
| 577 | } |
| 578 | } |
| 579 | |
| 580 | /// Do not use getSampleBlock in order to check the whole pipeline. |
| 581 | Block = select_interpreter->execute().in->getHeader(); |
| 582 | BlockInputStreamPtr in = std::make_shared<NullBlockInputStream>(first_stage_header); |
| 583 | addStreamsForLaterStages(stages, in)->getHeader(); |
| 584 | } |
| 585 | |
| 586 | BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &) |
| 587 | { |
| 588 | if (!can_execute) |
| 589 | throw Exception("Cannot execute mutations interpreter because can_execute flag set to false" , ErrorCodes::LOGICAL_ERROR); |
| 590 | |
| 591 | BlockInputStreamPtr in = select_interpreter->execute().in; |
| 592 | auto result_stream = addStreamsForLaterStages(stages, in); |
| 593 | if (!updated_header) |
| 594 | updated_header = std::make_unique<Block>(result_stream->getHeader()); |
| 595 | return result_stream; |
| 596 | } |
| 597 | |
| 598 | const Block & MutationsInterpreter::() const |
| 599 | { |
| 600 | return *updated_header; |
| 601 | } |
| 602 | |
| 603 | |
| 604 | size_t MutationsInterpreter::evaluateCommandsSize() |
| 605 | { |
| 606 | for (const MutationCommand & command : commands) |
| 607 | if (unlikely(!command.predicate)) /// The command touches all rows. |
| 608 | return mutation_ast->size(); |
| 609 | |
| 610 | return std::max(prepareQueryAffectedAST(commands)->size(), mutation_ast->size()); |
| 611 | } |
| 612 | |
| 613 | } |
| 614 | |