1 | #include <Common/ZooKeeper/Types.h> |
---|---|
2 | |
3 | #include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h> |
4 | #include <IO/Operators.h> |
5 | #include <IO/ReadBufferFromString.h> |
6 | #include <IO/WriteBufferFromString.h> |
7 | |
8 | |
9 | namespace DB |
10 | { |
11 | |
12 | |
13 | void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const |
14 | { |
15 | out << "format version: 4\n" |
16 | << "create_time: "<< LocalDateTime(create_time ? create_time : time(nullptr)) << "\n" |
17 | << "source replica: "<< source_replica << '\n' |
18 | << "block_id: "<< escape << block_id << '\n'; |
19 | |
20 | switch (type) |
21 | { |
22 | case GET_PART: |
23 | out << "get\n"<< new_part_name; |
24 | break; |
25 | |
26 | case MERGE_PARTS: |
27 | out << "merge\n"; |
28 | for (const String & s : source_parts) |
29 | out << s << '\n'; |
30 | out << "into\n"<< new_part_name; |
31 | out << "\ndeduplicate: "<< deduplicate; |
32 | break; |
33 | |
34 | case DROP_RANGE: |
35 | if (detach) |
36 | out << "detach\n"; |
37 | else |
38 | out << "drop\n"; |
39 | out << new_part_name; |
40 | break; |
41 | |
42 | case CLEAR_COLUMN: |
43 | out << "clear_column\n" |
44 | << escape << column_name |
45 | << "\nfrom\n" |
46 | << new_part_name; |
47 | break; |
48 | |
49 | case CLEAR_INDEX: |
50 | out << "clear_index\n" |
51 | << escape << index_name |
52 | << "\nfrom\n" |
53 | << new_part_name; |
54 | break; |
55 | |
56 | case REPLACE_RANGE: |
57 | out << typeToString(REPLACE_RANGE) << "\n"; |
58 | replace_range_entry->writeText(out); |
59 | break; |
60 | |
61 | case MUTATE_PART: |
62 | out << "mutate\n" |
63 | << source_parts.at(0) << "\n" |
64 | << "to\n" |
65 | << new_part_name; |
66 | break; |
67 | |
68 | default: |
69 | throw Exception("Unknown log entry type: "+ DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR); |
70 | } |
71 | |
72 | out << '\n'; |
73 | |
74 | if (quorum) |
75 | out << "quorum: "<< quorum << '\n'; |
76 | } |
77 | |
78 | void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) |
79 | { |
80 | UInt8 format_version = 0; |
81 | String type_str; |
82 | |
83 | in >> "format version: ">> format_version >> "\n"; |
84 | |
85 | if (format_version < 1 || format_version > 4) |
86 | throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: "+ DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION); |
87 | |
88 | if (format_version >= 2) |
89 | { |
90 | LocalDateTime create_time_dt; |
91 | in >> "create_time: ">> create_time_dt >> "\n"; |
92 | create_time = create_time_dt; |
93 | } |
94 | |
95 | in >> "source replica: ">> source_replica >> "\n"; |
96 | |
97 | if (format_version >= 3) |
98 | { |
99 | in >> "block_id: ">> escape >> block_id >> "\n"; |
100 | } |
101 | |
102 | in >> type_str >> "\n"; |
103 | |
104 | if (type_str == "get") |
105 | { |
106 | type = GET_PART; |
107 | in >> new_part_name; |
108 | } |
109 | else if (type_str == "merge") |
110 | { |
111 | type = MERGE_PARTS; |
112 | while (true) |
113 | { |
114 | String s; |
115 | in >> s >> "\n"; |
116 | if (s == "into") |
117 | break; |
118 | source_parts.push_back(s); |
119 | } |
120 | in >> new_part_name; |
121 | if (format_version >= 4) |
122 | in >> "\ndeduplicate: ">> deduplicate; |
123 | } |
124 | else if (type_str == "drop"|| type_str == "detach") |
125 | { |
126 | type = DROP_RANGE; |
127 | detach = type_str == "detach"; |
128 | in >> new_part_name; |
129 | } |
130 | else if (type_str == "clear_column") |
131 | { |
132 | type = CLEAR_COLUMN; |
133 | in >> escape >> column_name >> "\nfrom\n">> new_part_name; |
134 | } |
135 | else if (type_str == "clear_index") |
136 | { |
137 | type = CLEAR_INDEX; |
138 | in >> escape >> index_name >> "\nfrom\n">> new_part_name; |
139 | } |
140 | else if (type_str == typeToString(REPLACE_RANGE)) |
141 | { |
142 | type = REPLACE_RANGE; |
143 | replace_range_entry = std::make_shared<ReplaceRangeEntry>(); |
144 | replace_range_entry->readText(in); |
145 | } |
146 | else if (type_str == "mutate") |
147 | { |
148 | type = MUTATE_PART; |
149 | String source_part; |
150 | in >> source_part >> "\n" |
151 | >> "to\n" |
152 | >> new_part_name; |
153 | source_parts.push_back(source_part); |
154 | } |
155 | |
156 | in >> "\n"; |
157 | |
158 | /// Optional field. |
159 | if (!in.eof()) |
160 | in >> "quorum: ">> quorum >> "\n"; |
161 | } |
162 | |
163 | void ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry::writeText(WriteBuffer & out) const |
164 | { |
165 | out << "drop_range_name: "<< drop_range_part_name << "\n"; |
166 | out << "from_database: "<< escape << from_database << "\n"; |
167 | out << "from_table: "<< escape << from_table << "\n"; |
168 | |
169 | out << "source_parts: "; |
170 | writeQuoted(src_part_names, out); |
171 | out << "\n"; |
172 | |
173 | out << "new_parts: "; |
174 | writeQuoted(new_part_names, out); |
175 | out << "\n"; |
176 | |
177 | out << "part_checksums: "; |
178 | writeQuoted(part_names_checksums, out); |
179 | out << "\n"; |
180 | |
181 | out << "columns_version: "<< columns_version; |
182 | } |
183 | |
184 | void ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry::readText(ReadBuffer & in) |
185 | { |
186 | in >> "drop_range_name: ">> drop_range_part_name >> "\n"; |
187 | in >> "from_database: ">> escape >> from_database >> "\n"; |
188 | in >> "from_table: ">> escape >> from_table >> "\n"; |
189 | |
190 | in >> "source_parts: "; |
191 | readQuoted(src_part_names, in); |
192 | in >> "\n"; |
193 | |
194 | in >> "new_parts: "; |
195 | readQuoted(new_part_names, in); |
196 | in >> "\n"; |
197 | |
198 | in >> "part_checksums: "; |
199 | readQuoted(part_names_checksums, in); |
200 | in >> "\n"; |
201 | |
202 | in >> "columns_version: ">> columns_version; |
203 | } |
204 | |
205 | String ReplicatedMergeTreeLogEntryData::toString() const |
206 | { |
207 | WriteBufferFromOwnString out; |
208 | writeText(out); |
209 | return out.str(); |
210 | } |
211 | |
212 | ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s, const Coordination::Stat & stat) |
213 | { |
214 | ReadBufferFromString in(s); |
215 | Ptr res = std::make_shared<ReplicatedMergeTreeLogEntry>(); |
216 | res->readText(in); |
217 | assertEOF(in); |
218 | |
219 | if (!res->create_time) |
220 | res->create_time = stat.ctime / 1000; |
221 | |
222 | return res; |
223 | } |
224 | |
225 | } |
226 |