1 | #include "MutationsInterpreter.h" |
2 | |
3 | #include <Functions/FunctionFactory.h> |
4 | #include <Functions/IFunction.h> |
5 | #include <Interpreters/InDepthNodeVisitor.h> |
6 | #include <Interpreters/InterpreterSelectQuery.h> |
7 | #include <Interpreters/MutationsInterpreter.h> |
8 | #include <Interpreters/SyntaxAnalyzer.h> |
9 | #include <Storages/MergeTree/MergeTreeData.h> |
10 | #include <DataStreams/FilterBlockInputStream.h> |
11 | #include <DataStreams/ExpressionBlockInputStream.h> |
12 | #include <DataStreams/CreatingSetsBlockInputStream.h> |
13 | #include <DataStreams/MaterializingBlockInputStream.h> |
14 | #include <DataStreams/NullBlockInputStream.h> |
15 | #include <Parsers/ASTIdentifier.h> |
16 | #include <Parsers/ASTFunction.h> |
17 | #include <Parsers/ASTLiteral.h> |
18 | #include <Parsers/ASTExpressionList.h> |
19 | #include <Parsers/ASTSelectQuery.h> |
20 | #include <Parsers/formatAST.h> |
21 | #include <IO/WriteHelpers.h> |
22 | |
23 | |
24 | namespace DB |
25 | { |
26 | |
27 | namespace ErrorCodes |
28 | { |
29 | extern const int UNKNOWN_MUTATION_COMMAND; |
30 | extern const int NO_SUCH_COLUMN_IN_TABLE; |
31 | extern const int CANNOT_UPDATE_COLUMN; |
32 | } |
33 | |
34 | namespace |
35 | { |
36 | struct FirstNonDeterministicFuncData |
37 | { |
38 | using TypeToVisit = ASTFunction; |
39 | |
40 | explicit FirstNonDeterministicFuncData(const Context & context_) |
41 | : context{context_} |
42 | {} |
43 | |
44 | const Context & context; |
45 | std::optional<String> nondeterministic_function_name; |
46 | |
47 | void visit(ASTFunction & function, ASTPtr &) |
48 | { |
49 | if (nondeterministic_function_name) |
50 | return; |
51 | |
52 | const auto func = FunctionFactory::instance().get(function.name, context); |
53 | if (!func->isDeterministic()) |
54 | nondeterministic_function_name = func->getName(); |
55 | } |
56 | }; |
57 | |
58 | using FirstNonDeterministicFuncFinder = |
59 | InDepthNodeVisitor<OneTypeMatcher<FirstNonDeterministicFuncData>, true>; |
60 | |
61 | std::optional<String> findFirstNonDeterministicFuncName(const MutationCommand & command, const Context & context) |
62 | { |
63 | FirstNonDeterministicFuncData finder_data(context); |
64 | |
65 | switch (command.type) |
66 | { |
67 | case MutationCommand::UPDATE: |
68 | { |
69 | auto update_assignments_ast = command.ast->as<const ASTAlterCommand &>().update_assignments->clone(); |
70 | FirstNonDeterministicFuncFinder(finder_data).visit(update_assignments_ast); |
71 | |
72 | if (finder_data.nondeterministic_function_name) |
73 | return finder_data.nondeterministic_function_name; |
74 | |
75 | [[fallthrough]]; |
76 | } |
77 | |
78 | case MutationCommand::DELETE: |
79 | { |
80 | auto predicate_ast = command.predicate->clone(); |
81 | FirstNonDeterministicFuncFinder(finder_data).visit(predicate_ast); |
82 | |
83 | return finder_data.nondeterministic_function_name; |
84 | } |
85 | |
86 | default: |
87 | break; |
88 | } |
89 | |
90 | return {}; |
91 | } |
92 | |
93 | ASTPtr prepareQueryAffectedAST(const std::vector<MutationCommand> & commands) |
94 | { |
95 | /// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query. |
96 | /// The result can differ from tne number of affected rows (e.g. if there is an UPDATE command that |
97 | /// changes how many rows satisfy the predicates of the subsequent commands). |
98 | /// But we can be sure that if count = 0, then no rows will be touched. |
99 | |
100 | auto select = std::make_shared<ASTSelectQuery>(); |
101 | |
102 | select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>()); |
103 | auto count_func = std::make_shared<ASTFunction>(); |
104 | count_func->name = "count" ; |
105 | count_func->arguments = std::make_shared<ASTExpressionList>(); |
106 | select->select()->children.push_back(count_func); |
107 | |
108 | if (commands.size() == 1) |
109 | select->setExpression(ASTSelectQuery::Expression::WHERE, commands[0].predicate->clone()); |
110 | else |
111 | { |
112 | auto coalesced_predicates = std::make_shared<ASTFunction>(); |
113 | coalesced_predicates->name = "or" ; |
114 | coalesced_predicates->arguments = std::make_shared<ASTExpressionList>(); |
115 | coalesced_predicates->children.push_back(coalesced_predicates->arguments); |
116 | |
117 | for (const MutationCommand & command : commands) |
118 | coalesced_predicates->arguments->children.push_back(command.predicate->clone()); |
119 | |
120 | select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(coalesced_predicates)); |
121 | } |
122 | |
123 | return select; |
124 | } |
125 | |
126 | }; |
127 | |
128 | bool isStorageTouchedByMutations( |
129 | StoragePtr storage, |
130 | const std::vector<MutationCommand> & commands, |
131 | Context context_copy) |
132 | { |
133 | if (commands.empty()) |
134 | return false; |
135 | |
136 | for (const MutationCommand & command : commands) |
137 | { |
138 | if (!command.predicate) /// The command touches all rows. |
139 | return true; |
140 | } |
141 | |
142 | context_copy.getSettingsRef().max_streams_to_max_threads_ratio = 1; |
143 | context_copy.getSettingsRef().max_threads = 1; |
144 | |
145 | ASTPtr select_query = prepareQueryAffectedAST(commands); |
146 | |
147 | /// Interpreter must be alive, when we use result of execute() method. |
148 | /// For some reason it may copy context and and give it into ExpressionBlockInputStream |
149 | /// after that we will use context from destroyed stack frame in our stream. |
150 | InterpreterSelectQuery interpreter(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits()); |
151 | BlockInputStreamPtr in = interpreter.execute().in; |
152 | |
153 | Block block = in->read(); |
154 | if (!block.rows()) |
155 | return false; |
156 | else if (block.rows() != 1) |
157 | throw Exception("count() expression returned " + toString(block.rows()) + " rows, not 1" , |
158 | ErrorCodes::LOGICAL_ERROR); |
159 | |
160 | auto count = (*block.getByName("count()" ).column)[0].get<UInt64>(); |
161 | return count != 0; |
162 | |
163 | } |
164 | |
165 | MutationsInterpreter::MutationsInterpreter( |
166 | StoragePtr storage_, |
167 | std::vector<MutationCommand> commands_, |
168 | const Context & context_, |
169 | bool can_execute_) |
170 | : storage(std::move(storage_)) |
171 | , commands(std::move(commands_)) |
172 | , context(context_) |
173 | , can_execute(can_execute_) |
174 | { |
175 | mutation_ast = prepare(!can_execute); |
176 | SelectQueryOptions limits = SelectQueryOptions().analyze(!can_execute).ignoreLimits(); |
177 | select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, limits); |
178 | } |
179 | |
180 | static NameSet getKeyColumns(const StoragePtr & storage) |
181 | { |
182 | const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()); |
183 | if (!merge_tree_data) |
184 | return {}; |
185 | |
186 | NameSet key_columns; |
187 | |
188 | if (merge_tree_data->partition_key_expr) |
189 | for (const String & col : merge_tree_data->partition_key_expr->getRequiredColumns()) |
190 | key_columns.insert(col); |
191 | |
192 | auto sorting_key_expr = merge_tree_data->sorting_key_expr; |
193 | if (sorting_key_expr) |
194 | for (const String & col : sorting_key_expr->getRequiredColumns()) |
195 | key_columns.insert(col); |
196 | /// We don't process sample_by_ast separately because it must be among the primary key columns. |
197 | |
198 | if (!merge_tree_data->merging_params.sign_column.empty()) |
199 | key_columns.insert(merge_tree_data->merging_params.sign_column); |
200 | |
201 | if (!merge_tree_data->merging_params.version_column.empty()) |
202 | key_columns.insert(merge_tree_data->merging_params.version_column); |
203 | |
204 | return key_columns; |
205 | } |
206 | |
207 | static void validateUpdateColumns( |
208 | const StoragePtr & storage, const NameSet & updated_columns, |
209 | const std::unordered_map<String, Names> & column_to_affected_materialized) |
210 | { |
211 | NameSet key_columns = getKeyColumns(storage); |
212 | |
213 | for (const String & column_name : updated_columns) |
214 | { |
215 | auto found = false; |
216 | for (const auto & col : storage->getColumns().getOrdinary()) |
217 | { |
218 | if (col.name == column_name) |
219 | { |
220 | found = true; |
221 | break; |
222 | } |
223 | } |
224 | |
225 | if (!found) |
226 | { |
227 | for (const auto & col : storage->getColumns().getMaterialized()) |
228 | { |
229 | if (col.name == column_name) |
230 | throw Exception("Cannot UPDATE materialized column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN); |
231 | } |
232 | |
233 | throw Exception("There is no column " + backQuote(column_name) + " in table" , ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); |
234 | } |
235 | |
236 | if (key_columns.count(column_name)) |
237 | throw Exception("Cannot UPDATE key column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN); |
238 | |
239 | auto materialized_it = column_to_affected_materialized.find(column_name); |
240 | if (materialized_it != column_to_affected_materialized.end()) |
241 | { |
242 | for (const String & materialized : materialized_it->second) |
243 | { |
244 | if (key_columns.count(materialized)) |
245 | throw Exception("Updated column " + backQuote(column_name) + " affects MATERIALIZED column " |
246 | + backQuote(materialized) + ", which is a key column. Cannot UPDATE it." , |
247 | ErrorCodes::CANNOT_UPDATE_COLUMN); |
248 | } |
249 | } |
250 | } |
251 | } |
252 | |
253 | |
254 | ASTPtr MutationsInterpreter::prepare(bool dry_run) |
255 | { |
256 | if (is_prepared) |
257 | throw Exception("MutationsInterpreter is already prepared. It is a bug." , ErrorCodes::LOGICAL_ERROR); |
258 | |
259 | if (commands.empty()) |
260 | throw Exception("Empty mutation commands list" , ErrorCodes::LOGICAL_ERROR); |
261 | |
262 | const ColumnsDescription & columns_desc = storage->getColumns(); |
263 | const IndicesDescription & indices_desc = storage->getIndices(); |
264 | NamesAndTypesList all_columns = columns_desc.getAllPhysical(); |
265 | |
266 | NameSet updated_columns; |
267 | for (const MutationCommand & command : commands) |
268 | { |
269 | for (const auto & kv : command.column_to_update_expression) |
270 | updated_columns.insert(kv.first); |
271 | } |
272 | |
273 | /// We need to know which columns affect which MATERIALIZED columns and data skipping indices |
274 | /// to recalculate them if dependencies are updated. |
275 | std::unordered_map<String, Names> column_to_affected_materialized; |
276 | NameSet affected_indices_columns; |
277 | if (!updated_columns.empty()) |
278 | { |
279 | for (const auto & column : columns_desc) |
280 | { |
281 | if (column.default_desc.kind == ColumnDefaultKind::Materialized) |
282 | { |
283 | auto query = column.default_desc.expression->clone(); |
284 | auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns); |
285 | for (const String & dependency : syntax_result->requiredSourceColumns()) |
286 | { |
287 | if (updated_columns.count(dependency)) |
288 | column_to_affected_materialized[dependency].push_back(column.name); |
289 | } |
290 | } |
291 | } |
292 | for (const auto & index : indices_desc.indices) |
293 | { |
294 | auto query = index->expr->clone(); |
295 | auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns); |
296 | const auto required_columns = syntax_result->requiredSourceColumns(); |
297 | |
298 | for (const String & dependency : required_columns) |
299 | { |
300 | if (updated_columns.count(dependency)) |
301 | { |
302 | affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns)); |
303 | break; |
304 | } |
305 | } |
306 | } |
307 | |
308 | validateUpdateColumns(storage, updated_columns, column_to_affected_materialized); |
309 | } |
310 | |
311 | /// First, break a sequence of commands into stages. |
312 | for (const auto & command : commands) |
313 | { |
314 | if (command.type == MutationCommand::DELETE) |
315 | { |
316 | if (stages.empty() || !stages.back().column_to_updated.empty()) |
317 | stages.emplace_back(context); |
318 | |
319 | auto negated_predicate = makeASTFunction("not" , command.predicate->clone()); |
320 | stages.back().filters.push_back(negated_predicate); |
321 | } |
322 | else if (command.type == MutationCommand::UPDATE) |
323 | { |
324 | if (stages.empty() || !stages.back().column_to_updated.empty()) |
325 | stages.emplace_back(context); |
326 | if (stages.size() == 1) /// First stage only supports filtering and can't update columns. |
327 | stages.emplace_back(context); |
328 | |
329 | NameSet affected_materialized; |
330 | |
331 | for (const auto & kv : command.column_to_update_expression) |
332 | { |
333 | const String & column = kv.first; |
334 | |
335 | auto materialized_it = column_to_affected_materialized.find(column); |
336 | if (materialized_it != column_to_affected_materialized.end()) |
337 | { |
338 | for (const String & mat_column : materialized_it->second) |
339 | affected_materialized.emplace(mat_column); |
340 | } |
341 | |
342 | /// Just to be sure, that we don't change type |
343 | /// after update expression execution. |
344 | const auto & update_expr = kv.second; |
345 | auto updated_column = makeASTFunction("CAST" , |
346 | makeASTFunction("if" , |
347 | command.predicate->clone(), |
348 | update_expr->clone(), |
349 | std::make_shared<ASTIdentifier>(column)), |
350 | std::make_shared<ASTLiteral>(columns_desc.getPhysical(column).type->getName())); |
351 | stages.back().column_to_updated.emplace(column, updated_column); |
352 | } |
353 | |
354 | if (!affected_materialized.empty()) |
355 | { |
356 | stages.emplace_back(context); |
357 | for (const auto & column : columns_desc) |
358 | { |
359 | if (column.default_desc.kind == ColumnDefaultKind::Materialized) |
360 | { |
361 | stages.back().column_to_updated.emplace( |
362 | column.name, |
363 | column.default_desc.expression->clone()); |
364 | } |
365 | } |
366 | } |
367 | } |
368 | else if (command.type == MutationCommand::MATERIALIZE_INDEX) |
369 | { |
370 | auto it = std::find_if( |
371 | std::cbegin(indices_desc.indices), std::end(indices_desc.indices), |
372 | [&](const std::shared_ptr<ASTIndexDeclaration> & index) |
373 | { |
374 | return index->name == command.index_name; |
375 | }); |
376 | if (it == std::cend(indices_desc.indices)) |
377 | throw Exception("Unknown index: " + command.index_name, ErrorCodes::BAD_ARGUMENTS); |
378 | |
379 | auto query = (*it)->expr->clone(); |
380 | auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns); |
381 | const auto required_columns = syntax_result->requiredSourceColumns(); |
382 | affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns)); |
383 | } |
384 | else |
385 | throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND); |
386 | } |
387 | |
388 | /// We cares about affected indices because we also need to rewrite them |
389 | /// when one of index columns updated or filtered with delete |
390 | if (!affected_indices_columns.empty()) |
391 | { |
392 | if (!stages.empty()) |
393 | { |
394 | std::vector<Stage> stages_copy; |
395 | /// Copy all filled stages except index calculation stage. |
396 | for (const auto & stage : stages) |
397 | { |
398 | stages_copy.emplace_back(context); |
399 | stages_copy.back().column_to_updated = stage.column_to_updated; |
400 | stages_copy.back().output_columns = stage.output_columns; |
401 | stages_copy.back().filters = stage.filters; |
402 | } |
403 | |
404 | const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true); |
405 | InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()}; |
406 | |
407 | auto = interpreter.getSampleBlock(); |
408 | auto in = std::make_shared<NullBlockInputStream>(first_stage_header); |
409 | updated_header = std::make_unique<Block>(addStreamsForLaterStages(stages_copy, in)->getHeader()); |
410 | } |
411 | /// Special step to recalculate affected indices. |
412 | stages.emplace_back(context); |
413 | for (const auto & column : affected_indices_columns) |
414 | stages.back().column_to_updated.emplace( |
415 | column, std::make_shared<ASTIdentifier>(column)); |
416 | } |
417 | |
418 | is_prepared = true; |
419 | |
420 | return prepareInterpreterSelectQuery(stages, dry_run); |
421 | } |
422 | |
423 | ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run) |
424 | { |
425 | NamesAndTypesList all_columns = storage->getColumns().getAllPhysical(); |
426 | |
427 | /// Next, for each stage calculate columns changed by this and previous stages. |
428 | for (size_t i = 0; i < prepared_stages.size(); ++i) |
429 | { |
430 | if (!prepared_stages[i].filters.empty()) |
431 | { |
432 | for (const auto & column : all_columns) |
433 | prepared_stages[i].output_columns.insert(column.name); |
434 | continue; |
435 | } |
436 | |
437 | if (i > 0) |
438 | prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns; |
439 | |
440 | if (prepared_stages[i].output_columns.size() < all_columns.size()) |
441 | { |
442 | for (const auto & kv : prepared_stages[i].column_to_updated) |
443 | prepared_stages[i].output_columns.insert(kv.first); |
444 | } |
445 | } |
446 | |
447 | /// Now, calculate `expressions_chain` for each stage except the first. |
448 | /// Do it backwards to propagate information about columns required as input for a stage to the previous stage. |
449 | for (size_t i = prepared_stages.size() - 1; i > 0; --i) |
450 | { |
451 | auto & stage = prepared_stages[i]; |
452 | |
453 | ASTPtr all_asts = std::make_shared<ASTExpressionList>(); |
454 | |
455 | for (const auto & ast : stage.filters) |
456 | all_asts->children.push_back(ast); |
457 | |
458 | for (const auto & kv : stage.column_to_updated) |
459 | all_asts->children.push_back(kv.second); |
460 | |
461 | /// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns. |
462 | for (const String & column : stage.output_columns) |
463 | all_asts->children.push_back(std::make_shared<ASTIdentifier>(column)); |
464 | |
465 | auto syntax_result = SyntaxAnalyzer(context).analyze(all_asts, all_columns); |
466 | stage.analyzer = std::make_unique<ExpressionAnalyzer>(all_asts, syntax_result, context); |
467 | |
468 | ExpressionActionsChain & actions_chain = stage.expressions_chain; |
469 | |
470 | for (const auto & ast : stage.filters) |
471 | { |
472 | if (!actions_chain.steps.empty()) |
473 | actions_chain.addStep(); |
474 | stage.analyzer->appendExpression(actions_chain, ast, dry_run); |
475 | stage.filter_column_names.push_back(ast->getColumnName()); |
476 | } |
477 | |
478 | if (!stage.column_to_updated.empty()) |
479 | { |
480 | if (!actions_chain.steps.empty()) |
481 | actions_chain.addStep(); |
482 | |
483 | for (const auto & kv : stage.column_to_updated) |
484 | stage.analyzer->appendExpression(actions_chain, kv.second, dry_run); |
485 | |
486 | for (const auto & kv : stage.column_to_updated) |
487 | { |
488 | actions_chain.getLastActions()->add(ExpressionAction::copyColumn( |
489 | kv.second->getColumnName(), kv.first, /* can_replace = */ true)); |
490 | } |
491 | } |
492 | |
493 | /// Remove all intermediate columns. |
494 | actions_chain.addStep(); |
495 | actions_chain.getLastStep().required_output.assign(stage.output_columns.begin(), stage.output_columns.end()); |
496 | |
497 | actions_chain.finalize(); |
498 | |
499 | /// Propagate information about columns needed as input. |
500 | for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes()) |
501 | prepared_stages[i - 1].output_columns.insert(column.name); |
502 | } |
503 | |
504 | /// Execute first stage as a SELECT statement. |
505 | |
506 | auto select = std::make_shared<ASTSelectQuery>(); |
507 | |
508 | select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>()); |
509 | for (const auto & column_name : prepared_stages[0].output_columns) |
510 | select->select()->children.push_back(std::make_shared<ASTIdentifier>(column_name)); |
511 | |
512 | if (!prepared_stages[0].filters.empty()) |
513 | { |
514 | ASTPtr where_expression; |
515 | if (prepared_stages[0].filters.size() == 1) |
516 | where_expression = prepared_stages[0].filters[0]; |
517 | else |
518 | { |
519 | auto coalesced_predicates = std::make_shared<ASTFunction>(); |
520 | coalesced_predicates->name = "and" ; |
521 | coalesced_predicates->arguments = std::make_shared<ASTExpressionList>(); |
522 | coalesced_predicates->children.push_back(coalesced_predicates->arguments); |
523 | coalesced_predicates->arguments->children = prepared_stages[0].filters; |
524 | where_expression = std::move(coalesced_predicates); |
525 | } |
526 | select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); |
527 | } |
528 | |
529 | return select; |
530 | } |
531 | |
532 | BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const |
533 | { |
534 | for (size_t i_stage = 1; i_stage < prepared_stages.size(); ++i_stage) |
535 | { |
536 | const Stage & stage = prepared_stages[i_stage]; |
537 | |
538 | for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i) |
539 | { |
540 | const auto & step = stage.expressions_chain.steps[i]; |
541 | if (i < stage.filter_column_names.size()) |
542 | { |
543 | /// Execute DELETEs. |
544 | in = std::make_shared<FilterBlockInputStream>(in, step.actions, stage.filter_column_names[i]); |
545 | } |
546 | else |
547 | { |
548 | /// Execute UPDATE or final projection. |
549 | in = std::make_shared<ExpressionBlockInputStream>(in, step.actions); |
550 | } |
551 | } |
552 | |
553 | const SubqueriesForSets & subqueries_for_sets = stage.analyzer->getSubqueriesForSets(); |
554 | if (!subqueries_for_sets.empty()) |
555 | in = std::make_shared<CreatingSetsBlockInputStream>(in, subqueries_for_sets, context); |
556 | } |
557 | |
558 | in = std::make_shared<MaterializingBlockInputStream>(in); |
559 | |
560 | return in; |
561 | } |
562 | |
563 | void MutationsInterpreter::validate(TableStructureReadLockHolder &) |
564 | { |
565 | /// For Replicated* storages mutations cannot employ non-deterministic functions |
566 | /// because that produces inconsistencies between replicas |
567 | if (startsWith(storage->getName(), "Replicated" )) |
568 | { |
569 | for (const auto & command : commands) |
570 | { |
571 | const auto nondeterministic_func_name = findFirstNonDeterministicFuncName(command, context); |
572 | if (nondeterministic_func_name) |
573 | throw Exception( |
574 | "ALTER UPDATE/ALTER DELETE statements must use only deterministic functions! " |
575 | "Function '" + *nondeterministic_func_name + "' is non-deterministic" , |
576 | ErrorCodes::BAD_ARGUMENTS); |
577 | } |
578 | } |
579 | |
580 | /// Do not use getSampleBlock in order to check the whole pipeline. |
581 | Block = select_interpreter->execute().in->getHeader(); |
582 | BlockInputStreamPtr in = std::make_shared<NullBlockInputStream>(first_stage_header); |
583 | addStreamsForLaterStages(stages, in)->getHeader(); |
584 | } |
585 | |
586 | BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &) |
587 | { |
588 | if (!can_execute) |
589 | throw Exception("Cannot execute mutations interpreter because can_execute flag set to false" , ErrorCodes::LOGICAL_ERROR); |
590 | |
591 | BlockInputStreamPtr in = select_interpreter->execute().in; |
592 | auto result_stream = addStreamsForLaterStages(stages, in); |
593 | if (!updated_header) |
594 | updated_header = std::make_unique<Block>(result_stream->getHeader()); |
595 | return result_stream; |
596 | } |
597 | |
598 | const Block & MutationsInterpreter::() const |
599 | { |
600 | return *updated_header; |
601 | } |
602 | |
603 | |
604 | size_t MutationsInterpreter::evaluateCommandsSize() |
605 | { |
606 | for (const MutationCommand & command : commands) |
607 | if (unlikely(!command.predicate)) /// The command touches all rows. |
608 | return mutation_ast->size(); |
609 | |
610 | return std::max(prepareQueryAffectedAST(commands)->size(), mutation_ast->size()); |
611 | } |
612 | |
613 | } |
614 | |