#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#include <Storages/StorageReplicatedMergeTree.h>


namespace ProfileEvents
{
    extern const Event ReplicatedPartChecks;
    extern const Event ReplicatedPartChecksFailed;
    extern const Event ReplicatedDataLoss;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int TABLE_DIFFERS_TOO_MUCH;
}

static const auto PART_CHECK_ERROR_SLEEP_MS = 5 * 1000;


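/// The part check thread runs as a task in the schedule pool of the global context;
/// the task is scheduled for the first time right away in the constructor.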
ReplicatedMergeTreePartCheckThread::ReplicatedMergeTreePartCheckThread(StorageReplicatedMergeTree & storage_)
    : storage(storage_)
    , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreePartCheckThread)")
    , log(&Logger::get(log_name))
{
    task = storage.global_context.getSchedulePool().createTask(log_name, [this] { run(); });
    task->schedule();
}

ReplicatedMergeTreePartCheckThread::~ReplicatedMergeTreePartCheckThread()
{
    stop();
}

void ReplicatedMergeTreePartCheckThread::start()
{
    std::lock_guard lock(start_stop_mutex);
    need_stop = false;
    task->activateAndSchedule();
}

void ReplicatedMergeTreePartCheckThread::stop()
{
    /// Based on the discussion at https://github.com/ClickHouse/ClickHouse/pull/1489#issuecomment-344756259:
    /// with the schedule pool there is no problem if stop() is called twice in a row or start() is called multiple times.

    std::lock_guard lock(start_stop_mutex);
    need_stop = true;
    task->deactivate();
}

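/// Add a part (by name) to the check queue, unless it is already enqueued.
/// The check will not start earlier than delay_to_check_seconds from now.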
void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t delay_to_check_seconds)
{
    std::lock_guard lock(parts_mutex);

    if (parts_set.count(name))
        return;

    parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds);
    parts_set.insert(name);
    task->schedule();
}


size_t ReplicatedMergeTreePartCheckThread::size() const
{
    std::lock_guard lock(parts_mutex);
    return parts_set.size();
}


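/// Called when the part is neither on the local filesystem nor covered by an active local part.
/// Decides whether to re-fetch the part, to wait for it to appear as a result of a merge,
/// or to give up and remove the corresponding entry from the replication queue.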
void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & part_name)
{
    auto zookeeper = storage.getZooKeeper();
    String part_path = storage.replica_path + "/parts/" + part_name;

    /// If the part is in ZooKeeper, remove it from there and add the task to download it to the queue.
    if (zookeeper->exists(part_path))
    {
        LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. "
            "Removing from ZooKeeper and queueing a fetch.");
        ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

        storage.removePartAndEnqueueFetch(part_name);
        return;
    }

    /// If the part is not in ZooKeeper, check whether any replica at least has it or a part covering it.
    auto part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version);

    /** The logic is as follows:
      * - if some live or inactive replica has such a part, or a part covering it, everything is Ok:
      *   nothing is needed, the part will be downloaded when the queue is processed, when the replica comes back to life;
      *   and if that replica never comes back to life, the administrator will delete it or create a new replica
      *   with the same address, which will see everything from the beginning;
      * - if no one has such a part or a part covering it, then
      *   - if there are two smaller parts, one with the same min block and the other with the same
      *     max block, we hope that all parts in between are present too and the needed part
      *     will appear on other replicas as a result of a merge;
      *   - otherwise, consider the part lost and delete the entry from the queue.
      *
      * Note that this logic is not perfect - some part in the interior may be missing and the
      * needed part will never appear. But precisely determining whether the part will appear as
      * a result of a merge is complicated - we can't just check if all block numbers covered
      * by the missing part are present somewhere (because gaps between blocks are possible),
      * and to determine the constituent parts of the merge we would need to query the replication log
      * (both the common log and the queues of the individual replicas) and then, if the
      * constituent parts are in turn not found, solve the problem recursively for them.
      *
      * Considering the part lost when it is not in fact lost is very dangerous because it leads
      * to divergent replicas and intersecting parts. So we err on the side of caution
      * and don't delete the queue entry when in doubt.
      */

    LOG_WARNING(log, "Checking if anyone has a part covering " << part_name << ".");

    bool found_part_with_the_same_min_block = false;
    bool found_part_with_the_same_max_block = false;

    Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas");
    for (const String & replica : replicas)
    {
        Strings parts = zookeeper->getChildren(storage.zookeeper_path + "/replicas/" + replica + "/parts");
        for (const String & part_on_replica : parts)
        {
            auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica, storage.format_version);

            if (part_on_replica_info.contains(part_info))
            {
                LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica << " that covers the missing part " << part_name);
                return;
            }

            if (part_info.contains(part_on_replica_info))
            {
                if (part_on_replica_info.min_block == part_info.min_block)
                    found_part_with_the_same_min_block = true;
                if (part_on_replica_info.max_block == part_info.max_block)
                    found_part_with_the_same_max_block = true;

                if (found_part_with_the_same_min_block && found_part_with_the_same_max_block)
                {
                    LOG_WARNING(log,
                        "Found parts with the same min block and with the same max block as the missing part "
                        << part_name << ". Hoping that it will eventually appear as a result of a merge.");
                    return;
                }
            }
        }
    }

    /// No one has such a part and the merge is impossible.
    String not_found_msg;
    if (found_part_with_the_same_max_block)
        not_found_msg = "a smaller part with the same max block.";
    else if (found_part_with_the_same_min_block)
        not_found_msg = "a smaller part with the same min block.";
    else
        not_found_msg = "smaller parts with either the same min block or the same max block.";
    LOG_ERROR(log, "No replica has part covering " << part_name
              << " and a merge is impossible: we didn't find " << not_found_msg);

    ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

    /// Is it in the replication queue? If it is, delete it, because the task cannot be processed.
    if (!storage.queue.remove(zookeeper, part_name))
    {
        /// The part was not in our queue. Why did that happen?
        LOG_ERROR(log, "Missing part " << part_name << " is not in our queue.");
        return;
    }

    /** This situation is possible if the part was damaged on all replicas that had it.
      * For example, the replica that had just written it lost power before the data was flushed from the cache to disk.
      */
    LOG_ERROR(log, "Part " << part_name << " is lost forever.");
    ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
}


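/// Check a single part: verify that we still have it (or a part covering it) and, if the part
/// is registered in ZooKeeper, that its columns and checksums match the ones recorded there.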
CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
{
    LOG_WARNING(log, "Checking part " << part_name);
    ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);

    /// If the part is still in the PreCommitted -> Committed transition, it is not lost
    /// and there is no need to go searching for it on other replicas. To definitely find the needed part
    /// if it exists (or a part containing it) we first search among the PreCommitted parts.
    auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::PreCommitted});
    if (!part)
        part = storage.getActiveContainingPart(part_name);

    /// We do not have this or a covering part.
    if (!part)
    {
        searchForMissingPart(part_name);
        return {part_name, false, "Part is missing, will search for it"};
    }
    /// We have this part, and it's active. We will check whether we need this part and whether it has the right data.
    else if (part->name == part_name)
    {
        auto zookeeper = storage.getZooKeeper();
        auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);

        auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
            part->columns, part->checksums);

        String part_path = storage.replica_path + "/parts/" + part_name;
        String part_znode;
        /// If the part is in ZooKeeper, check its data against its checksums, and the checksums against what is stored in ZooKeeper.
        if (zookeeper->tryGet(part_path, part_znode))
        {
            LOG_WARNING(log, "Checking data of part " << part_name << ".");

            try
            {
                ReplicatedMergeTreePartHeader zk_part_header;
                if (!part_znode.empty())
                    zk_part_header = ReplicatedMergeTreePartHeader::fromString(part_znode);
                else
                {
                    String columns_znode = zookeeper->get(part_path + "/columns");
                    String checksums_znode = zookeeper->get(part_path + "/checksums");
                    zk_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(
                        columns_znode, checksums_znode);
                }

                if (local_part_header.getColumnsHash() != zk_part_header.getColumnsHash())
                    throw Exception("Columns of local part " + part_name + " are different from ZooKeeper", ErrorCodes::TABLE_DIFFERS_TOO_MUCH);

                zk_part_header.getChecksums().checkEqual(local_part_header.getChecksums(), true);

                checkDataPart(
                    part,
                    true,
                    storage.primary_key_data_types,
                    storage.skip_indices,
                    [this] { return need_stop.load(); });

                if (need_stop)
                {
                    LOG_INFO(log, "Checking part was cancelled.");
                    return {part_name, false, "Checking part was cancelled"};
                }

                LOG_INFO(log, "Part " << part_name << " looks good.");
            }
            catch (const Exception &)
            {
                /// TODO Better to check error code.

                tryLogCurrentException(log, __PRETTY_FUNCTION__);

                String message = "Part " + part_name + " looks broken. Removing it and queueing a fetch.";
                LOG_ERROR(log, message);
                ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

                storage.removePartAndEnqueueFetch(part_name);

                /// Delete part locally.
                storage.forgetPartAndMoveToDetached(part, "broken");
                return {part_name, false, message};
            }
        }
        else if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
        {
            /// If the part is not in ZooKeeper, delete it locally.
            /// Probably, someone has just written the part and has not yet added it to ZooKeeper.
            /// Therefore, delete it only if the part is old (not very reliable).
            ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

            String message = "Unexpected part " + part_name + " in filesystem. Removing.";
            LOG_ERROR(log, message);
            storage.forgetPartAndMoveToDetached(part, "unexpected");
            return {part_name, false, message};
        }
        else
        {
            /// TODO We should make sure that the part is checked again after a while.
            /// Otherwise, it's possible that the part was not added to ZooKeeper,
            /// but remained in the filesystem and in the set of active parts.
            /// And then for a long time (until the server is restarted), the data on the replicas will differ.

            LOG_TRACE(log, "Young part " << part_name
                << " with age " << (time(nullptr) - part->modification_time)
                << " seconds hasn't been added to ZooKeeper yet. It's ok.");
        }
    }
    else
    {
        /// If we have a covering part, ignore all the problems with this part.
        /// In the worst case, errors will keep appearing in the error log for `old_parts_lifetime` seconds until the part is removed as an old one.
        LOG_WARNING(log, "We have part " << part->name << " covering part " << part_name);
    }

    return {part_name, true, ""};
}


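/// One iteration of the check task: pick the first enqueued part whose scheduled check time has come,
/// check it, remove it from the queue and reschedule the task.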
void ReplicatedMergeTreePartCheckThread::run()
{
    if (need_stop)
        return;

    try
    {
        time_t current_time = time(nullptr);

        /// Take a part from the queue for verification.
        PartsToCheckQueue::iterator selected = parts_queue.end();    /// end() of std::list does not get invalidated
        time_t min_check_time = std::numeric_limits<time_t>::max();

        {
            std::lock_guard lock(parts_mutex);

            if (parts_queue.empty())
            {
                if (!parts_set.empty())
                {
                    LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
                    parts_set.clear();
                }
            }
            else
            {
                for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it)
                {
                    if (it->second <= current_time)
                    {
                        selected = it;
                        break;
                    }

                    if (it->second < min_check_time)
                        min_check_time = it->second;
                }
            }
        }

        if (selected == parts_queue.end())
            return;

        checkPart(selected->first);

        if (need_stop)
            return;

        /// Remove the part from the check queue.
        {
            std::lock_guard lock(parts_mutex);

            if (parts_queue.empty())
            {
                LOG_ERROR(log, "Someone erased the part being checked from parts_queue. This is a bug.");
            }
            else
            {
                parts_set.erase(selected->first);
                parts_queue.erase(selected);
            }
        }

        task->schedule();
    }
    catch (const Coordination::Exception & e)
    {
        tryLogCurrentException(log, __PRETTY_FUNCTION__);

        if (e.code == Coordination::ZSESSIONEXPIRED)
            return;

        task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
    }
    catch (...)
    {
        tryLogCurrentException(log, __PRETTY_FUNCTION__);
        task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
    }
}

}