1#include "MutationsInterpreter.h"
2
3#include <Functions/FunctionFactory.h>
4#include <Functions/IFunction.h>
5#include <Interpreters/InDepthNodeVisitor.h>
6#include <Interpreters/InterpreterSelectQuery.h>
7#include <Interpreters/MutationsInterpreter.h>
8#include <Interpreters/SyntaxAnalyzer.h>
9#include <Storages/MergeTree/MergeTreeData.h>
10#include <DataStreams/FilterBlockInputStream.h>
11#include <DataStreams/ExpressionBlockInputStream.h>
12#include <DataStreams/CreatingSetsBlockInputStream.h>
13#include <DataStreams/MaterializingBlockInputStream.h>
14#include <DataStreams/NullBlockInputStream.h>
15#include <Parsers/ASTIdentifier.h>
16#include <Parsers/ASTFunction.h>
17#include <Parsers/ASTLiteral.h>
18#include <Parsers/ASTExpressionList.h>
19#include <Parsers/ASTSelectQuery.h>
20#include <Parsers/formatAST.h>
21#include <IO/WriteHelpers.h>
22
23
24namespace DB
25{
26
/// Error codes thrown from this file. LOGICAL_ERROR and BAD_ARGUMENTS are used below as well,
/// so they are declared explicitly instead of relying on transitive includes.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int BAD_ARGUMENTS;
    extern const int UNKNOWN_MUTATION_COMMAND;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int CANNOT_UPDATE_COLUMN;
}
33
34namespace
35{
36struct FirstNonDeterministicFuncData
37{
38 using TypeToVisit = ASTFunction;
39
40 explicit FirstNonDeterministicFuncData(const Context & context_)
41 : context{context_}
42 {}
43
44 const Context & context;
45 std::optional<String> nondeterministic_function_name;
46
47 void visit(ASTFunction & function, ASTPtr &)
48 {
49 if (nondeterministic_function_name)
50 return;
51
52 const auto func = FunctionFactory::instance().get(function.name, context);
53 if (!func->isDeterministic())
54 nondeterministic_function_name = func->getName();
55 }
56};
57
58using FirstNonDeterministicFuncFinder =
59 InDepthNodeVisitor<OneTypeMatcher<FirstNonDeterministicFuncData>, true>;
60
61std::optional<String> findFirstNonDeterministicFuncName(const MutationCommand & command, const Context & context)
62{
63 FirstNonDeterministicFuncData finder_data(context);
64
65 switch (command.type)
66 {
67 case MutationCommand::UPDATE:
68 {
69 auto update_assignments_ast = command.ast->as<const ASTAlterCommand &>().update_assignments->clone();
70 FirstNonDeterministicFuncFinder(finder_data).visit(update_assignments_ast);
71
72 if (finder_data.nondeterministic_function_name)
73 return finder_data.nondeterministic_function_name;
74
75 [[fallthrough]];
76 }
77
78 case MutationCommand::DELETE:
79 {
80 auto predicate_ast = command.predicate->clone();
81 FirstNonDeterministicFuncFinder(finder_data).visit(predicate_ast);
82
83 return finder_data.nondeterministic_function_name;
84 }
85
86 default:
87 break;
88 }
89
90 return {};
91}
92
93ASTPtr prepareQueryAffectedAST(const std::vector<MutationCommand> & commands)
94{
95 /// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query.
96 /// The result can differ from tne number of affected rows (e.g. if there is an UPDATE command that
97 /// changes how many rows satisfy the predicates of the subsequent commands).
98 /// But we can be sure that if count = 0, then no rows will be touched.
99
100 auto select = std::make_shared<ASTSelectQuery>();
101
102 select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
103 auto count_func = std::make_shared<ASTFunction>();
104 count_func->name = "count";
105 count_func->arguments = std::make_shared<ASTExpressionList>();
106 select->select()->children.push_back(count_func);
107
108 if (commands.size() == 1)
109 select->setExpression(ASTSelectQuery::Expression::WHERE, commands[0].predicate->clone());
110 else
111 {
112 auto coalesced_predicates = std::make_shared<ASTFunction>();
113 coalesced_predicates->name = "or";
114 coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
115 coalesced_predicates->children.push_back(coalesced_predicates->arguments);
116
117 for (const MutationCommand & command : commands)
118 coalesced_predicates->arguments->children.push_back(command.predicate->clone());
119
120 select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(coalesced_predicates));
121 }
122
123 return select;
124}
125
126};
127
128bool isStorageTouchedByMutations(
129 StoragePtr storage,
130 const std::vector<MutationCommand> & commands,
131 Context context_copy)
132{
133 if (commands.empty())
134 return false;
135
136 for (const MutationCommand & command : commands)
137 {
138 if (!command.predicate) /// The command touches all rows.
139 return true;
140 }
141
142 context_copy.getSettingsRef().max_streams_to_max_threads_ratio = 1;
143 context_copy.getSettingsRef().max_threads = 1;
144
145 ASTPtr select_query = prepareQueryAffectedAST(commands);
146
147 /// Interpreter must be alive, when we use result of execute() method.
148 /// For some reason it may copy context and and give it into ExpressionBlockInputStream
149 /// after that we will use context from destroyed stack frame in our stream.
150 InterpreterSelectQuery interpreter(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits());
151 BlockInputStreamPtr in = interpreter.execute().in;
152
153 Block block = in->read();
154 if (!block.rows())
155 return false;
156 else if (block.rows() != 1)
157 throw Exception("count() expression returned " + toString(block.rows()) + " rows, not 1",
158 ErrorCodes::LOGICAL_ERROR);
159
160 auto count = (*block.getByName("count()").column)[0].get<UInt64>();
161 return count != 0;
162
163}
164
165MutationsInterpreter::MutationsInterpreter(
166 StoragePtr storage_,
167 std::vector<MutationCommand> commands_,
168 const Context & context_,
169 bool can_execute_)
170 : storage(std::move(storage_))
171 , commands(std::move(commands_))
172 , context(context_)
173 , can_execute(can_execute_)
174{
175 mutation_ast = prepare(!can_execute);
176 SelectQueryOptions limits = SelectQueryOptions().analyze(!can_execute).ignoreLimits();
177 select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, limits);
178}
179
180static NameSet getKeyColumns(const StoragePtr & storage)
181{
182 const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get());
183 if (!merge_tree_data)
184 return {};
185
186 NameSet key_columns;
187
188 if (merge_tree_data->partition_key_expr)
189 for (const String & col : merge_tree_data->partition_key_expr->getRequiredColumns())
190 key_columns.insert(col);
191
192 auto sorting_key_expr = merge_tree_data->sorting_key_expr;
193 if (sorting_key_expr)
194 for (const String & col : sorting_key_expr->getRequiredColumns())
195 key_columns.insert(col);
196 /// We don't process sample_by_ast separately because it must be among the primary key columns.
197
198 if (!merge_tree_data->merging_params.sign_column.empty())
199 key_columns.insert(merge_tree_data->merging_params.sign_column);
200
201 if (!merge_tree_data->merging_params.version_column.empty())
202 key_columns.insert(merge_tree_data->merging_params.version_column);
203
204 return key_columns;
205}
206
207static void validateUpdateColumns(
208 const StoragePtr & storage, const NameSet & updated_columns,
209 const std::unordered_map<String, Names> & column_to_affected_materialized)
210{
211 NameSet key_columns = getKeyColumns(storage);
212
213 for (const String & column_name : updated_columns)
214 {
215 auto found = false;
216 for (const auto & col : storage->getColumns().getOrdinary())
217 {
218 if (col.name == column_name)
219 {
220 found = true;
221 break;
222 }
223 }
224
225 if (!found)
226 {
227 for (const auto & col : storage->getColumns().getMaterialized())
228 {
229 if (col.name == column_name)
230 throw Exception("Cannot UPDATE materialized column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN);
231 }
232
233 throw Exception("There is no column " + backQuote(column_name) + " in table", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
234 }
235
236 if (key_columns.count(column_name))
237 throw Exception("Cannot UPDATE key column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN);
238
239 auto materialized_it = column_to_affected_materialized.find(column_name);
240 if (materialized_it != column_to_affected_materialized.end())
241 {
242 for (const String & materialized : materialized_it->second)
243 {
244 if (key_columns.count(materialized))
245 throw Exception("Updated column " + backQuote(column_name) + " affects MATERIALIZED column "
246 + backQuote(materialized) + ", which is a key column. Cannot UPDATE it.",
247 ErrorCodes::CANNOT_UPDATE_COLUMN);
248 }
249 }
250 }
251}
252
253
254ASTPtr MutationsInterpreter::prepare(bool dry_run)
255{
256 if (is_prepared)
257 throw Exception("MutationsInterpreter is already prepared. It is a bug.", ErrorCodes::LOGICAL_ERROR);
258
259 if (commands.empty())
260 throw Exception("Empty mutation commands list", ErrorCodes::LOGICAL_ERROR);
261
262 const ColumnsDescription & columns_desc = storage->getColumns();
263 const IndicesDescription & indices_desc = storage->getIndices();
264 NamesAndTypesList all_columns = columns_desc.getAllPhysical();
265
266 NameSet updated_columns;
267 for (const MutationCommand & command : commands)
268 {
269 for (const auto & kv : command.column_to_update_expression)
270 updated_columns.insert(kv.first);
271 }
272
273 /// We need to know which columns affect which MATERIALIZED columns and data skipping indices
274 /// to recalculate them if dependencies are updated.
275 std::unordered_map<String, Names> column_to_affected_materialized;
276 NameSet affected_indices_columns;
277 if (!updated_columns.empty())
278 {
279 for (const auto & column : columns_desc)
280 {
281 if (column.default_desc.kind == ColumnDefaultKind::Materialized)
282 {
283 auto query = column.default_desc.expression->clone();
284 auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
285 for (const String & dependency : syntax_result->requiredSourceColumns())
286 {
287 if (updated_columns.count(dependency))
288 column_to_affected_materialized[dependency].push_back(column.name);
289 }
290 }
291 }
292 for (const auto & index : indices_desc.indices)
293 {
294 auto query = index->expr->clone();
295 auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
296 const auto required_columns = syntax_result->requiredSourceColumns();
297
298 for (const String & dependency : required_columns)
299 {
300 if (updated_columns.count(dependency))
301 {
302 affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
303 break;
304 }
305 }
306 }
307
308 validateUpdateColumns(storage, updated_columns, column_to_affected_materialized);
309 }
310
311 /// First, break a sequence of commands into stages.
312 for (const auto & command : commands)
313 {
314 if (command.type == MutationCommand::DELETE)
315 {
316 if (stages.empty() || !stages.back().column_to_updated.empty())
317 stages.emplace_back(context);
318
319 auto negated_predicate = makeASTFunction("not", command.predicate->clone());
320 stages.back().filters.push_back(negated_predicate);
321 }
322 else if (command.type == MutationCommand::UPDATE)
323 {
324 if (stages.empty() || !stages.back().column_to_updated.empty())
325 stages.emplace_back(context);
326 if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
327 stages.emplace_back(context);
328
329 NameSet affected_materialized;
330
331 for (const auto & kv : command.column_to_update_expression)
332 {
333 const String & column = kv.first;
334
335 auto materialized_it = column_to_affected_materialized.find(column);
336 if (materialized_it != column_to_affected_materialized.end())
337 {
338 for (const String & mat_column : materialized_it->second)
339 affected_materialized.emplace(mat_column);
340 }
341
342 /// Just to be sure, that we don't change type
343 /// after update expression execution.
344 const auto & update_expr = kv.second;
345 auto updated_column = makeASTFunction("CAST",
346 makeASTFunction("if",
347 command.predicate->clone(),
348 update_expr->clone(),
349 std::make_shared<ASTIdentifier>(column)),
350 std::make_shared<ASTLiteral>(columns_desc.getPhysical(column).type->getName()));
351 stages.back().column_to_updated.emplace(column, updated_column);
352 }
353
354 if (!affected_materialized.empty())
355 {
356 stages.emplace_back(context);
357 for (const auto & column : columns_desc)
358 {
359 if (column.default_desc.kind == ColumnDefaultKind::Materialized)
360 {
361 stages.back().column_to_updated.emplace(
362 column.name,
363 column.default_desc.expression->clone());
364 }
365 }
366 }
367 }
368 else if (command.type == MutationCommand::MATERIALIZE_INDEX)
369 {
370 auto it = std::find_if(
371 std::cbegin(indices_desc.indices), std::end(indices_desc.indices),
372 [&](const std::shared_ptr<ASTIndexDeclaration> & index)
373 {
374 return index->name == command.index_name;
375 });
376 if (it == std::cend(indices_desc.indices))
377 throw Exception("Unknown index: " + command.index_name, ErrorCodes::BAD_ARGUMENTS);
378
379 auto query = (*it)->expr->clone();
380 auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
381 const auto required_columns = syntax_result->requiredSourceColumns();
382 affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
383 }
384 else
385 throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
386 }
387
388 /// We cares about affected indices because we also need to rewrite them
389 /// when one of index columns updated or filtered with delete
390 if (!affected_indices_columns.empty())
391 {
392 if (!stages.empty())
393 {
394 std::vector<Stage> stages_copy;
395 /// Copy all filled stages except index calculation stage.
396 for (const auto & stage : stages)
397 {
398 stages_copy.emplace_back(context);
399 stages_copy.back().column_to_updated = stage.column_to_updated;
400 stages_copy.back().output_columns = stage.output_columns;
401 stages_copy.back().filters = stage.filters;
402 }
403
404 const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true);
405 InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()};
406
407 auto first_stage_header = interpreter.getSampleBlock();
408 auto in = std::make_shared<NullBlockInputStream>(first_stage_header);
409 updated_header = std::make_unique<Block>(addStreamsForLaterStages(stages_copy, in)->getHeader());
410 }
411 /// Special step to recalculate affected indices.
412 stages.emplace_back(context);
413 for (const auto & column : affected_indices_columns)
414 stages.back().column_to_updated.emplace(
415 column, std::make_shared<ASTIdentifier>(column));
416 }
417
418 is_prepared = true;
419
420 return prepareInterpreterSelectQuery(stages, dry_run);
421}
422
423ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run)
424{
425 NamesAndTypesList all_columns = storage->getColumns().getAllPhysical();
426
427 /// Next, for each stage calculate columns changed by this and previous stages.
428 for (size_t i = 0; i < prepared_stages.size(); ++i)
429 {
430 if (!prepared_stages[i].filters.empty())
431 {
432 for (const auto & column : all_columns)
433 prepared_stages[i].output_columns.insert(column.name);
434 continue;
435 }
436
437 if (i > 0)
438 prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns;
439
440 if (prepared_stages[i].output_columns.size() < all_columns.size())
441 {
442 for (const auto & kv : prepared_stages[i].column_to_updated)
443 prepared_stages[i].output_columns.insert(kv.first);
444 }
445 }
446
447 /// Now, calculate `expressions_chain` for each stage except the first.
448 /// Do it backwards to propagate information about columns required as input for a stage to the previous stage.
449 for (size_t i = prepared_stages.size() - 1; i > 0; --i)
450 {
451 auto & stage = prepared_stages[i];
452
453 ASTPtr all_asts = std::make_shared<ASTExpressionList>();
454
455 for (const auto & ast : stage.filters)
456 all_asts->children.push_back(ast);
457
458 for (const auto & kv : stage.column_to_updated)
459 all_asts->children.push_back(kv.second);
460
461 /// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns.
462 for (const String & column : stage.output_columns)
463 all_asts->children.push_back(std::make_shared<ASTIdentifier>(column));
464
465 auto syntax_result = SyntaxAnalyzer(context).analyze(all_asts, all_columns);
466 stage.analyzer = std::make_unique<ExpressionAnalyzer>(all_asts, syntax_result, context);
467
468 ExpressionActionsChain & actions_chain = stage.expressions_chain;
469
470 for (const auto & ast : stage.filters)
471 {
472 if (!actions_chain.steps.empty())
473 actions_chain.addStep();
474 stage.analyzer->appendExpression(actions_chain, ast, dry_run);
475 stage.filter_column_names.push_back(ast->getColumnName());
476 }
477
478 if (!stage.column_to_updated.empty())
479 {
480 if (!actions_chain.steps.empty())
481 actions_chain.addStep();
482
483 for (const auto & kv : stage.column_to_updated)
484 stage.analyzer->appendExpression(actions_chain, kv.second, dry_run);
485
486 for (const auto & kv : stage.column_to_updated)
487 {
488 actions_chain.getLastActions()->add(ExpressionAction::copyColumn(
489 kv.second->getColumnName(), kv.first, /* can_replace = */ true));
490 }
491 }
492
493 /// Remove all intermediate columns.
494 actions_chain.addStep();
495 actions_chain.getLastStep().required_output.assign(stage.output_columns.begin(), stage.output_columns.end());
496
497 actions_chain.finalize();
498
499 /// Propagate information about columns needed as input.
500 for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes())
501 prepared_stages[i - 1].output_columns.insert(column.name);
502 }
503
504 /// Execute first stage as a SELECT statement.
505
506 auto select = std::make_shared<ASTSelectQuery>();
507
508 select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
509 for (const auto & column_name : prepared_stages[0].output_columns)
510 select->select()->children.push_back(std::make_shared<ASTIdentifier>(column_name));
511
512 if (!prepared_stages[0].filters.empty())
513 {
514 ASTPtr where_expression;
515 if (prepared_stages[0].filters.size() == 1)
516 where_expression = prepared_stages[0].filters[0];
517 else
518 {
519 auto coalesced_predicates = std::make_shared<ASTFunction>();
520 coalesced_predicates->name = "and";
521 coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
522 coalesced_predicates->children.push_back(coalesced_predicates->arguments);
523 coalesced_predicates->arguments->children = prepared_stages[0].filters;
524 where_expression = std::move(coalesced_predicates);
525 }
526 select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
527 }
528
529 return select;
530}
531
532BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const
533{
534 for (size_t i_stage = 1; i_stage < prepared_stages.size(); ++i_stage)
535 {
536 const Stage & stage = prepared_stages[i_stage];
537
538 for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i)
539 {
540 const auto & step = stage.expressions_chain.steps[i];
541 if (i < stage.filter_column_names.size())
542 {
543 /// Execute DELETEs.
544 in = std::make_shared<FilterBlockInputStream>(in, step.actions, stage.filter_column_names[i]);
545 }
546 else
547 {
548 /// Execute UPDATE or final projection.
549 in = std::make_shared<ExpressionBlockInputStream>(in, step.actions);
550 }
551 }
552
553 const SubqueriesForSets & subqueries_for_sets = stage.analyzer->getSubqueriesForSets();
554 if (!subqueries_for_sets.empty())
555 in = std::make_shared<CreatingSetsBlockInputStream>(in, subqueries_for_sets, context);
556 }
557
558 in = std::make_shared<MaterializingBlockInputStream>(in);
559
560 return in;
561}
562
563void MutationsInterpreter::validate(TableStructureReadLockHolder &)
564{
565 /// For Replicated* storages mutations cannot employ non-deterministic functions
566 /// because that produces inconsistencies between replicas
567 if (startsWith(storage->getName(), "Replicated"))
568 {
569 for (const auto & command : commands)
570 {
571 const auto nondeterministic_func_name = findFirstNonDeterministicFuncName(command, context);
572 if (nondeterministic_func_name)
573 throw Exception(
574 "ALTER UPDATE/ALTER DELETE statements must use only deterministic functions! "
575 "Function '" + *nondeterministic_func_name + "' is non-deterministic",
576 ErrorCodes::BAD_ARGUMENTS);
577 }
578 }
579
580 /// Do not use getSampleBlock in order to check the whole pipeline.
581 Block first_stage_header = select_interpreter->execute().in->getHeader();
582 BlockInputStreamPtr in = std::make_shared<NullBlockInputStream>(first_stage_header);
583 addStreamsForLaterStages(stages, in)->getHeader();
584}
585
586BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &)
587{
588 if (!can_execute)
589 throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
590
591 BlockInputStreamPtr in = select_interpreter->execute().in;
592 auto result_stream = addStreamsForLaterStages(stages, in);
593 if (!updated_header)
594 updated_header = std::make_unique<Block>(result_stream->getHeader());
595 return result_stream;
596}
597
598const Block & MutationsInterpreter::getUpdatedHeader() const
599{
600 return *updated_header;
601}
602
603
604size_t MutationsInterpreter::evaluateCommandsSize()
605{
606 for (const MutationCommand & command : commands)
607 if (unlikely(!command.predicate)) /// The command touches all rows.
608 return mutation_ast->size();
609
610 return std::max(prepareQueryAffectedAST(commands)->size(), mutation_ast->size());
611}
612
613}
614