#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#include <Storages/StorageReplicatedMergeTree.h>


namespace ProfileEvents
{
    extern const Event ReplicatedPartChecks;
    extern const Event ReplicatedPartChecksFailed;
    extern const Event ReplicatedDataLoss;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int TABLE_DIFFERS_TOO_MUCH;
}

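/// If run() fails with an error, the task is re-scheduled after this delay (see the catch blocks in run()).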
static const auto PART_CHECK_ERROR_SLEEP_MS = 5 * 1000;


ReplicatedMergeTreePartCheckThread::ReplicatedMergeTreePartCheckThread(StorageReplicatedMergeTree & storage_)
    : storage(storage_)
    , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreePartCheckThread)")
    , log(&Logger::get(log_name))
{
    task = storage.global_context.getSchedulePool().createTask(log_name, [this] { run(); });
    task->schedule();
}

ReplicatedMergeTreePartCheckThread::~ReplicatedMergeTreePartCheckThread()
{
    stop();
}

void ReplicatedMergeTreePartCheckThread::start()
{
    std::lock_guard lock(start_stop_mutex);
    need_stop = false;
    task->activateAndSchedule();
}

void ReplicatedMergeTreePartCheckThread::stop()
{
    /// Based on the discussion in https://github.com/ClickHouse/ClickHouse/pull/1489#issuecomment-344756259:
    /// with the schedule pool it is safe to call stop() twice in a row and start() multiple times.

    std::lock_guard lock(start_stop_mutex);
    need_stop = true;
    task->deactivate();
}

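/// Ask the thread to check the given part after (at least) delay_to_check_seconds.
/// A part that is already queued is not added twice.
/// Hypothetical usage sketch (the part name and delay are made up):
///     part_check_thread.enqueuePart("20190101_0_0_0", /* delay_to_check_seconds = */ 60);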
void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t delay_to_check_seconds)
{
    std::lock_guard lock(parts_mutex);

    if (parts_set.count(name))
        return;

    parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds);
    parts_set.insert(name);
    task->schedule();
}


size_t ReplicatedMergeTreePartCheckThread::size() const
{
    std::lock_guard lock(parts_mutex);
    return parts_set.size();
}


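/// Called from checkPart() when the part is not present locally. First checks our replica's
/// node in ZooKeeper; otherwise decides whether the part can still appear (someone has it or
/// a covering part, or it can be produced by a merge) or must be considered lost.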
void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & part_name)
{
    auto zookeeper = storage.getZooKeeper();
    String part_path = storage.replica_path + "/parts/" + part_name;

    /// If the part is in ZooKeeper, remove it from there and add the task to download it to the queue.
    if (zookeeper->exists(part_path))
    {
        LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. "
            "Removing from ZooKeeper and queueing a fetch.");
        ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

        storage.removePartAndEnqueueFetch(part_name);
        return;
    }

    /// If the part is not in ZooKeeper, we'll check if it's at least somewhere.
    auto part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version);

    /** The logic is as follows:
      * - if some live or inactive replica has such a part, or a part covering it, then it is Ok:
      *   nothing is needed, the part will be downloaded when the queue is processed or when the replica comes to life;
      *   or, if the replica never comes to life, the administrator will delete it or create a new replica
      *   with the same address and see everything from the beginning;
      * - if no one has such a part or a part covering it, then
      *   - if there are two smaller parts, one with the same min block and the other with the same
      *     max block, we hope that all parts in between are present too and the needed part
      *     will appear on other replicas as a result of a merge;
      *   - otherwise, consider the part lost and delete the entry from the queue.
      *
      * Note that this logic is not perfect - some part in the interior may be missing and the
      * needed part will never appear. But precisely determining whether the part will appear as
      * a result of a merge is complicated - we can't just check if all block numbers covered
      * by the missing part are present somewhere (because gaps between blocks are possible)
      * and to determine the constituent parts of the merge we need to query the replication log
      * (both the common log and the queues of the individual replicas) and then, if the
      * constituent parts are in turn not found, solve the problem recursively for them.
      *
      * Considering the part lost when it is not in fact lost is very dangerous because it leads
      * to divergent replicas and intersecting parts. So we err on the side of caution
      * and don't delete the queue entry when in doubt.
      */

    LOG_WARNING(log, "Checking if anyone has a part covering " << part_name << ".");

    bool found_part_with_the_same_min_block = false;
    bool found_part_with_the_same_max_block = false;

    Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas");
    for (const String & replica : replicas)
    {
        Strings parts = zookeeper->getChildren(storage.zookeeper_path + "/replicas/" + replica + "/parts");
        for (const String & part_on_replica : parts)
        {
            auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica, storage.format_version);

            if (part_on_replica_info.contains(part_info))
            {
                LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica << " that covers the missing part " << part_name);
                return;
            }

            if (part_info.contains(part_on_replica_info))
            {
                if (part_on_replica_info.min_block == part_info.min_block)
                    found_part_with_the_same_min_block = true;
                if (part_on_replica_info.max_block == part_info.max_block)
                    found_part_with_the_same_max_block = true;

                if (found_part_with_the_same_min_block && found_part_with_the_same_max_block)
                {
                    LOG_WARNING(log,
                        "Found parts with the same min block and with the same max block as the missing part "
                        << part_name << ". Hoping that it will eventually appear as a result of a merge.");
                    return;
                }
            }
        }
    }

    /// No one has such a part and the merge is impossible.
    String not_found_msg;
    if (found_part_with_the_same_max_block)
        not_found_msg = "a smaller part with the same max block.";
    else if (found_part_with_the_same_min_block)
        not_found_msg = "a smaller part with the same min block.";
    else
        not_found_msg = "smaller parts with either the same min block or the same max block.";
    LOG_ERROR(log, "No replica has part covering " << part_name
              << " and a merge is impossible: we didn't find " << not_found_msg);

    ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

    /// Is it in the replication queue? If it is, delete it, because the task cannot be processed.
    if (!storage.queue.remove(zookeeper, part_name))
    {
        /// The part was not in our queue. Why did it happen?
        LOG_ERROR(log, "Missing part " << part_name << " is not in our queue.");
        return;
    }

    /** This situation is possible if the part deteriorated on all the replicas that had it.
      * For example, the replica that had just written it lost power before the data was flushed from cache to disk.
      */
    LOG_ERROR(log, "Part " << part_name << " is lost forever.");
    ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
}


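/// Check a single part. Three cases are possible:
/// - neither the part nor a covering part exists locally: delegate to searchForMissingPart();
/// - the exact part is present: verify its columns and checksums against ZooKeeper and the data on disk;
/// - a covering part is present: ignore any problems with this part, it will be removed as obsolete anyway.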
CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
{
    LOG_WARNING(log, "Checking part " << part_name);
    ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);

    /// If the part is still in the PreCommitted -> Committed transition, it is not lost
    /// and there is no need to go searching for it on other replicas. To definitely find the needed part
    /// if it exists (or a part containing it) we first search among the PreCommitted parts.
    auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::PreCommitted});
    if (!part)
        part = storage.getActiveContainingPart(part_name);

    /// We do not have this or a covering part.
    if (!part)
    {
        searchForMissingPart(part_name);
        return {part_name, false, "Part is missing, will search for it"};
    }
    /// We have this part, and it's active. We will check whether we need this part and whether it has the right data.
    else if (part->name == part_name)
    {
        auto zookeeper = storage.getZooKeeper();
        auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);

        auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
            part->columns, part->checksums);

        String part_path = storage.replica_path + "/parts/" + part_name;
        String part_znode;
        /// If the part is in ZooKeeper, check its data against its checksums, and the checksums against ZooKeeper.
        if (zookeeper->tryGet(part_path, part_znode))
        {
            LOG_WARNING(log, "Checking data of part " << part_name << ".");

            try
            {
                ReplicatedMergeTreePartHeader zk_part_header;
                if (!part_znode.empty())
                    zk_part_header = ReplicatedMergeTreePartHeader::fromString(part_znode);
                else
                {
                    String columns_znode = zookeeper->get(part_path + "/columns");
                    String checksums_znode = zookeeper->get(part_path + "/checksums");
                    zk_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(
                        columns_znode, checksums_znode);
                }

                if (local_part_header.getColumnsHash() != zk_part_header.getColumnsHash())
                    throw Exception("Columns of local part " + part_name + " are different from ZooKeeper", ErrorCodes::TABLE_DIFFERS_TOO_MUCH);

                zk_part_header.getChecksums().checkEqual(local_part_header.getChecksums(), true);

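                /// Re-read and verify the part's data on disk (checksums, primary key, skip indices).
                /// The last argument is a cancellation callback: it lets checkDataPart() abort early
                /// when this thread is asked to stop, which is why need_stop is re-checked right after.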
                checkDataPart(
                    part,
                    true,
                    storage.primary_key_data_types,
                    storage.skip_indices,
                    [this] { return need_stop.load(); });

                if (need_stop)
                {
                    LOG_INFO(log, "Checking part was cancelled.");
                    return {part_name, false, "Checking part was cancelled"};
                }

                LOG_INFO(log, "Part " << part_name << " looks good.");
            }
            catch (const Exception &)
            {
                /// TODO Better to check error code.

                tryLogCurrentException(log, __PRETTY_FUNCTION__);

                String message = "Part " + part_name + " looks broken. Removing it and queueing a fetch.";
                LOG_ERROR(log, message);
                ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

                storage.removePartAndEnqueueFetch(part_name);

                /// Delete part locally.
                storage.forgetPartAndMoveToDetached(part, "broken");
                return {part_name, false, message};
            }
        }
        else if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
        {
            /// If the part is not in ZooKeeper, delete it locally.
            /// Probably, someone has just written the part and has not yet added it to ZooKeeper.
            /// Therefore, delete only if the part is old (not very reliable).
            ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

            String message = "Unexpected part " + part_name + " in filesystem. Removing.";
            LOG_ERROR(log, message);
            storage.forgetPartAndMoveToDetached(part, "unexpected");
            return {part_name, false, message};
        }
        else
        {
            /// TODO You need to make sure that the part is still checked after a while.
            /// Otherwise, it's possible that the part was not added to ZK,
            /// but remained in the filesystem and in a number of active parts.
            /// And then for a long time (before restarting), the data on the replicas will be different.

            LOG_TRACE(log, "Young part " << part_name
                << " with age " << (time(nullptr) - part->modification_time)
                << " seconds hasn't been added to ZooKeeper yet. It's ok.");
        }
    }
    else
    {
        /// If we have a covering part, ignore all the problems with this part.
        /// In the worst case, errors will still appear in the error log for `old_parts_lifetime` seconds, until the part is removed as an old one.
        LOG_WARNING(log, "We have part " << part->name << " covering part " << part_name);
    }

    return {part_name, true, ""};
}


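/// One iteration of the background task: pick the first queued part whose check time has come,
/// check it, remove it from the queue and reschedule. On an error, retry after PART_CHECK_ERROR_SLEEP_MS.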
void ReplicatedMergeTreePartCheckThread::run()
{
    if (need_stop)
        return;

    try
    {
        time_t current_time = time(nullptr);

        /// Take a part from the queue for verification.
        PartsToCheckQueue::iterator selected = parts_queue.end();    /// The end iterator of std::list does not get invalidated.
        time_t min_check_time = std::numeric_limits<time_t>::max();

        {
            std::lock_guard lock(parts_mutex);

            if (parts_queue.empty())
            {
                if (!parts_set.empty())
                {
                    LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
                    parts_set.clear();
                }
            }
            else
            {
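                /// Pick the first part whose scheduled check time has already come.
                /// Note that min_check_time is computed here but is not currently used to reschedule the task.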
                for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it)
                {
                    if (it->second <= current_time)
                    {
                        selected = it;
                        break;
                    }

                    if (it->second < min_check_time)
                        min_check_time = it->second;
                }
            }
        }

        if (selected == parts_queue.end())
            return;

        checkPart(selected->first);

        if (need_stop)
            return;

        /// Remove the part from the check queue.
        {
            std::lock_guard lock(parts_mutex);

            if (parts_queue.empty())
            {
                LOG_ERROR(log, "Someone erased the part being checked from parts_queue. This is a bug.");
            }
            else
            {
                parts_set.erase(selected->first);
                parts_queue.erase(selected);
            }
        }

        task->schedule();
    }
    catch (const Coordination::Exception & e)
    {
        tryLogCurrentException(log, __PRETTY_FUNCTION__);

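        /// If the ZooKeeper session has expired, retrying here is pointless: nothing can succeed
        /// until a new session is established, so give up without rescheduling.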
        if (e.code == Coordination::ZSESSIONEXPIRED)
            return;

        task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
    }
    catch (...)
    {
        tryLogCurrentException(log, __PRETTY_FUNCTION__);
        task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
    }
}

}