1 | #pragma once |
2 | |
3 | #include <Common/Exception.h> |
4 | #include <Common/ZooKeeper/Types.h> |
5 | #include <Core/Types.h> |
6 | #include <IO/WriteHelpers.h> |
7 | |
8 | #include <mutex> |
9 | #include <condition_variable> |
10 | |
11 | |
12 | namespace DB |
13 | { |
14 | |
/// Forward declarations: these types are only taken by reference (or merely mentioned) in this header,
/// so their full definitions are not needed here.
class ReplicatedMergeTreeQueue;
class ReadBuffer;
class WriteBuffer;

/// Error codes used by the log entry code; declared here, defined in another translation unit.
namespace ErrorCodes
{
    extern const int CANNOT_PARSE_TEXT;
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_FORMAT_VERSION;
}
25 | |
26 | |
27 | /// Record about what needs to be done. Only data (you can copy them). |
28 | struct ReplicatedMergeTreeLogEntryData |
29 | { |
30 | enum Type |
31 | { |
32 | EMPTY, /// Not used. |
33 | GET_PART, /// Get the part from another replica. |
34 | MERGE_PARTS, /// Merge the parts. |
35 | DROP_RANGE, /// Delete the parts in the specified partition in the specified number range. |
36 | CLEAR_COLUMN, /// Drop specific column from specified partition. |
37 | CLEAR_INDEX, /// Drop specific index from specified partition. |
38 | REPLACE_RANGE, /// Drop certain range of partitions and replace them by new ones |
39 | MUTATE_PART, /// Apply one or several mutations to the part. |
40 | }; |
41 | |
42 | static String typeToString(Type type) |
43 | { |
44 | switch (type) |
45 | { |
46 | case ReplicatedMergeTreeLogEntryData::GET_PART: return "GET_PART" ; |
47 | case ReplicatedMergeTreeLogEntryData::MERGE_PARTS: return "MERGE_PARTS" ; |
48 | case ReplicatedMergeTreeLogEntryData::DROP_RANGE: return "DROP_RANGE" ; |
49 | case ReplicatedMergeTreeLogEntryData::CLEAR_COLUMN: return "CLEAR_COLUMN" ; |
50 | case ReplicatedMergeTreeLogEntryData::CLEAR_INDEX: return "CLEAR_INDEX" ; |
51 | case ReplicatedMergeTreeLogEntryData::REPLACE_RANGE: return "REPLACE_RANGE" ; |
52 | case ReplicatedMergeTreeLogEntryData::MUTATE_PART: return "MUTATE_PART" ; |
53 | default: |
54 | throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR); |
55 | } |
56 | } |
57 | |
58 | String typeToString() const |
59 | { |
60 | return typeToString(type); |
61 | } |
62 | |
63 | void writeText(WriteBuffer & out) const; |
64 | void readText(ReadBuffer & in); |
65 | String toString() const; |
66 | |
67 | String znode_name; |
68 | |
69 | Type type = EMPTY; |
70 | String source_replica; /// Empty string means that this entry was added to the queue immediately, and not copied from the log. |
71 | |
72 | /// The name of resulting part for GET_PART and MERGE_PARTS |
73 | /// Part range for DROP_RANGE and CLEAR_COLUMN |
74 | String new_part_name; |
75 | String block_id; /// For parts of level zero, the block identifier for deduplication (node name in /blocks/). |
76 | mutable String actual_new_part_name; /// GET_PART could actually fetch a part covering 'new_part_name'. |
77 | |
78 | Strings source_parts; |
79 | bool deduplicate = false; /// Do deduplicate on merge |
80 | String column_name; |
81 | String index_name; |
82 | |
83 | /// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts |
84 | /// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query. |
85 | bool force_ttl = false; |
86 | |
87 | /// For DROP_RANGE, true means that the parts need not be deleted, but moved to the `detached` directory. |
88 | bool detach = false; |
89 | |
90 | /// REPLACE PARTITION FROM command |
91 | struct ReplaceRangeEntry |
92 | { |
93 | String drop_range_part_name; |
94 | |
95 | String from_database; |
96 | String from_table; |
97 | Strings src_part_names; // as in from_table |
98 | Strings new_part_names; |
99 | Strings part_names_checksums; |
100 | int columns_version; |
101 | |
102 | void writeText(WriteBuffer & out) const; |
103 | void readText(ReadBuffer & in); |
104 | }; |
105 | |
106 | std::shared_ptr<ReplaceRangeEntry> replace_range_entry; |
107 | |
108 | /// Returns a set of parts that will appear after executing the entry + parts to block |
109 | /// selection of merges. These parts are added to queue.virtual_parts. |
110 | Strings getVirtualPartNames() const |
111 | { |
112 | /// DROP_RANGE does not add a real part, but we must disable merges in that range |
113 | if (type == DROP_RANGE) |
114 | return {new_part_name}; |
115 | |
116 | /// Return {} because selection of merges in the partition where the column is cleared |
117 | /// should not be blocked (only execution of merges should be blocked). |
118 | if (type == CLEAR_COLUMN || type == CLEAR_INDEX) |
119 | return {}; |
120 | |
121 | if (type == REPLACE_RANGE) |
122 | { |
123 | Strings res = replace_range_entry->new_part_names; |
124 | res.emplace_back(replace_range_entry->drop_range_part_name); |
125 | return res; |
126 | } |
127 | |
128 | return {new_part_name}; |
129 | } |
130 | |
131 | /// Returns set of parts that denote the block number ranges that should be blocked during the entry execution. |
132 | /// These parts are added to future_parts. |
133 | Strings getBlockingPartNames() const |
134 | { |
135 | Strings res = getVirtualPartNames(); |
136 | |
137 | if (type == CLEAR_COLUMN) |
138 | res.emplace_back(new_part_name); |
139 | |
140 | return res; |
141 | } |
142 | |
143 | /// Access under queue_mutex, see ReplicatedMergeTreeQueue. |
144 | bool currently_executing = false; /// Whether the action is executing now. |
145 | /// These several fields are informational only (for viewing by the user using system tables). |
146 | /// Access under queue_mutex, see ReplicatedMergeTreeQueue. |
147 | size_t num_tries = 0; /// The number of attempts to perform the action (since the server started, including the running one). |
148 | std::exception_ptr exception; /// The last exception, in the case of an unsuccessful attempt to perform the action. |
149 | time_t last_attempt_time = 0; /// The time at which the last attempt was attempted to complete the action. |
150 | size_t num_postponed = 0; /// The number of times the action was postponed. |
151 | String postpone_reason; /// The reason why the action was postponed, if it was postponed. |
152 | time_t last_postpone_time = 0; /// The time of the last time the action was postponed. |
153 | |
154 | /// Creation time or the time to copy from the general log to the queue of a particular replica. |
155 | time_t create_time = 0; |
156 | |
157 | /// The quorum value (for GET_PART) is a non-zero value when the quorum write is enabled. |
158 | size_t quorum = 0; |
159 | }; |
160 | |
161 | |
162 | struct ReplicatedMergeTreeLogEntry : public ReplicatedMergeTreeLogEntryData, std::enable_shared_from_this<ReplicatedMergeTreeLogEntry> |
163 | { |
164 | using Ptr = std::shared_ptr<ReplicatedMergeTreeLogEntry>; |
165 | |
166 | std::condition_variable execution_complete; /// Awake when currently_executing becomes false. |
167 | |
168 | static Ptr parse(const String & s, const Coordination::Stat & stat); |
169 | }; |
170 | |
171 | using ReplicatedMergeTreeLogEntryPtr = std::shared_ptr<ReplicatedMergeTreeLogEntry>; |
172 | |
173 | |
174 | } |
175 | |