#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>

#include <random>
#include <unordered_set>


namespace DB
{

namespace ErrorCodes
{
    extern const int NOT_FOUND_NODE;
    extern const int ALL_REPLICAS_LOST;
    extern const int REPLICA_STATUS_CHANGED;
}


ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_)
    : storage(storage_)
    , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeCleanupThread)")
    , log(&Logger::get(log_name))
{
    task = storage.global_context.getSchedulePool().createTask(log_name, [this]{ run(); });
}

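/// One cleanup iteration, re-scheduled through the background schedule pool.
/// The delay is cleanup_delay_period seconds plus a random addition of up to
/// cleanup_delay_period_random_add seconds (likely to avoid all replicas running
/// cleanup at the same moment). On an expired ZooKeeper session the task is not
/// re-scheduled from here.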
void ReplicatedMergeTreeCleanupThread::run()
{
    auto storage_settings = storage.getSettings();
    const auto CLEANUP_SLEEP_MS = storage_settings->cleanup_delay_period * 1000
        + std::uniform_int_distribution<UInt64>(0, storage_settings->cleanup_delay_period_random_add * 1000)(rng);

    try
    {
        iterate();
    }
    catch (const Coordination::Exception & e)
    {
        tryLogCurrentException(log, __PRETTY_FUNCTION__);

        if (e.code == Coordination::ZSESSIONEXPIRED)
            return;
    }
    catch (...)
    {
        tryLogCurrentException(log, __PRETTY_FUNCTION__);
    }

    task->scheduleAfter(CLEANUP_SLEEP_MS);
}


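/// One pass of cleanup. Local clean-ups (old parts, temporary directories) are done
/// unconditionally; ZooKeeper-wide clean-ups (log, blocks, mutations) are done only
/// while this replica believes it is the leader.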
void ReplicatedMergeTreeCleanupThread::iterate()
{
    storage.clearOldPartsAndRemoveFromZK();

    {
        /// TODO: Implement tryLockStructureForShare.
        auto lock = storage.lockStructureForShare(false, "");
        storage.clearOldTemporaryDirectories();
    }

    /// This is a loose condition: no problem if we have actually lost leadership by this
    /// moment and two replicas try to do the cleanup simultaneously.
    if (storage.is_leader)
    {
        clearOldLogs();
        clearOldBlocks();
        clearOldMutations();
    }
}


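/// Removes old entries from the /log node in ZooKeeper. An entry may be removed only
/// when every replica has either already executed it (its log_pointer is past the entry)
/// or has been marked as lost (is_lost = 1) and is expected to recover by cloning another
/// replica rather than replaying the log.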
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
    auto zookeeper = storage.getZooKeeper();
    auto storage_settings = storage.getSettings();

    Coordination::Stat stat;
    if (!zookeeper->exists(storage.zookeeper_path + "/log", &stat))
        throw Exception(storage.zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE);

    int children_count = stat.numChildren;

    /// Wait until 1.1 times the minimum number of log records have accumulated before cleaning anything.
    if (static_cast<double>(children_count) < storage_settings->min_replicated_logs_to_keep * 1.1)
        return;

    Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);

    /// We will keep log records at and after this threshold.
    UInt64 min_saved_log_pointer = std::numeric_limits<UInt64>::max();

    UInt64 min_log_pointer_lost_candidate = std::numeric_limits<UInt64>::max();

    Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");

    if (entries.empty())
        return;

    std::sort(entries.begin(), entries.end());

    /// The oldest log entry that would survive if we kept only the newest `max_replicated_logs_to_keep` entries.
    String min_saved_record_log_str = entries[
        entries.size() > storage_settings->max_replicated_logs_to_keep
            ? entries.size() - storage_settings->max_replicated_logs_to_keep
            : 0];

    /// Replicas that were marked is_lost but are currently active.
    std::unordered_set<String> recovering_replicas;

    /// Lost replica -> version of its 'host' node.
    std::unordered_map<String, UInt32> host_versions_lost_replicas;

    /// Replica -> log pointer.
    std::unordered_map<String, String> log_pointers_candidate_lost_replicas;

    size_t num_replicas_were_marked_is_lost = 0;

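    /// Classify every replica by the combination of is_active and is_lost:
    ///   - active and not lost: its log_pointer bounds what we may remove;
    ///   - active and lost: it is recovering, handled again after this loop;
    ///   - inactive and not lost: a candidate to be marked as lost if its log_pointer is too stale;
    ///   - inactive and lost: already lost, only its host version is remembered for later checks.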
    for (const String & replica : replicas)
    {
        Coordination::Stat host_stat;
        zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/host", &host_stat);
        String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");

        UInt64 log_pointer = 0;

        if (!pointer.empty())
            log_pointer = parse<UInt64>(pointer);

        /// Check the status of the replica (active or not).
        /// If the replica is not active, we can look at where its log_pointer points.

        /// There are three possibilities for the "is_lost" node:
        /// It doesn't exist: the replica was created by an old version of ClickHouse.
        /// It exists and its value is 0.
        /// It exists and its value is 1.
        String is_lost_str;

        bool has_is_lost_node = zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", is_lost_str);

        if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active"))
        {
            if (has_is_lost_node && is_lost_str == "1")
            {
                /// Lost and active: recovering.
                recovering_replicas.insert(replica);
                ++num_replicas_were_marked_is_lost;
            }
            else
            {
                /// Not lost and active: the usual case.
                min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
            }
        }
        else
        {
            if (!has_is_lost_node)
            {
                /// Only to support old versions of ClickHouse.
                /// If the replica does not have "/is_lost", we must preserve its log_pointer,
                /// because old versions of ClickHouse do not support the is_lost/recovery mechanism.
                min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
            }
            else
            {
                if (is_lost_str == "0")
                {
                    /// Not active and not lost: a candidate to be marked as lost.
                    String log_pointer_str = "log-" + padIndex(log_pointer);
                    if (log_pointer_str >= min_saved_record_log_str)
                    {
                        /// Its log pointer is fresh enough.
                        min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
                    }
                    else
                    {
                        /// Its log pointer is stale: we will mark the replica as lost.
                        host_versions_lost_replicas[replica] = host_stat.version;
                        log_pointers_candidate_lost_replicas[replica] = log_pointer_str;
                        min_log_pointer_lost_candidate = std::min(min_log_pointer_lost_candidate, log_pointer);
                    }
                }
                else
                {
                    /// Not active and already marked as lost.
                    ++num_replicas_were_marked_is_lost;
                    host_versions_lost_replicas[replica] = host_stat.version;
                }
            }
        }
    }

    /// We must check the log_pointer of recovering replicas at the end,
    /// because the log pointer of a recovering replica can move backward.
    for (const String & replica : recovering_replicas)
    {
        String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
        UInt64 log_pointer = 0;
        if (!pointer.empty())
            log_pointer = parse<UInt64>(pointer);
        min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
    }

    if (!recovering_replicas.empty())
        min_saved_log_pointer = std::min(min_saved_log_pointer, min_log_pointer_lost_candidate);

    /// We will not touch the last `min_replicated_logs_to_keep` records.
    entries.erase(entries.end() - std::min<UInt64>(entries.size(), storage_settings->min_replicated_logs_to_keep), entries.end());
    /// We will not touch records that are not less than `min_saved_log_pointer`.
    entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end());

    if (entries.empty())
        return;

    markLostReplicas(host_versions_lost_replicas, log_pointers_candidate_lost_replicas, replicas.size() - num_replicas_were_marked_is_lost, zookeeper);

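    /// Remove the chosen log entries in batches of roughly 4 * zkutil::MULTI_BATCH_SIZE requests.
    /// Each multi-request also carries consistency checks: the 'host' node versions of the replicas
    /// marked as lost and the version of the /replicas node, so the batch fails if a lost replica
    /// comes back or a new replica appears while we are cleaning.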
    Coordination::Requests ops;
    for (size_t i = 0; i < entries.size(); ++i)
    {
        ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1));

        if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
        {
            /// We need these checks so that a replica restoring itself from one of the marked
            /// replicas does not copy an invalid log_pointer.
            for (const auto & host_version : host_versions_lost_replicas)
                ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + host_version.first + "/host", host_version.second));

            /// Simultaneously with clearing the log, check whether a replica was added since we received the list of replicas.
            ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version));
            zookeeper->multi(ops);
            ops.clear();
        }
    }

    LOG_DEBUG(log, "Removed " << entries.size() << " old log entries: " << entries.front() << " - " << entries.back());
}


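/// Sets is_lost = 1 for the given candidate replicas, but only if their 'host' node version
/// has not changed since we inspected them (i.e. the replica has not become active again).
/// Refuses to proceed if every remaining replica would end up marked as lost.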
void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map<String, UInt32> & host_versions_lost_replicas,
                                                        const std::unordered_map<String, String> & log_pointers_candidate_lost_replicas,
                                                        size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper)
{
    Strings candidate_lost_replicas;
    std::vector<Coordination::Requests> requests;

    for (const auto & pair : log_pointers_candidate_lost_replicas)
    {
        String replica = pair.first;
        Coordination::Requests ops;
        /// If the 'host' node version has changed, we must not mark the replica as lost, because it has become active again.
        ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_lost_replicas.at(replica)));
        ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
        candidate_lost_replicas.push_back(replica);
        requests.push_back(ops);
    }

    if (candidate_lost_replicas.size() == replicas_count)
        throw Exception("All replicas are stale: we won't mark any replica as lost", ErrorCodes::ALL_REPLICAS_LOST);

    std::vector<zkutil::ZooKeeper::FutureMulti> futures;
    for (size_t i = 0; i < candidate_lost_replicas.size(); ++i)
        futures.emplace_back(zookeeper->tryAsyncMulti(requests[i]));

    for (size_t i = 0; i < candidate_lost_replicas.size(); ++i)
    {
        auto multi_responses = futures[i].get();
        if (multi_responses.responses[0]->error == Coordination::Error::ZBADVERSION)
            throw Exception(candidate_lost_replicas[i] + " became active while we were marking lost replicas.", DB::ErrorCodes::REPLICA_STATUS_CHANGED);
        zkutil::KeeperMultiException::check(multi_responses.error, requests[i], multi_responses.responses);
    }
}


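/// A block node name paired with its creation time; greaterByTime orders nodes newest-first.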
struct ReplicatedMergeTreeCleanupThread::NodeWithStat
{
    String node;
    Int64 ctime = 0;

    NodeWithStat(String node_, Int64 ctime_) : node(std::move(node_)), ctime(ctime_) {}

    static bool greaterByTime(const NodeWithStat & lhs, const NodeWithStat & rhs)
    {
        return std::forward_as_tuple(lhs.ctime, lhs.node) > std::forward_as_tuple(rhs.ctime, rhs.node);
    }
};

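/// Removes old block nodes under /blocks (used for deduplication of inserts). A block is kept
/// only while it is inside both the count window (the replicated_deduplication_window newest
/// blocks) and the time window (replicated_deduplication_window_seconds); everything beyond
/// either limit is removed.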
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
{
    auto zookeeper = storage.getZooKeeper();
    auto storage_settings = storage.getSettings();

    std::vector<NodeWithStat> timed_blocks;
    getBlocksSortedByTime(*zookeeper, timed_blocks);

    if (timed_blocks.empty())
        return;

    /// Use the timestamp of the most recent block node (the front after sorting newest-first) as the "current" time.
    Int64 current_time = timed_blocks.front().ctime;
    Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));

    /// Virtual node; all nodes that are "greater" than it (i.e. older than the threshold) will be deleted.
    NodeWithStat block_threshold{{}, time_threshold};

    size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage_settings->replicated_deduplication_window);
    auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
    auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
    auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);

    /// Remember the number of blocks to delete now: the loop below advances `first_outdated_block`.
    auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;

    zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
    for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
    {
        String path = storage.zookeeper_path + "/blocks/" + it->node;
        try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path));
    }

    /// `try_remove_futures` was filled in the same order as [first_outdated_block, timed_blocks.end()),
    /// so `first_outdated_block` always points at the block that corresponds to the current future.
    for (auto & pair : try_remove_futures)
    {
        const String & path = pair.first;
        int32_t rc = pair.second.get().error;
        if (rc == Coordination::ZNOTEMPTY)
        {
            /// Can happen if there are leftover block nodes with children created by previous server versions.
            zookeeper->removeRecursive(path);
            cached_block_stats.erase(first_outdated_block->node);
        }
        else if (rc)
            LOG_WARNING(log,
                "Error while deleting ZooKeeper path `" << path << "`: " << zkutil::ZooKeeper::error2string(rc) << ", ignoring.");
        else
        {
            /// Successfully removed blocks must be removed from the cache as well.
            cached_block_stats.erase(first_outdated_block->node);
        }
        first_outdated_block++;
    }

    if (num_nodes_to_delete)
        LOG_TRACE(log, "Cleared " << num_nodes_to_delete << " old blocks from ZooKeeper");
}


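/// Fills `timed_blocks` with all children of /blocks together with their creation times,
/// sorted newest-first. Creation times are cached in `cached_block_stats`, so only the stats
/// of blocks that are not yet cached are fetched from ZooKeeper (asynchronously).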
void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & zookeeper, std::vector<NodeWithStat> & timed_blocks)
{
    timed_blocks.clear();

    Strings blocks;
    Coordination::Stat stat;
    if (zookeeper.tryGetChildren(storage.zookeeper_path + "/blocks", blocks, &stat))
        throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);

    /// This pruning of the cache seems to be obsolete, because blocks are erased from the cache
    /// when they are deleted from ZooKeeper. But we don't know all the (possibly future) places
    /// in the code where blocks can be removed, so to make sure the cache does not leak we prune it here.
    {
        NameSet blocks_set(blocks.begin(), blocks.end());
        for (auto it = cached_block_stats.begin(); it != cached_block_stats.end();)
        {
            if (!blocks_set.count(it->first))
                it = cached_block_stats.erase(it);
            else
                ++it;
        }
    }

    auto not_cached_blocks = stat.numChildren - cached_block_stats.size();
    if (not_cached_blocks)
    {
        LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)"
            << " to clear old ones from ZooKeeper.");
    }

    zkutil::AsyncResponses<Coordination::ExistsResponse> exists_futures;
    for (const String & block : blocks)
    {
        auto it = cached_block_stats.find(block);
        if (it == cached_block_stats.end())
        {
            /// New block. Fetch its stat asynchronously.
            exists_futures.emplace_back(block, zookeeper.asyncExists(storage.zookeeper_path + "/blocks/" + block));
        }
        else
        {
            /// Cached block.
            timed_blocks.emplace_back(block, it->second);
        }
    }

    /// Put the fetched stats into the cache.
    for (auto & elem : exists_futures)
    {
        auto status = elem.second.get();
        if (status.error != Coordination::ZNONODE)
        {
            cached_block_stats.emplace(elem.first, status.stat.ctime);
            timed_blocks.emplace_back(elem.first, status.stat.ctime);
        }
    }

    std::sort(timed_blocks.begin(), timed_blocks.end(), NodeWithStat::greaterByTime);
}


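/// Removes finished mutation entries from /mutations, keeping at least
/// finished_mutations_to_keep of them. An entry may be removed only if every replica's
/// mutation_pointer has already moved past it.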
void ReplicatedMergeTreeCleanupThread::clearOldMutations()
{
    auto storage_settings = storage.getSettings();
    if (!storage_settings->finished_mutations_to_keep)
        return;

    if (storage.queue.countFinishedMutations() <= storage_settings->finished_mutations_to_keep)
    {
        /// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
        /// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything.
        return;
    }

    auto zookeeper = storage.getZooKeeper();

    Coordination::Stat replicas_stat;
    Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &replicas_stat);

    UInt64 min_pointer = std::numeric_limits<UInt64>::max();
    for (const String & replica : replicas)
    {
        String pointer;
        zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/mutation_pointer", pointer);
        if (pointer.empty())
            return; /// One replica hasn't done anything yet, so we cannot delete any mutations.
        min_pointer = std::min(parse<UInt64>(pointer), min_pointer);
    }

    Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/mutations");
    std::sort(entries.begin(), entries.end());

    /// Do not remove entries that are greater than `min_pointer` (they are not done yet).
    entries.erase(std::upper_bound(entries.begin(), entries.end(), padIndex(min_pointer)), entries.end());
    /// Do not remove the last `storage_settings->finished_mutations_to_keep` entries.
    if (entries.size() <= storage_settings->finished_mutations_to_keep)
        return;
    entries.erase(entries.end() - storage_settings->finished_mutations_to_keep, entries.end());

    if (entries.empty())
        return;

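    /// Remove the entries in batches of roughly 4 * zkutil::MULTI_BATCH_SIZE requests; each batch
    /// also checks the version of /replicas, so it fails if a replica was added in the meantime.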
    Coordination::Requests ops;
    size_t batch_start_i = 0;
    for (size_t i = 0; i < entries.size(); ++i)
    {
        ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/mutations/" + entries[i], -1));

        if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
        {
            /// Simultaneously with clearing the mutations, check whether a replica was added since we received the list of replicas.
            ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", replicas_stat.version));
            zookeeper->multi(ops);
            LOG_DEBUG(log, "Removed " << (i + 1 - batch_start_i) << " old mutation entries: " << entries[batch_start_i] << " - " << entries[i]);
            batch_start_i = i + 1;
            ops.clear();
        }
    }
}

}