#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>

#include <random>
#include <unordered_set>


namespace DB
{

namespace ErrorCodes
{
    extern const int NOT_FOUND_NODE;
    extern const int ALL_REPLICAS_LOST;
    extern const int REPLICA_STATUS_CHANGED;
}


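/// Every `cleanup_delay_period` seconds (plus a random addition of up to `cleanup_delay_period_random_add` seconds)
/// this background task removes outdated data: old parts and temporary directories on the local disk and,
/// if this replica is the leader, old log entries, old block hashes used for deduplication,
/// and old finished mutations in ZooKeeper.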
ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_)
    : storage(storage_)
    , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeCleanupThread)")
    , log(&Logger::get(log_name))
{
    task = storage.global_context.getSchedulePool().createTask(log_name, [this]{ run(); });
}

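/// Runs one cleanup iteration and reschedules itself with a randomized delay.
/// If the ZooKeeper session has expired, the task is not rescheduled here; it will be activated again
/// once the session is restored.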
void ReplicatedMergeTreeCleanupThread::run()
{
    auto storage_settings = storage.getSettings();
    const auto CLEANUP_SLEEP_MS = storage_settings->cleanup_delay_period * 1000
        + std::uniform_int_distribution<UInt64>(0, storage_settings->cleanup_delay_period_random_add * 1000)(rng);

    try
    {
        iterate();
    }
    catch (const Coordination::Exception & e)
    {
        tryLogCurrentException(log, __PRETTY_FUNCTION__);

        if (e.code == Coordination::ZSESSIONEXPIRED)
            return;
    }
    catch (...)
    {
        tryLogCurrentException(log, __PRETTY_FUNCTION__);
    }

    task->scheduleAfter(CLEANUP_SLEEP_MS);
}


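/// A single cleanup pass: local cleanup is done on every replica, while cleanup of shared
/// ZooKeeper state (log, blocks, mutations) is done only by the leader.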
void ReplicatedMergeTreeCleanupThread::iterate()
{
    storage.clearOldPartsAndRemoveFromZK();

    {
        /// TODO: Implement tryLockStructureForShare.
        auto lock = storage.lockStructureForShare(false, "");
        storage.clearOldTemporaryDirectories();
    }

    /// This is a loose condition: there is no problem if we have actually lost leadership
    /// by this moment and two replicas try to do cleanup simultaneously.
    if (storage.is_leader)
    {
        clearOldLogs();
        clearOldBlocks();
        clearOldMutations();
    }
}


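/// Removes old entries from the replication log (/log) in ZooKeeper.
/// An entry may be removed only when every replica has either already executed it (its log_pointer
/// is past the entry) or has been marked as lost and will recover by cloning another replica.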
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
    auto zookeeper = storage.getZooKeeper();
    auto storage_settings = storage.getSettings();

    Coordination::Stat stat;
    if (!zookeeper->exists(storage.zookeeper_path + "/log", &stat))
        throw Exception(storage.zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE);

    int children_count = stat.numChildren;

    /// We will wait until 1.1 times the minimum number of records has accumulated before cleaning.
    if (static_cast<double>(children_count) < storage_settings->min_replicated_logs_to_keep * 1.1)
        return;

    Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);

    /// We will keep logs after and including this threshold.
    UInt64 min_saved_log_pointer = std::numeric_limits<UInt64>::max();

    UInt64 min_log_pointer_lost_candidate = std::numeric_limits<UInt64>::max();

    Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");

    if (entries.empty())
        return;

    std::sort(entries.begin(), entries.end());

    String min_saved_record_log_str = entries[
        entries.size() > storage_settings->max_replicated_logs_to_keep
            ? entries.size() - storage_settings->max_replicated_logs_to_keep
            : 0];
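    /// This is the oldest log entry that we keep in any case (one of the newest `max_replicated_logs_to_keep` entries).
    /// Inactive replicas whose log_pointer is older than this entry become candidates to be marked as lost.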

    /// Replicas that were marked is_lost but are active.
    std::unordered_set<String> recovering_replicas;

    /// Lost replica -> a version of 'host' node.
    std::unordered_map<String, UInt32> host_versions_lost_replicas;

    /// Replica -> log pointer.
    std::unordered_map<String, String> log_pointers_candidate_lost_replicas;

    size_t num_replicas_were_marked_is_lost = 0;

    for (const String & replica : replicas)
    {
        Coordination::Stat host_stat;
        zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/host", &host_stat);
        String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");

        UInt64 log_pointer = 0;

        if (!pointer.empty())
            log_pointer = parse<UInt64>(pointer);

        /// Check the status of the replica (active or not).
        /// If the replica is not active, we can check how far its log_pointer has advanced.

        /// There are three possibilities for the "is_lost" node:
        /// It doesn't exist: the replica was created by an old version of ClickHouse.
        /// It exists and its value is 0.
        /// It exists and its value is 1.
        String is_lost_str;

        bool has_is_lost_node = zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", is_lost_str);

        if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active"))
        {
            if (has_is_lost_node && is_lost_str == "1")
            {
                /// Lost and active: recovering.
                recovering_replicas.insert(replica);
                ++num_replicas_were_marked_is_lost;
            }
            else
            {
                /// Not lost and active: the usual case.
                min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
            }
        }
        else
        {
            if (!has_is_lost_node)
            {
                /// Only to support old versions of ClickHouse.
                /// If the replica does not have an "/is_lost" node, we must honor its log_pointer,
                /// because old versions of ClickHouse cannot recover from a lost state.
                min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
            }
            else
            {
                if (is_lost_str == "0")
                {
                    /// Not active and not lost: a candidate to be marked as lost.
                    String log_pointer_str = "log-" + padIndex(log_pointer);
                    if (log_pointer_str >= min_saved_record_log_str)
                    {
                        /// Its log pointer is fresh enough.
                        min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
                    }
                    else
                    {
                        /// Its log pointer is stale: we will mark the replica as lost.
                        host_versions_lost_replicas[replica] = host_stat.version;
                        log_pointers_candidate_lost_replicas[replica] = log_pointer_str;
                        min_log_pointer_lost_candidate = std::min(min_log_pointer_lost_candidate, log_pointer);
                    }
                }
                else
                {
                    ++num_replicas_were_marked_is_lost;
                    host_versions_lost_replicas[replica] = host_stat.version;
                }
            }
        }
    }

    /// We must check the log_pointer of recovering replicas at the end,
    /// because the log pointer of a recovering replica can move backward.
    for (const String & replica : recovering_replicas)
    {
        String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
        UInt64 log_pointer = 0;
        if (!pointer.empty())
            log_pointer = parse<UInt64>(pointer);
        min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
    }

    if (!recovering_replicas.empty())
        min_saved_log_pointer = std::min(min_saved_log_pointer, min_log_pointer_lost_candidate);

    /// We will not touch the last `min_replicated_logs_to_keep` records.
    entries.erase(entries.end() - std::min<UInt64>(entries.size(), storage_settings->min_replicated_logs_to_keep), entries.end());
    /// We will not touch records that are no less than `min_saved_log_pointer`.
    entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end());

    if (entries.empty())
        return;

    markLostReplicas(host_versions_lost_replicas, log_pointers_candidate_lost_replicas, replicas.size() - num_replicas_were_marked_is_lost, zookeeper);

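    /// Remove the entries in batches. Each batch also re-checks the 'host' node versions of the replicas
    /// we are marking as lost and the version of the /replicas list, so the multi-request fails
    /// if a lost replica became active again or a new replica appeared in the meantime.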
    Coordination::Requests ops;
    for (size_t i = 0; i < entries.size(); ++i)
    {
        ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1));

        if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
        {
            /// We need to check the host versions because a replica that is being restored from one of the
            /// marked replicas must not copy an invalid log_pointer.
            for (const auto & host_version : host_versions_lost_replicas)
                ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + host_version.first + "/host", host_version.second));

            /// Simultaneously with clearing the log, we check whether a replica was added since we received the list of replicas.
            ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version));
            zookeeper->multi(ops);
            ops.clear();
        }
    }

    LOG_DEBUG(log, "Removed " << entries.size() << " old log entries: " << entries.front() << " - " << entries.back());
}


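/// Marks the given candidate replicas as lost (sets their "is_lost" node to 1) so that they will later
/// recover by cloning a healthy replica. Each mark is guarded by a check of the 'host' node version:
/// if a candidate became active in the meantime, the operation fails with REPLICA_STATUS_CHANGED.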
void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map<String, UInt32> & host_versions_lost_replicas,
                                                        const std::unordered_map<String, String> & log_pointers_candidate_lost_replicas,
                                                        size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper)
{
    Strings candidate_lost_replicas;
    std::vector<Coordination::Requests> requests;

    for (const auto & pair : log_pointers_candidate_lost_replicas)
    {
        String replica = pair.first;
        Coordination::Requests ops;
        /// If the version of the host node has changed, we cannot mark the replica as lost, because it has become active again.
        ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_lost_replicas.at(replica)));
        ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
        candidate_lost_replicas.push_back(replica);
        requests.push_back(ops);
    }

    if (candidate_lost_replicas.size() == replicas_count)
        throw Exception("All replicas are stale: we won't mark any replica as lost", ErrorCodes::ALL_REPLICAS_LOST);

    std::vector<zkutil::ZooKeeper::FutureMulti> futures;
    for (size_t i = 0; i < candidate_lost_replicas.size(); ++i)
        futures.emplace_back(zookeeper->tryAsyncMulti(requests[i]));

    for (size_t i = 0; i < candidate_lost_replicas.size(); ++i)
    {
        auto multi_responses = futures[i].get();
        if (multi_responses.responses[0]->error == Coordination::Error::ZBADVERSION)
            throw Exception(candidate_lost_replicas[i] + " became active when we marked lost replicas.", DB::ErrorCodes::REPLICA_STATUS_CHANGED);
        zkutil::KeeperMultiException::check(multi_responses.error, requests[i], multi_responses.responses);
    }
}


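/// A block node name together with its creation time; greaterByTime orders blocks newest-first.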
struct ReplicatedMergeTreeCleanupThread::NodeWithStat
{
    String node;
    Int64 ctime = 0;

    NodeWithStat(String node_, Int64 ctime_) : node(std::move(node_)), ctime(ctime_) {}

    static bool greaterByTime(const NodeWithStat & lhs, const NodeWithStat & rhs)
    {
        return std::forward_as_tuple(lhs.ctime, lhs.node) > std::forward_as_tuple(rhs.ctime, rhs.node);
    }
};

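/// Removes old block hash nodes (used for insert deduplication) from ZooKeeper, keeping at most
/// `replicated_deduplication_window` of the newest blocks and only those created within
/// `replicated_deduplication_window_seconds` of the newest block.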
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
{
    auto zookeeper = storage.getZooKeeper();
    auto storage_settings = storage.getSettings();

    std::vector<NodeWithStat> timed_blocks;
    getBlocksSortedByTime(*zookeeper, timed_blocks);

    if (timed_blocks.empty())
        return;

    /// Use the timestamp of the first node (the newest one) as the "current" time.
    Int64 current_time = timed_blocks.front().ctime;
    Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));

    /// Virtual node: all nodes that are "greater" than this one will be deleted.
    NodeWithStat block_threshold{{}, time_threshold};

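    /// Everything from the first outdated block onward is removed: a block survives only if it is
    /// both among the newest `replicated_deduplication_window` blocks and not older than the time threshold.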
    size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage_settings->replicated_deduplication_window);
    auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
    auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
    auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);

    /// Remember how many nodes we are going to delete: the iterator is advanced while processing the results.
    auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;

    zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
    for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
    {
        String path = storage.zookeeper_path + "/blocks/" + it->node;
        try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path));
    }

    for (auto & pair : try_remove_futures)
    {
        const String & path = pair.first;
        int32_t rc = pair.second.get().error;
        if (rc == Coordination::ZNOTEMPTY)
        {
            /// Can happen if there are leftover block nodes with children created by previous server versions.
            zookeeper->removeRecursive(path);
            cached_block_stats.erase(first_outdated_block->node);
        }
        else if (rc)
            LOG_WARNING(log,
                "Error while deleting ZooKeeper path `" << path << "`: " << zkutil::ZooKeeper::error2string(rc) << ", ignoring.");
        else
        {
            /// Successfully removed blocks also have to be removed from the cache.
            cached_block_stats.erase(first_outdated_block->node);
        }
        first_outdated_block++;
    }

    if (num_nodes_to_delete)
        LOG_TRACE(log, "Cleared " << num_nodes_to_delete << " old blocks from ZooKeeper");
}


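/// Fills `timed_blocks` with all nodes under /blocks together with their creation times, sorted newest-first.
/// Creation times are cached in `cached_block_stats`; only blocks not seen before are queried from ZooKeeper.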
void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & zookeeper, std::vector<NodeWithStat> & timed_blocks)
{
    timed_blocks.clear();

    Strings blocks;
    Coordination::Stat stat;
    if (zookeeper.tryGetChildren(storage.zookeeper_path + "/blocks", blocks, &stat))
        throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);

    /// This code seems obsolete, because we remove blocks from the cache
    /// when they are deleted from ZooKeeper. But we don't know about all (maybe future) places in the code
    /// where they can be removed, so just to be sure that the cache does not leak, we check it here.
    {
        NameSet blocks_set(blocks.begin(), blocks.end());
        for (auto it = cached_block_stats.begin(); it != cached_block_stats.end();)
        {
            if (!blocks_set.count(it->first))
                it = cached_block_stats.erase(it);
            else
                ++it;
        }
    }

    auto not_cached_blocks = stat.numChildren - cached_block_stats.size();
    if (not_cached_blocks)
    {
        LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)"
            << " to clear old ones from ZooKeeper.");
    }

    zkutil::AsyncResponses<Coordination::ExistsResponse> exists_futures;
    for (const String & block : blocks)
    {
        auto it = cached_block_stats.find(block);
        if (it == cached_block_stats.end())
        {
            /// New block. Fetch its stat asynchronously.
            exists_futures.emplace_back(block, zookeeper.asyncExists(storage.zookeeper_path + "/blocks/" + block));
        }
        else
        {
            /// Cached block.
            timed_blocks.emplace_back(block, it->second);
        }
    }

    /// Put the fetched stats into the cache.
    for (auto & elem : exists_futures)
    {
        auto status = elem.second.get();
        if (status.error != Coordination::ZNONODE)
        {
            cached_block_stats.emplace(elem.first, status.stat.ctime);
            timed_blocks.emplace_back(elem.first, status.stat.ctime);
        }
    }

    std::sort(timed_blocks.begin(), timed_blocks.end(), NodeWithStat::greaterByTime);
}


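/// Removes old mutation entries from ZooKeeper. An entry can be removed only if it has been executed
/// by all replicas (it is not greater than every replica's mutation_pointer); the most recent
/// `finished_mutations_to_keep` finished entries are always kept.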
void ReplicatedMergeTreeCleanupThread::clearOldMutations()
{
    auto storage_settings = storage.getSettings();
    if (!storage_settings->finished_mutations_to_keep)
        return;

    if (storage.queue.countFinishedMutations() <= storage_settings->finished_mutations_to_keep)
    {
        /// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
        /// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything.
        return;
    }

    auto zookeeper = storage.getZooKeeper();

    Coordination::Stat replicas_stat;
    Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &replicas_stat);

    UInt64 min_pointer = std::numeric_limits<UInt64>::max();
    for (const String & replica : replicas)
    {
        String pointer;
        zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/mutation_pointer", pointer);
        if (pointer.empty())
            return; /// One replica hasn't done anything yet, so we can't delete any mutations.
        min_pointer = std::min(parse<UInt64>(pointer), min_pointer);
    }

    Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/mutations");
    std::sort(entries.begin(), entries.end());

    /// Do not remove entries that are greater than `min_pointer` (they are not done yet).
    entries.erase(std::upper_bound(entries.begin(), entries.end(), padIndex(min_pointer)), entries.end());
    /// Do not remove the last `storage_settings->finished_mutations_to_keep` entries.
    if (entries.size() <= storage_settings->finished_mutations_to_keep)
        return;
    entries.erase(entries.end() - storage_settings->finished_mutations_to_keep, entries.end());

    if (entries.empty())
        return;

    Coordination::Requests ops;
    size_t batch_start_i = 0;
    for (size_t i = 0; i < entries.size(); ++i)
    {
        ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/mutations/" + entries[i], -1));

        if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
        {
            /// Simultaneously with removing the mutation entries, we check whether a replica was added since we received the list of replicas.
            ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", replicas_stat.version));
            zookeeper->multi(ops);
            LOG_DEBUG(log, "Removed " << (i + 1 - batch_start_i) << " old mutation entries: " << entries[batch_start_i] << " - " << entries[i]);
            batch_start_i = i + 1;
            ops.clear();
        }
    }
}

}