1#pragma once
2
3#include <Common/Exception.h>
4#include <Common/ZooKeeper/Types.h>
5#include <Core/Types.h>
6#include <IO/WriteHelpers.h>
7
8#include <mutex>
9#include <condition_variable>
10
11
12namespace DB
13{
14
15class ReadBuffer;
16class WriteBuffer;
17class ReplicatedMergeTreeQueue;
18
/// Error codes referenced by the replicated log entry code.
/// Only declared here; the definitions live in another translation unit.
namespace ErrorCodes
{
    extern const int CANNOT_PARSE_TEXT;
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_FORMAT_VERSION;
}
25
26
27/// Record about what needs to be done. Only data (you can copy them).
28struct ReplicatedMergeTreeLogEntryData
29{
30 enum Type
31 {
32 EMPTY, /// Not used.
33 GET_PART, /// Get the part from another replica.
34 MERGE_PARTS, /// Merge the parts.
35 DROP_RANGE, /// Delete the parts in the specified partition in the specified number range.
36 CLEAR_COLUMN, /// Drop specific column from specified partition.
37 CLEAR_INDEX, /// Drop specific index from specified partition.
38 REPLACE_RANGE, /// Drop certain range of partitions and replace them by new ones
39 MUTATE_PART, /// Apply one or several mutations to the part.
40 };
41
42 static String typeToString(Type type)
43 {
44 switch (type)
45 {
46 case ReplicatedMergeTreeLogEntryData::GET_PART: return "GET_PART";
47 case ReplicatedMergeTreeLogEntryData::MERGE_PARTS: return "MERGE_PARTS";
48 case ReplicatedMergeTreeLogEntryData::DROP_RANGE: return "DROP_RANGE";
49 case ReplicatedMergeTreeLogEntryData::CLEAR_COLUMN: return "CLEAR_COLUMN";
50 case ReplicatedMergeTreeLogEntryData::CLEAR_INDEX: return "CLEAR_INDEX";
51 case ReplicatedMergeTreeLogEntryData::REPLACE_RANGE: return "REPLACE_RANGE";
52 case ReplicatedMergeTreeLogEntryData::MUTATE_PART: return "MUTATE_PART";
53 default:
54 throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
55 }
56 }
57
58 String typeToString() const
59 {
60 return typeToString(type);
61 }
62
63 void writeText(WriteBuffer & out) const;
64 void readText(ReadBuffer & in);
65 String toString() const;
66
67 String znode_name;
68
69 Type type = EMPTY;
70 String source_replica; /// Empty string means that this entry was added to the queue immediately, and not copied from the log.
71
72 /// The name of resulting part for GET_PART and MERGE_PARTS
73 /// Part range for DROP_RANGE and CLEAR_COLUMN
74 String new_part_name;
75 String block_id; /// For parts of level zero, the block identifier for deduplication (node name in /blocks/).
76 mutable String actual_new_part_name; /// GET_PART could actually fetch a part covering 'new_part_name'.
77
78 Strings source_parts;
79 bool deduplicate = false; /// Do deduplicate on merge
80 String column_name;
81 String index_name;
82
83 /// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts
84 /// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query.
85 bool force_ttl = false;
86
87 /// For DROP_RANGE, true means that the parts need not be deleted, but moved to the `detached` directory.
88 bool detach = false;
89
90 /// REPLACE PARTITION FROM command
91 struct ReplaceRangeEntry
92 {
93 String drop_range_part_name;
94
95 String from_database;
96 String from_table;
97 Strings src_part_names; // as in from_table
98 Strings new_part_names;
99 Strings part_names_checksums;
100 int columns_version;
101
102 void writeText(WriteBuffer & out) const;
103 void readText(ReadBuffer & in);
104 };
105
106 std::shared_ptr<ReplaceRangeEntry> replace_range_entry;
107
108 /// Returns a set of parts that will appear after executing the entry + parts to block
109 /// selection of merges. These parts are added to queue.virtual_parts.
110 Strings getVirtualPartNames() const
111 {
112 /// DROP_RANGE does not add a real part, but we must disable merges in that range
113 if (type == DROP_RANGE)
114 return {new_part_name};
115
116 /// Return {} because selection of merges in the partition where the column is cleared
117 /// should not be blocked (only execution of merges should be blocked).
118 if (type == CLEAR_COLUMN || type == CLEAR_INDEX)
119 return {};
120
121 if (type == REPLACE_RANGE)
122 {
123 Strings res = replace_range_entry->new_part_names;
124 res.emplace_back(replace_range_entry->drop_range_part_name);
125 return res;
126 }
127
128 return {new_part_name};
129 }
130
131 /// Returns set of parts that denote the block number ranges that should be blocked during the entry execution.
132 /// These parts are added to future_parts.
133 Strings getBlockingPartNames() const
134 {
135 Strings res = getVirtualPartNames();
136
137 if (type == CLEAR_COLUMN)
138 res.emplace_back(new_part_name);
139
140 return res;
141 }
142
143 /// Access under queue_mutex, see ReplicatedMergeTreeQueue.
144 bool currently_executing = false; /// Whether the action is executing now.
145 /// These several fields are informational only (for viewing by the user using system tables).
146 /// Access under queue_mutex, see ReplicatedMergeTreeQueue.
147 size_t num_tries = 0; /// The number of attempts to perform the action (since the server started, including the running one).
148 std::exception_ptr exception; /// The last exception, in the case of an unsuccessful attempt to perform the action.
149 time_t last_attempt_time = 0; /// The time at which the last attempt was attempted to complete the action.
150 size_t num_postponed = 0; /// The number of times the action was postponed.
151 String postpone_reason; /// The reason why the action was postponed, if it was postponed.
152 time_t last_postpone_time = 0; /// The time of the last time the action was postponed.
153
154 /// Creation time or the time to copy from the general log to the queue of a particular replica.
155 time_t create_time = 0;
156
157 /// The quorum value (for GET_PART) is a non-zero value when the quorum write is enabled.
158 size_t quorum = 0;
159};
160
161
162struct ReplicatedMergeTreeLogEntry : public ReplicatedMergeTreeLogEntryData, std::enable_shared_from_this<ReplicatedMergeTreeLogEntry>
163{
164 using Ptr = std::shared_ptr<ReplicatedMergeTreeLogEntry>;
165
166 std::condition_variable execution_complete; /// Awake when currently_executing becomes false.
167
168 static Ptr parse(const String & s, const Coordination::Stat & stat);
169};
170
171using ReplicatedMergeTreeLogEntryPtr = std::shared_ptr<ReplicatedMergeTreeLogEntry>;
172
173
174}
175