1#pragma once
2
3#include <DataStreams/IBlockInputStream.h>
4#include <Interpreters/ExpressionActions.h>
5#include <Interpreters/ExpressionAnalyzer.h>
6#include <Interpreters/InterpreterSelectQuery.h>
7#include <Storages/IStorage_fwd.h>
8#include <Storages/MutationCommands.h>
9
10
11namespace DB
12{
13
/// Forward declaration: the declarations below only name Context in function and constructor signatures
/// and hold it by reference.
class Context;
15
16/// Return false if the data isn't going to be changed by mutations.
17bool isStorageTouchedByMutations(StoragePtr storage, const std::vector<MutationCommand> & commands, Context context_copy);
18
19/// Create an input stream that will read data from storage and apply mutation commands (UPDATEs, DELETEs, MATERIALIZEs)
20/// to this data.
21class MutationsInterpreter
22{
23public:
24 /// Storage to mutate, array of mutations commands and context. If you really want to execute mutation
25 /// use can_execute = true, in other cases (validation, amount of commands) it can be false
26 MutationsInterpreter(StoragePtr storage_, std::vector<MutationCommand> commands_, const Context & context_, bool can_execute_);
27
28 void validate(TableStructureReadLockHolder & table_lock_holder);
29
30 size_t evaluateCommandsSize();
31
32 /// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices.
33 BlockInputStreamPtr execute(TableStructureReadLockHolder & table_lock_holder);
34
35 /// Only changed columns.
36 const Block & getUpdatedHeader() const;
37
38private:
39 ASTPtr prepare(bool dry_run);
40
41 struct Stage;
42
43 ASTPtr prepareInterpreterSelectQuery(std::vector<Stage> &prepared_stages, bool dry_run);
44 BlockInputStreamPtr addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const;
45
46 StoragePtr storage;
47 std::vector<MutationCommand> commands;
48 const Context & context;
49 bool can_execute;
50
51 ASTPtr mutation_ast;
52
53 /// We have to store interpreter because it use own copy of context
54 /// and some streams from execute method may use it.
55 std::unique_ptr<InterpreterSelectQuery> select_interpreter;
56
57 /// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several
58 /// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the
59 /// previous commands in the same stage, but at the end of each stage intermediate columns are thrown away
60 /// (they may contain wrong values because the column values have been updated).
61 ///
62 /// If an UPDATE command changes some columns that some MATERIALIZED columns depend on, a stage to
63 /// recalculate these columns is added.
64 ///
65 /// Each stage has output_columns that contain columns that are changed at the end of that stage
66 /// plus columns needed for the next mutations.
67 ///
68 /// First stage is special: it can contain only filters and is executed using InterpreterSelectQuery
69 /// to take advantage of table indexes (if there are any). It's necessary because all mutations have
70 /// `WHERE clause` part.
71
72 struct Stage
73 {
74 Stage(const Context & context_) : expressions_chain(context_) {}
75
76 ASTs filters;
77 std::unordered_map<String, ASTPtr> column_to_updated;
78
79 /// Contains columns that are changed by this stage,
80 /// columns changed by the previous stages and also columns needed by the next stages.
81 NameSet output_columns;
82
83 std::unique_ptr<ExpressionAnalyzer> analyzer;
84
85 /// A chain of actions needed to execute this stage.
86 /// First steps calculate filter columns for DELETEs (in the same order as in `filter_column_names`),
87 /// then there is (possibly) an UPDATE step, and finally a projection step.
88 ExpressionActionsChain expressions_chain;
89 Names filter_column_names;
90 };
91
92 std::unique_ptr<Block> updated_header;
93 std::vector<Stage> stages;
94 bool is_prepared = false; /// Has the sequence of stages been prepared.
95};
96
97}
98