1#include <Common/ZooKeeper/Types.h>
2
3#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
4#include <IO/Operators.h>
5#include <IO/ReadBufferFromString.h>
6#include <IO/WriteBufferFromString.h>
7
8
9namespace DB
10{
11
12
13void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
14{
15 out << "format version: 4\n"
16 << "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n"
17 << "source replica: " << source_replica << '\n'
18 << "block_id: " << escape << block_id << '\n';
19
20 switch (type)
21 {
22 case GET_PART:
23 out << "get\n" << new_part_name;
24 break;
25
26 case MERGE_PARTS:
27 out << "merge\n";
28 for (const String & s : source_parts)
29 out << s << '\n';
30 out << "into\n" << new_part_name;
31 out << "\ndeduplicate: " << deduplicate;
32 break;
33
34 case DROP_RANGE:
35 if (detach)
36 out << "detach\n";
37 else
38 out << "drop\n";
39 out << new_part_name;
40 break;
41
42 case CLEAR_COLUMN:
43 out << "clear_column\n"
44 << escape << column_name
45 << "\nfrom\n"
46 << new_part_name;
47 break;
48
49 case CLEAR_INDEX:
50 out << "clear_index\n"
51 << escape << index_name
52 << "\nfrom\n"
53 << new_part_name;
54 break;
55
56 case REPLACE_RANGE:
57 out << typeToString(REPLACE_RANGE) << "\n";
58 replace_range_entry->writeText(out);
59 break;
60
61 case MUTATE_PART:
62 out << "mutate\n"
63 << source_parts.at(0) << "\n"
64 << "to\n"
65 << new_part_name;
66 break;
67
68 default:
69 throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
70 }
71
72 out << '\n';
73
74 if (quorum)
75 out << "quorum: " << quorum << '\n';
76}
77
78void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
79{
80 UInt8 format_version = 0;
81 String type_str;
82
83 in >> "format version: " >> format_version >> "\n";
84
85 if (format_version < 1 || format_version > 4)
86 throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
87
88 if (format_version >= 2)
89 {
90 LocalDateTime create_time_dt;
91 in >> "create_time: " >> create_time_dt >> "\n";
92 create_time = create_time_dt;
93 }
94
95 in >> "source replica: " >> source_replica >> "\n";
96
97 if (format_version >= 3)
98 {
99 in >> "block_id: " >> escape >> block_id >> "\n";
100 }
101
102 in >> type_str >> "\n";
103
104 if (type_str == "get")
105 {
106 type = GET_PART;
107 in >> new_part_name;
108 }
109 else if (type_str == "merge")
110 {
111 type = MERGE_PARTS;
112 while (true)
113 {
114 String s;
115 in >> s >> "\n";
116 if (s == "into")
117 break;
118 source_parts.push_back(s);
119 }
120 in >> new_part_name;
121 if (format_version >= 4)
122 in >> "\ndeduplicate: " >> deduplicate;
123 }
124 else if (type_str == "drop" || type_str == "detach")
125 {
126 type = DROP_RANGE;
127 detach = type_str == "detach";
128 in >> new_part_name;
129 }
130 else if (type_str == "clear_column")
131 {
132 type = CLEAR_COLUMN;
133 in >> escape >> column_name >> "\nfrom\n" >> new_part_name;
134 }
135 else if (type_str == "clear_index")
136 {
137 type = CLEAR_INDEX;
138 in >> escape >> index_name >> "\nfrom\n" >> new_part_name;
139 }
140 else if (type_str == typeToString(REPLACE_RANGE))
141 {
142 type = REPLACE_RANGE;
143 replace_range_entry = std::make_shared<ReplaceRangeEntry>();
144 replace_range_entry->readText(in);
145 }
146 else if (type_str == "mutate")
147 {
148 type = MUTATE_PART;
149 String source_part;
150 in >> source_part >> "\n"
151 >> "to\n"
152 >> new_part_name;
153 source_parts.push_back(source_part);
154 }
155
156 in >> "\n";
157
158 /// Optional field.
159 if (!in.eof())
160 in >> "quorum: " >> quorum >> "\n";
161}
162
163void ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry::writeText(WriteBuffer & out) const
164{
165 out << "drop_range_name: " << drop_range_part_name << "\n";
166 out << "from_database: " << escape << from_database << "\n";
167 out << "from_table: " << escape << from_table << "\n";
168
169 out << "source_parts: ";
170 writeQuoted(src_part_names, out);
171 out << "\n";
172
173 out << "new_parts: ";
174 writeQuoted(new_part_names, out);
175 out << "\n";
176
177 out << "part_checksums: ";
178 writeQuoted(part_names_checksums, out);
179 out << "\n";
180
181 out << "columns_version: " << columns_version;
182}
183
184void ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry::readText(ReadBuffer & in)
185{
186 in >> "drop_range_name: " >> drop_range_part_name >> "\n";
187 in >> "from_database: " >> escape >> from_database >> "\n";
188 in >> "from_table: " >> escape >> from_table >> "\n";
189
190 in >> "source_parts: ";
191 readQuoted(src_part_names, in);
192 in >> "\n";
193
194 in >> "new_parts: ";
195 readQuoted(new_part_names, in);
196 in >> "\n";
197
198 in >> "part_checksums: ";
199 readQuoted(part_names_checksums, in);
200 in >> "\n";
201
202 in >> "columns_version: " >> columns_version;
203}
204
205String ReplicatedMergeTreeLogEntryData::toString() const
206{
207 WriteBufferFromOwnString out;
208 writeText(out);
209 return out.str();
210}
211
212ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s, const Coordination::Stat & stat)
213{
214 ReadBufferFromString in(s);
215 Ptr res = std::make_shared<ReplicatedMergeTreeLogEntry>();
216 res->readText(in);
217 assertEOF(in);
218
219 if (!res->create_time)
220 res->create_time = stat.ctime / 1000;
221
222 return res;
223}
224
225}
226