1 | #include <Storages/MergeTree/ReplicatedMergeTreeQueue.h> |
2 | #include <Storages/StorageReplicatedMergeTree.h> |
3 | #include <IO/ReadHelpers.h> |
4 | #include <IO/WriteHelpers.h> |
5 | #include <Storages/MergeTree/MergeTreeDataPart.h> |
6 | #include <Storages/MergeTree/MergeTreeDataMergerMutator.h> |
7 | #include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h> |
8 | #include <Common/StringUtils/StringUtils.h> |
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
/// Error codes referenced in this file. They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
    /// Thrown by removeProcessedEntry() when an entry is missing from the in-memory queue.
    /// Declared explicitly so that this file does not depend on a transitive declaration.
    extern const int LOGICAL_ERROR;
    extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
    extern const int UNFINISHED;
    extern const int PART_IS_TEMPORARILY_LOCKED;
}
20 | |
21 | |
22 | ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_) |
23 | : storage(storage_) |
24 | , format_version(storage.format_version) |
25 | , current_parts(format_version) |
26 | , virtual_parts(format_version) |
27 | {} |
28 | |
29 | |
30 | void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts) |
31 | { |
32 | std::lock_guard lock(state_mutex); |
33 | |
34 | for (auto part : parts) |
35 | { |
36 | current_parts.add(part->name); |
37 | virtual_parts.add(part->name); |
38 | } |
39 | } |
40 | |
41 | |
42 | bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const |
43 | { |
44 | std::lock_guard lock(state_mutex); |
45 | return virtual_parts.getContainingPart(data_part->info) != data_part->name; |
46 | } |
47 | |
48 | bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) |
49 | { |
50 | auto queue_path = replica_path + "/queue" ; |
51 | LOG_DEBUG(log, "Loading queue from " << queue_path); |
52 | |
53 | bool updated = false; |
54 | std::optional<time_t> min_unprocessed_insert_time_changed; |
55 | |
56 | { |
57 | std::lock_guard pull_logs_lock(pull_logs_to_queue_mutex); |
58 | |
59 | String log_pointer_str = zookeeper->get(replica_path + "/log_pointer" ); |
60 | log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str); |
61 | |
62 | std::unordered_set<String> already_loaded_paths; |
63 | { |
64 | std::lock_guard lock(state_mutex); |
65 | for (const LogEntryPtr & log_entry : queue) |
66 | already_loaded_paths.insert(log_entry->znode_name); |
67 | } |
68 | |
69 | Strings children = zookeeper->getChildren(queue_path); |
70 | |
71 | auto to_remove_it = std::remove_if( |
72 | children.begin(), children.end(), [&](const String & path) |
73 | { |
74 | return already_loaded_paths.count(path); |
75 | }); |
76 | |
77 | LOG_DEBUG(log, |
78 | "Having " << (to_remove_it - children.begin()) << " queue entries to load, " |
79 | << (children.end() - to_remove_it) << " entries already loaded." ); |
80 | children.erase(to_remove_it, children.end()); |
81 | |
82 | std::sort(children.begin(), children.end()); |
83 | |
84 | zkutil::AsyncResponses<Coordination::GetResponse> futures; |
85 | futures.reserve(children.size()); |
86 | |
87 | for (const String & child : children) |
88 | futures.emplace_back(child, zookeeper->asyncGet(queue_path + "/" + child)); |
89 | |
90 | for (auto & future : futures) |
91 | { |
92 | Coordination::GetResponse res = future.second.get(); |
93 | LogEntryPtr entry = LogEntry::parse(res.data, res.stat); |
94 | entry->znode_name = future.first; |
95 | |
96 | std::lock_guard lock(state_mutex); |
97 | |
98 | insertUnlocked(entry, min_unprocessed_insert_time_changed, lock); |
99 | |
100 | updated = true; |
101 | } |
102 | |
103 | zookeeper->tryGet(replica_path + "/mutation_pointer" , mutation_pointer); |
104 | } |
105 | |
106 | updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {}); |
107 | |
108 | LOG_TRACE(log, "Loaded queue" ); |
109 | return updated; |
110 | } |
111 | |
112 | |
113 | void ReplicatedMergeTreeQueue::initialize( |
114 | const String & zookeeper_path_, const String & replica_path_, const String & logger_name_, |
115 | const MergeTreeData::DataParts & parts) |
116 | { |
117 | zookeeper_path = zookeeper_path_; |
118 | replica_path = replica_path_; |
119 | logger_name = logger_name_; |
120 | log = &Logger::get(logger_name); |
121 | |
122 | addVirtualParts(parts); |
123 | } |
124 | |
125 | |
126 | void ReplicatedMergeTreeQueue::insertUnlocked( |
127 | const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, |
128 | std::lock_guard<std::mutex> & /* state_lock */) |
129 | { |
130 | for (const String & virtual_part_name : entry->getVirtualPartNames()) |
131 | { |
132 | virtual_parts.add(virtual_part_name); |
133 | updateMutationsPartsToDo(virtual_part_name, /* add = */ true); |
134 | } |
135 | |
136 | /// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted |
137 | if (entry->type != LogEntry::DROP_RANGE) |
138 | queue.push_back(entry); |
139 | else |
140 | queue.push_front(entry); |
141 | |
142 | if (entry->type == LogEntry::GET_PART) |
143 | { |
144 | inserts_by_time.insert(entry); |
145 | |
146 | if (entry->create_time && (!min_unprocessed_insert_time || entry->create_time < min_unprocessed_insert_time)) |
147 | { |
148 | min_unprocessed_insert_time = entry->create_time; |
149 | min_unprocessed_insert_time_changed = min_unprocessed_insert_time; |
150 | } |
151 | } |
152 | } |
153 | |
154 | |
155 | void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry) |
156 | { |
157 | std::optional<time_t> min_unprocessed_insert_time_changed; |
158 | |
159 | { |
160 | std::lock_guard lock(state_mutex); |
161 | insertUnlocked(entry, min_unprocessed_insert_time_changed, lock); |
162 | } |
163 | |
164 | updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {}); |
165 | } |
166 | |
167 | |
168 | void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( |
169 | const LogEntryPtr & entry, |
170 | bool is_successful, |
171 | std::optional<time_t> & min_unprocessed_insert_time_changed, |
172 | std::optional<time_t> & max_processed_insert_time_changed, |
173 | std::unique_lock<std::mutex> & /* queue_lock */) |
174 | { |
175 | /// Update insert times. |
176 | if (entry->type == LogEntry::GET_PART) |
177 | { |
178 | inserts_by_time.erase(entry); |
179 | |
180 | if (inserts_by_time.empty()) |
181 | { |
182 | min_unprocessed_insert_time = 0; |
183 | min_unprocessed_insert_time_changed = min_unprocessed_insert_time; |
184 | } |
185 | else if ((*inserts_by_time.begin())->create_time > min_unprocessed_insert_time) |
186 | { |
187 | min_unprocessed_insert_time = (*inserts_by_time.begin())->create_time; |
188 | min_unprocessed_insert_time_changed = min_unprocessed_insert_time; |
189 | } |
190 | |
191 | if (entry->create_time > max_processed_insert_time) |
192 | { |
193 | max_processed_insert_time = entry->create_time; |
194 | max_processed_insert_time_changed = max_processed_insert_time; |
195 | } |
196 | } |
197 | |
198 | if (is_successful) |
199 | { |
200 | for (const String & virtual_part_name : entry->getVirtualPartNames()) |
201 | { |
202 | Strings replaced_parts; |
203 | current_parts.add(virtual_part_name, &replaced_parts); |
204 | |
205 | /// Each part from `replaced_parts` should become Obsolete as a result of executing the entry. |
206 | /// So it is one less part to mutate for each mutation with block number greater than part_info.getDataVersion() |
207 | for (const String & replaced_part_name : replaced_parts) |
208 | updateMutationsPartsToDo(replaced_part_name, /* add = */ false); |
209 | } |
210 | |
211 | String drop_range_part_name; |
212 | if (entry->type == LogEntry::DROP_RANGE) |
213 | drop_range_part_name = entry->new_part_name; |
214 | else if (entry->type == LogEntry::REPLACE_RANGE) |
215 | drop_range_part_name = entry->replace_range_entry->drop_range_part_name; |
216 | |
217 | if (!drop_range_part_name.empty()) |
218 | { |
219 | current_parts.remove(drop_range_part_name); |
220 | virtual_parts.remove(drop_range_part_name); |
221 | } |
222 | } |
223 | else |
224 | { |
225 | for (const String & virtual_part_name : entry->getVirtualPartNames()) |
226 | { |
227 | /// Because execution of the entry is unsuccessful, `virtual_part_name` will never appear |
228 | /// so we won't need to mutate it. |
229 | updateMutationsPartsToDo(virtual_part_name, /* add = */ false); |
230 | } |
231 | } |
232 | } |
233 | |
234 | |
235 | void ReplicatedMergeTreeQueue::updateMutationsPartsToDo(const String & part_name, bool add) |
236 | { |
237 | auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); |
238 | auto in_partition = mutations_by_partition.find(part_info.partition_id); |
239 | if (in_partition == mutations_by_partition.end()) |
240 | return; |
241 | |
242 | bool some_mutations_are_probably_done = false; |
243 | |
244 | auto from_it = in_partition->second.upper_bound(part_info.getDataVersion()); |
245 | for (auto it = from_it; it != in_partition->second.end(); ++it) |
246 | { |
247 | MutationStatus & status = *it->second; |
248 | status.parts_to_do += (add ? +1 : -1); |
249 | if (status.parts_to_do <= 0) |
250 | some_mutations_are_probably_done = true; |
251 | |
252 | if (!add && !status.latest_failed_part.empty() && part_info.contains(status.latest_failed_part_info)) |
253 | { |
254 | status.latest_failed_part.clear(); |
255 | status.latest_failed_part_info = MergeTreePartInfo(); |
256 | status.latest_fail_time = 0; |
257 | status.latest_fail_reason.clear(); |
258 | } |
259 | } |
260 | |
261 | if (some_mutations_are_probably_done) |
262 | storage.mutations_finalizing_task->schedule(); |
263 | } |
264 | |
265 | |
266 | void ReplicatedMergeTreeQueue::updateTimesInZooKeeper( |
267 | zkutil::ZooKeeperPtr zookeeper, |
268 | std::optional<time_t> min_unprocessed_insert_time_changed, |
269 | std::optional<time_t> max_processed_insert_time_changed) const |
270 | { |
271 | /// Here there can be a race condition (with different remove at the same time) |
272 | /// because we update times in ZooKeeper with unlocked mutex, while these times may change. |
273 | /// Consider it unimportant (for a short time, ZK will have a slightly different time value). |
274 | |
275 | Coordination::Requests ops; |
276 | |
277 | if (min_unprocessed_insert_time_changed) |
278 | ops.emplace_back(zkutil::makeSetRequest( |
279 | replica_path + "/min_unprocessed_insert_time" , toString(*min_unprocessed_insert_time_changed), -1)); |
280 | |
281 | if (max_processed_insert_time_changed) |
282 | ops.emplace_back(zkutil::makeSetRequest( |
283 | replica_path + "/max_processed_insert_time" , toString(*max_processed_insert_time_changed), -1)); |
284 | |
285 | if (!ops.empty()) |
286 | { |
287 | Coordination::Responses responses; |
288 | auto code = zookeeper->tryMulti(ops, responses); |
289 | |
290 | if (code) |
291 | LOG_ERROR(log, "Couldn't set value of nodes for insert times (" |
292 | << replica_path << "/min_unprocessed_insert_time, max_processed_insert_time)" << ": " |
293 | << zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often." ); |
294 | } |
295 | } |
296 | |
297 | |
298 | void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry) |
299 | { |
300 | auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name); |
301 | |
302 | if (code) |
303 | LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": " |
304 | << zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often." ); |
305 | |
306 | std::optional<time_t> min_unprocessed_insert_time_changed; |
307 | std::optional<time_t> max_processed_insert_time_changed; |
308 | |
309 | bool found = false; |
310 | size_t queue_size = 0; |
311 | |
312 | { |
313 | std::unique_lock lock(state_mutex); |
314 | |
315 | /// Remove the job from the queue in the RAM. |
316 | /// You can not just refer to a pre-saved iterator, because someone else might be able to delete the task. |
317 | /// Why do we view the queue from the end? |
318 | /// - because the task for execution first is moved to the end of the queue, so that in case of failure it remains at the end. |
319 | for (Queue::iterator it = queue.end(); it != queue.begin();) |
320 | { |
321 | --it; |
322 | if (*it == entry) |
323 | { |
324 | found = true; |
325 | updateStateOnQueueEntryRemoval( |
326 | entry, /* is_successful = */ true, |
327 | min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); |
328 | |
329 | queue.erase(it); |
330 | queue_size = queue.size(); |
331 | break; |
332 | } |
333 | } |
334 | } |
335 | |
336 | if (!found) |
337 | throw Exception("Can't find " + entry->znode_name + " in the memory queue. It is a bug" , ErrorCodes::LOGICAL_ERROR); |
338 | |
339 | notifySubscribers(queue_size); |
340 | |
341 | updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); |
342 | } |
343 | |
344 | |
345 | bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name) |
346 | { |
347 | LogEntryPtr found; |
348 | size_t queue_size = 0; |
349 | |
350 | std::optional<time_t> min_unprocessed_insert_time_changed; |
351 | std::optional<time_t> max_processed_insert_time_changed; |
352 | |
353 | { |
354 | std::unique_lock lock(state_mutex); |
355 | |
356 | virtual_parts.remove(part_name); |
357 | |
358 | for (Queue::iterator it = queue.begin(); it != queue.end();) |
359 | { |
360 | if ((*it)->new_part_name == part_name) |
361 | { |
362 | found = *it; |
363 | updateStateOnQueueEntryRemoval( |
364 | found, /* is_successful = */ false, |
365 | min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); |
366 | queue.erase(it++); |
367 | queue_size = queue.size(); |
368 | break; |
369 | } |
370 | else |
371 | ++it; |
372 | } |
373 | } |
374 | |
375 | if (!found) |
376 | return false; |
377 | |
378 | notifySubscribers(queue_size); |
379 | |
380 | zookeeper->tryRemove(replica_path + "/queue/" + found->znode_name); |
381 | updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); |
382 | |
383 | return true; |
384 | } |
385 | |
386 | |
387 | bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & part_info) |
388 | { |
389 | std::lock_guard lock(state_mutex); |
390 | return virtual_parts.remove(part_info); |
391 | } |
392 | |
393 | void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback) |
394 | { |
395 | std::lock_guard lock(pull_logs_to_queue_mutex); |
396 | |
397 | String index_str = zookeeper->get(replica_path + "/log_pointer" ); |
398 | UInt64 index; |
399 | |
400 | Strings log_entries = zookeeper->getChildrenWatch(zookeeper_path + "/log" , nullptr, watch_callback); |
401 | |
402 | /// We update mutations after we have loaded the list of log entries, but before we insert them |
403 | /// in the queue. |
404 | /// With this we ensure that if you read the log state L1 and then the state of mutations M1, |
405 | /// then L1 "happened-before" M1. |
406 | updateMutations(zookeeper); |
407 | |
408 | if (index_str.empty()) |
409 | { |
410 | /// If we do not already have a pointer to the log, put a pointer to the first entry in it. |
411 | index = log_entries.empty() ? 0 : parse<UInt64>(std::min_element(log_entries.begin(), log_entries.end())->substr(strlen("log-" ))); |
412 | |
413 | zookeeper->set(replica_path + "/log_pointer" , toString(index)); |
414 | } |
415 | else |
416 | { |
417 | index = parse<UInt64>(index_str); |
418 | } |
419 | |
420 | String min_log_entry = "log-" + padIndex(index); |
421 | |
422 | /// Multiple log entries that must be copied to the queue. |
423 | |
424 | log_entries.erase( |
425 | std::remove_if(log_entries.begin(), log_entries.end(), [&min_log_entry](const String & entry) { return entry < min_log_entry; }), |
426 | log_entries.end()); |
427 | |
428 | if (!log_entries.empty()) |
429 | { |
430 | std::sort(log_entries.begin(), log_entries.end()); |
431 | |
432 | /// ZK contains a limit on the number or total size of operations in a multi-request. |
433 | /// If the limit is exceeded, the connection is simply closed. |
434 | /// The constant is selected with a margin. The default limit in ZK is 1 MB of data in total. |
435 | /// The average size of the node value in this case is less than 10 kilobytes. |
436 | static constexpr auto MAX_MULTI_OPS = 100; |
437 | |
438 | for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries; entry_idx += MAX_MULTI_OPS) |
439 | { |
440 | auto begin = log_entries.begin() + entry_idx; |
441 | auto end = entry_idx + MAX_MULTI_OPS >= log_entries.size() |
442 | ? log_entries.end() |
443 | : (begin + MAX_MULTI_OPS); |
444 | auto last = end - 1; |
445 | |
446 | String last_entry = *last; |
447 | if (!startsWith(last_entry, "log-" )) |
448 | throw Exception("Error in zookeeper data: unexpected node " + last_entry + " in " + zookeeper_path + "/log" , |
449 | ErrorCodes::UNEXPECTED_NODE_IN_ZOOKEEPER); |
450 | |
451 | UInt64 last_entry_index = parse<UInt64>(last_entry.substr(strlen("log-" ))); |
452 | |
453 | LOG_DEBUG(log, "Pulling " << (end - begin) << " entries to queue: " << *begin << " - " << *last); |
454 | |
455 | zkutil::AsyncResponses<Coordination::GetResponse> futures; |
456 | futures.reserve(end - begin); |
457 | |
458 | for (auto it = begin; it != end; ++it) |
459 | futures.emplace_back(*it, zookeeper->asyncGet(zookeeper_path + "/log/" + *it)); |
460 | |
461 | /// Simultaneously add all new entries to the queue and move the pointer to the log. |
462 | |
463 | Coordination::Requests ops; |
464 | std::vector<LogEntryPtr> copied_entries; |
465 | copied_entries.reserve(end - begin); |
466 | |
467 | std::optional<time_t> min_unprocessed_insert_time_changed; |
468 | |
469 | for (auto & future : futures) |
470 | { |
471 | Coordination::GetResponse res = future.second.get(); |
472 | |
473 | copied_entries.emplace_back(LogEntry::parse(res.data, res.stat)); |
474 | |
475 | ops.emplace_back(zkutil::makeCreateRequest( |
476 | replica_path + "/queue/queue-" , res.data, zkutil::CreateMode::PersistentSequential)); |
477 | |
478 | const auto & entry = *copied_entries.back(); |
479 | if (entry.type == LogEntry::GET_PART) |
480 | { |
481 | std::lock_guard state_lock(state_mutex); |
482 | if (entry.create_time && (!min_unprocessed_insert_time || entry.create_time < min_unprocessed_insert_time)) |
483 | { |
484 | min_unprocessed_insert_time = entry.create_time; |
485 | min_unprocessed_insert_time_changed = min_unprocessed_insert_time; |
486 | } |
487 | } |
488 | } |
489 | |
490 | ops.emplace_back(zkutil::makeSetRequest( |
491 | replica_path + "/log_pointer" , toString(last_entry_index + 1), -1)); |
492 | |
493 | if (min_unprocessed_insert_time_changed) |
494 | ops.emplace_back(zkutil::makeSetRequest( |
495 | replica_path + "/min_unprocessed_insert_time" , toString(*min_unprocessed_insert_time_changed), -1)); |
496 | |
497 | auto responses = zookeeper->multi(ops); |
498 | |
499 | /// Now we have successfully updated the queue in ZooKeeper. Update it in RAM. |
500 | |
501 | try |
502 | { |
503 | std::lock_guard state_lock(state_mutex); |
504 | |
505 | log_pointer = last_entry_index + 1; |
506 | |
507 | for (size_t copied_entry_idx = 0, num_copied_entries = copied_entries.size(); copied_entry_idx < num_copied_entries; ++copied_entry_idx) |
508 | { |
509 | String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses[copied_entry_idx]).path_created; |
510 | copied_entries[copied_entry_idx]->znode_name = path_created.substr(path_created.find_last_of('/') + 1); |
511 | |
512 | std::optional<time_t> unused = false; |
513 | insertUnlocked(copied_entries[copied_entry_idx], unused, state_lock); |
514 | } |
515 | |
516 | last_queue_update = time(nullptr); |
517 | } |
518 | catch (...) |
519 | { |
520 | /// If it fails, the data in RAM is incorrect. In order to avoid possible further corruption of data in ZK, we will kill ourselves. |
521 | /// This is possible only if there is an unknown logical error. |
522 | std::terminate(); |
523 | } |
524 | |
525 | if (!copied_entries.empty()) |
526 | LOG_DEBUG(log, "Pulled " << copied_entries.size() << " entries to queue." ); |
527 | } |
528 | |
529 | if (storage.queue_task_handle) |
530 | storage.queue_task_handle->wake(); |
531 | } |
532 | } |
533 | |
534 | |
535 | static Names getPartNamesToMutate( |
536 | const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & parts) |
537 | { |
538 | Names result; |
539 | for (const auto & pair : mutation.block_numbers) |
540 | { |
541 | const String & partition_id = pair.first; |
542 | Int64 block_num = pair.second; |
543 | |
544 | /// Note that we cannot simply count all parts to mutate using getPartsCoveredBy(appropriate part_info) |
545 | /// because they are not consecutive in `parts`. |
546 | MergeTreePartInfo covering_part_info( |
547 | partition_id, 0, block_num, MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER); |
548 | for (const String & covered_part_name : parts.getPartsCoveredBy(covering_part_info)) |
549 | { |
550 | auto part_info = MergeTreePartInfo::fromPartName(covered_part_name, parts.getFormatVersion()); |
551 | if (part_info.getDataVersion() < block_num) |
552 | result.push_back(covered_part_name); |
553 | } |
554 | } |
555 | |
556 | return result; |
557 | } |
558 | |
559 | |
560 | void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback) |
561 | { |
562 | std::lock_guard lock(update_mutations_mutex); |
563 | |
564 | Strings entries_in_zk = zookeeper->getChildrenWatch(zookeeper_path + "/mutations" , nullptr, watch_callback); |
565 | StringSet entries_in_zk_set(entries_in_zk.begin(), entries_in_zk.end()); |
566 | |
567 | /// Compare with the local state, delete obsolete entries and determine which new entries to load. |
568 | Strings entries_to_load; |
569 | bool some_active_mutations_were_killed = false; |
570 | { |
571 | std::lock_guard state_lock(state_mutex); |
572 | |
573 | for (auto it = mutations_by_znode.begin(); it != mutations_by_znode.end();) |
574 | { |
575 | const ReplicatedMergeTreeMutationEntry & entry = *it->second.entry; |
576 | if (!entries_in_zk_set.count(entry.znode_name)) |
577 | { |
578 | if (!it->second.is_done) |
579 | { |
580 | LOG_DEBUG(log, "Removing killed mutation " + entry.znode_name + " from local state." ); |
581 | some_active_mutations_were_killed = true; |
582 | } |
583 | else |
584 | LOG_DEBUG(log, "Removing obsolete mutation " + entry.znode_name + " from local state." ); |
585 | |
586 | for (const auto & partition_and_block_num : entry.block_numbers) |
587 | { |
588 | auto & in_partition = mutations_by_partition[partition_and_block_num.first]; |
589 | in_partition.erase(partition_and_block_num.second); |
590 | if (in_partition.empty()) |
591 | mutations_by_partition.erase(partition_and_block_num.first); |
592 | } |
593 | |
594 | it = mutations_by_znode.erase(it); |
595 | } |
596 | else |
597 | ++it; |
598 | } |
599 | |
600 | for (const String & znode : entries_in_zk_set) |
601 | { |
602 | if (!mutations_by_znode.count(znode)) |
603 | entries_to_load.push_back(znode); |
604 | } |
605 | } |
606 | |
607 | if (some_active_mutations_were_killed) |
608 | storage.queue_task_handle->wake(); |
609 | |
610 | if (!entries_to_load.empty()) |
611 | { |
612 | LOG_INFO(log, "Loading " + toString(entries_to_load.size()) + " mutation entries: " |
613 | + entries_to_load.front() + " - " + entries_to_load.back()); |
614 | |
615 | std::vector<std::future<Coordination::GetResponse>> futures; |
616 | for (const String & entry : entries_to_load) |
617 | futures.emplace_back(zookeeper->asyncGet(zookeeper_path + "/mutations/" + entry)); |
618 | |
619 | std::vector<ReplicatedMergeTreeMutationEntryPtr> new_mutations; |
620 | for (size_t i = 0; i < entries_to_load.size(); ++i) |
621 | { |
622 | new_mutations.push_back(std::make_shared<ReplicatedMergeTreeMutationEntry>( |
623 | ReplicatedMergeTreeMutationEntry::parse(futures[i].get().data, entries_to_load[i]))); |
624 | } |
625 | |
626 | bool some_mutations_are_probably_done = false; |
627 | { |
628 | std::lock_guard state_lock(state_mutex); |
629 | |
630 | for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations) |
631 | { |
632 | auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry)) |
633 | .first->second; |
634 | |
635 | for (const auto & pair : entry->block_numbers) |
636 | { |
637 | const String & partition_id = pair.first; |
638 | Int64 block_num = pair.second; |
639 | mutations_by_partition[partition_id].emplace(block_num, &mutation); |
640 | } |
641 | |
642 | /// Initialize `mutation.parts_to_do`. First we need to mutate all parts in `current_parts`. |
643 | mutation.parts_to_do += getPartNamesToMutate(*entry, current_parts).size(); |
644 | |
645 | /// And next we would need to mutate all parts with getDataVersion() greater than |
646 | /// mutation block number that would appear as a result of executing the queue. |
647 | for (const auto & queue_entry : queue) |
648 | { |
649 | for (const String & produced_part_name : queue_entry->getVirtualPartNames()) |
650 | { |
651 | auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version); |
652 | auto it = entry->block_numbers.find(part_info.partition_id); |
653 | if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion()) |
654 | ++mutation.parts_to_do; |
655 | } |
656 | } |
657 | |
658 | if (mutation.parts_to_do == 0) |
659 | some_mutations_are_probably_done = true; |
660 | } |
661 | } |
662 | |
663 | storage.merge_selecting_task->schedule(); |
664 | |
665 | if (some_mutations_are_probably_done) |
666 | storage.mutations_finalizing_task->schedule(); |
667 | } |
668 | } |
669 | |
670 | |
671 | ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation( |
672 | zkutil::ZooKeeperPtr zookeeper, const String & mutation_id) |
673 | { |
674 | std::lock_guard lock(update_mutations_mutex); |
675 | |
676 | auto rc = zookeeper->tryRemove(zookeeper_path + "/mutations/" + mutation_id); |
677 | if (rc == Coordination::ZOK) |
678 | LOG_DEBUG(log, "Removed mutation " + mutation_id + " from ZooKeeper." ); |
679 | |
680 | ReplicatedMergeTreeMutationEntryPtr entry; |
681 | bool mutation_was_active = false; |
682 | { |
683 | std::lock_guard state_lock(state_mutex); |
684 | |
685 | auto it = mutations_by_znode.find(mutation_id); |
686 | if (it == mutations_by_znode.end()) |
687 | return nullptr; |
688 | |
689 | mutation_was_active = !it->second.is_done; |
690 | |
691 | entry = it->second.entry; |
692 | for (const auto & partition_and_block_num : entry->block_numbers) |
693 | { |
694 | auto & in_partition = mutations_by_partition[partition_and_block_num.first]; |
695 | in_partition.erase(partition_and_block_num.second); |
696 | if (in_partition.empty()) |
697 | mutations_by_partition.erase(partition_and_block_num.first); |
698 | } |
699 | |
700 | mutations_by_znode.erase(it); |
701 | LOG_DEBUG(log, "Removed mutation " + entry->znode_name + " from local state." ); |
702 | } |
703 | |
704 | if (mutation_was_active) |
705 | storage.queue_task_handle->wake(); |
706 | |
707 | return entry; |
708 | } |
709 | |
710 | |
711 | ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsForMergeToEndOfQueue(const String & part_name) |
712 | { |
713 | std::lock_guard lock(state_mutex); |
714 | |
715 | /// Let's find the action to merge this part with others. Let's remember others. |
716 | StringSet parts_for_merge; |
717 | Queue::iterator merge_entry = queue.end(); |
718 | for (Queue::iterator it = queue.begin(); it != queue.end(); ++it) |
719 | { |
720 | if ((*it)->type == LogEntry::MERGE_PARTS || (*it)->type == LogEntry::MUTATE_PART) |
721 | { |
722 | if (std::find((*it)->source_parts.begin(), (*it)->source_parts.end(), part_name) |
723 | != (*it)->source_parts.end()) |
724 | { |
725 | parts_for_merge = StringSet((*it)->source_parts.begin(), (*it)->source_parts.end()); |
726 | merge_entry = it; |
727 | break; |
728 | } |
729 | } |
730 | } |
731 | |
732 | if (!parts_for_merge.empty()) |
733 | { |
734 | /// Move to the end of queue actions that result in one of the parts in `parts_for_merge`. |
735 | for (Queue::iterator it = queue.begin(); it != queue.end();) |
736 | { |
737 | auto it0 = it; |
738 | ++it; |
739 | |
740 | if (it0 == merge_entry) |
741 | break; |
742 | |
743 | if (((*it0)->type == LogEntry::MERGE_PARTS || (*it0)->type == LogEntry::GET_PART || (*it0)->type == LogEntry::MUTATE_PART) |
744 | && parts_for_merge.count((*it0)->new_part_name)) |
745 | { |
746 | queue.splice(queue.end(), queue, it0, it); |
747 | } |
748 | } |
749 | } |
750 | |
751 | return parts_for_merge; |
752 | } |
753 | |
754 | bool ReplicatedMergeTreeQueue::checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const |
755 | { |
756 | if (entry_ptr->type != LogEntry::REPLACE_RANGE) |
757 | return false; |
758 | |
759 | if (current.type != LogEntry::REPLACE_RANGE && current.type != LogEntry::DROP_RANGE) |
760 | return false; |
761 | |
762 | if (entry_ptr->replace_range_entry != nullptr && entry_ptr->replace_range_entry == current.replace_range_entry) /// same partition, don't want to drop ourselves |
763 | return false; |
764 | |
765 | for (const String & new_part_name : entry_ptr->replace_range_entry->new_part_names) |
766 | if (!part_info.contains(MergeTreePartInfo::fromPartName(new_part_name, format_version))) |
767 | return false; |
768 | |
769 | return true; |
770 | } |
771 | |
772 | void ReplicatedMergeTreeQueue::removePartProducingOpsInRange( |
773 | zkutil::ZooKeeperPtr zookeeper, |
774 | const MergeTreePartInfo & part_info, |
775 | const ReplicatedMergeTreeLogEntryData & current) |
776 | { |
777 | Queue to_wait; |
778 | size_t removed_entries = 0; |
779 | std::optional<time_t> min_unprocessed_insert_time_changed; |
780 | std::optional<time_t> max_processed_insert_time_changed; |
781 | |
782 | /// Remove operations with parts, contained in the range to be deleted, from the queue. |
783 | std::unique_lock lock(state_mutex); |
784 | for (Queue::iterator it = queue.begin(); it != queue.end();) |
785 | { |
786 | auto type = (*it)->type; |
787 | |
788 | if (((type == LogEntry::GET_PART || type == LogEntry::MERGE_PARTS || type == LogEntry::MUTATE_PART) |
789 | && part_info.contains(MergeTreePartInfo::fromPartName((*it)->new_part_name, format_version))) |
790 | || checkReplaceRangeCanBeRemoved(part_info, *it, current)) |
791 | { |
792 | if ((*it)->currently_executing) |
793 | to_wait.push_back(*it); |
794 | auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name); |
795 | if (code) |
796 | LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": " |
797 | << zkutil::ZooKeeper::error2string(code)); |
798 | |
799 | updateStateOnQueueEntryRemoval( |
800 | *it, /* is_successful = */ false, |
801 | min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); |
802 | queue.erase(it++); |
803 | ++removed_entries; |
804 | } |
805 | else |
806 | ++it; |
807 | } |
808 | |
809 | updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); |
810 | |
811 | LOG_DEBUG(log, "Removed " << removed_entries << " entries from queue. " |
812 | "Waiting for " << to_wait.size() << " entries that are currently executing." ); |
813 | |
814 | /// Let's wait for the operations with the parts contained in the range to be deleted. |
815 | for (LogEntryPtr & entry : to_wait) |
816 | entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; }); |
817 | } |
818 | |
819 | |
820 | size_t ReplicatedMergeTreeQueue::getConflictsCountForRange( |
821 | const MergeTreePartInfo & range, const LogEntry & entry, |
822 | String * out_description, std::lock_guard<std::mutex> & /* queue_lock */) const |
823 | { |
824 | std::vector<std::pair<String, LogEntryPtr>> conflicts; |
825 | |
826 | for (auto & future_part_elem : future_parts) |
827 | { |
828 | /// Do not check itself log entry |
829 | if (future_part_elem.second->znode_name == entry.znode_name) |
830 | continue; |
831 | |
832 | if (!range.isDisjoint(MergeTreePartInfo::fromPartName(future_part_elem.first, format_version))) |
833 | { |
834 | conflicts.emplace_back(future_part_elem.first, future_part_elem.second); |
835 | continue; |
836 | } |
837 | } |
838 | |
839 | if (out_description) |
840 | { |
841 | std::stringstream ss; |
842 | ss << "Can't execute command for range " << range.getPartName() << " (entry " << entry.znode_name << "). " ; |
843 | ss << "There are " << conflicts.size() << " currently executing entries blocking it: " ; |
844 | for (const auto & conflict : conflicts) |
845 | ss << conflict.second->typeToString() << " part " << conflict.first << ", " ; |
846 | |
847 | *out_description = ss.str(); |
848 | } |
849 | |
850 | return conflicts.size(); |
851 | } |
852 | |
853 | |
854 | void ReplicatedMergeTreeQueue::checkThereAreNoConflictsInRange(const MergeTreePartInfo & range, const LogEntry & entry) |
855 | { |
856 | String conflicts_description; |
857 | std::lock_guard lock(state_mutex); |
858 | |
859 | if (0 != getConflictsCountForRange(range, entry, &conflicts_description, lock)) |
860 | throw Exception(conflicts_description, ErrorCodes::UNFINISHED); |
861 | } |
862 | |
863 | |
864 | bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const |
865 | { |
866 | /// Let's check if the same part is now being created by another action. |
867 | if (future_parts.count(new_part_name)) |
868 | { |
869 | out_reason = "Not executing log entry for part " + new_part_name |
870 | + " because another log entry for the same part is being processed. This shouldn't happen often." ; |
871 | return false; |
872 | |
873 | /** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed, |
874 | * and queue element will be processed. |
875 | * Immediately in the `executeLogEntry` function it will be found that we already have a part, |
876 | * and queue element will be immediately treated as processed. |
877 | */ |
878 | } |
879 | |
880 | /// A more complex check is whether another part is currently created by other action that will cover this part. |
881 | /// NOTE The above is redundant, but left for a more convenient message in the log. |
882 | auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version); |
883 | |
884 | /// It can slow down when the size of `future_parts` is large. But it can not be large, since `BackgroundProcessingPool` is limited. |
885 | for (const auto & future_part_elem : future_parts) |
886 | { |
887 | auto future_part = MergeTreePartInfo::fromPartName(future_part_elem.first, format_version); |
888 | |
889 | if (future_part.contains(result_part)) |
890 | { |
891 | out_reason = "Not executing log entry for part " + new_part_name + " because it is covered by part " |
892 | + future_part_elem.first + " that is currently executing" ; |
893 | return false; |
894 | } |
895 | } |
896 | |
897 | return true; |
898 | } |
899 | |
900 | bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason) |
901 | { |
902 | std::lock_guard lock(state_mutex); |
903 | |
904 | if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock)) |
905 | { |
906 | CurrentlyExecuting::setActualPartName(entry, part_name, *this); |
907 | return true; |
908 | } |
909 | |
910 | return false; |
911 | } |
912 | |
913 | |
914 | bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( |
915 | const LogEntry & entry, |
916 | String & out_postpone_reason, |
917 | MergeTreeDataMergerMutator & merger_mutator, |
918 | MergeTreeData & data, |
919 | std::lock_guard<std::mutex> & queue_lock) const |
920 | { |
921 | if (entry.type == LogEntry::MERGE_PARTS |
922 | || entry.type == LogEntry::GET_PART |
923 | || entry.type == LogEntry::MUTATE_PART) |
924 | { |
925 | for (const String & new_part_name : entry.getBlockingPartNames()) |
926 | { |
927 | if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, queue_lock)) |
928 | { |
929 | if (!out_postpone_reason.empty()) |
930 | LOG_DEBUG(log, out_postpone_reason); |
931 | return false; |
932 | } |
933 | } |
934 | } |
935 | |
936 | if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART) |
937 | { |
938 | /** If any of the required parts are now fetched or in merge process, wait for the end of this operation. |
939 | * Otherwise, even if all the necessary parts for the merge are not present, you should try to make a merge. |
940 | * If any parts are missing, instead of merge, there will be an attempt to download a part. |
941 | * Such a situation is possible if the receive of a part has failed, and it was moved to the end of the queue. |
942 | */ |
943 | size_t sum_parts_size_in_bytes = 0; |
944 | for (const auto & name : entry.source_parts) |
945 | { |
946 | if (future_parts.count(name)) |
947 | { |
948 | String reason = "Not merging into part " + entry.new_part_name |
949 | + " because part " + name + " is not ready yet (log entry for that part is being processed)." ; |
950 | LOG_TRACE(log, reason); |
951 | out_postpone_reason = reason; |
952 | return false; |
953 | } |
954 | |
955 | auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated}); |
956 | if (part) |
957 | sum_parts_size_in_bytes += part->bytes_on_disk; |
958 | } |
959 | |
960 | if (merger_mutator.merges_blocker.isCancelled()) |
961 | { |
962 | String reason = "Not executing log entry for part " + entry.new_part_name + " because merges and mutations are cancelled now." ; |
963 | LOG_DEBUG(log, reason); |
964 | out_postpone_reason = reason; |
965 | return false; |
966 | } |
967 | |
968 | UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge() |
969 | : merger_mutator.getMaxSourcePartSizeForMutation(); |
970 | /** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed), |
971 | * then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size, |
972 | * because it may be ordered by OPTIMIZE or early with different settings. |
973 | * Setting max_bytes_to_merge_at_max_space_in_pool still working for regular merges, |
974 | * because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL). |
975 | */ |
976 | const auto data_settings = data.getSettings(); |
977 | bool ignore_max_size = (entry.type == LogEntry::MERGE_PARTS) && (max_source_parts_size == data_settings->max_bytes_to_merge_at_max_space_in_pool); |
978 | |
979 | if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size) |
980 | { |
981 | String reason = "Not executing log entry " + entry.typeToString() + " for part " + entry.new_part_name |
982 | + " because source parts size (" + formatReadableSizeWithBinarySuffix(sum_parts_size_in_bytes) |
983 | + ") is greater than the current maximum (" + formatReadableSizeWithBinarySuffix(max_source_parts_size) + ")." ; |
984 | LOG_DEBUG(log, reason); |
985 | out_postpone_reason = reason; |
986 | return false; |
987 | } |
988 | } |
989 | |
990 | /// TODO: it makes sense to check DROP_RANGE also |
991 | if (entry.type == LogEntry::CLEAR_COLUMN || entry.type == LogEntry::REPLACE_RANGE) |
992 | { |
993 | String conflicts_description; |
994 | String range_name = (entry.type == LogEntry::REPLACE_RANGE) ? entry.replace_range_entry->drop_range_part_name : entry.new_part_name; |
995 | auto range = MergeTreePartInfo::fromPartName(range_name, format_version); |
996 | |
997 | if (0 != getConflictsCountForRange(range, entry, &conflicts_description, queue_lock)) |
998 | { |
999 | LOG_DEBUG(log, conflicts_description); |
1000 | return false; |
1001 | } |
1002 | } |
1003 | |
1004 | return true; |
1005 | } |
1006 | |
1007 | |
1008 | Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersionImpl( |
1009 | const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* state_lock */) const |
1010 | { |
1011 | auto in_partition = mutations_by_partition.find(partition_id); |
1012 | if (in_partition == mutations_by_partition.end()) |
1013 | return 0; |
1014 | |
1015 | auto it = in_partition->second.upper_bound(data_version); |
1016 | if (it == in_partition->second.begin()) |
1017 | return 0; |
1018 | |
1019 | --it; |
1020 | return it->first; |
1021 | } |
1022 | |
1023 | |
1024 | Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const String & partition_id, Int64 data_version) const |
1025 | { |
1026 | std::lock_guard lock(state_mutex); |
1027 | return getCurrentMutationVersionImpl(partition_id, data_version, lock); |
1028 | } |
1029 | |
1030 | |
1031 | ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_) |
1032 | : entry(entry_), queue(queue_) |
1033 | { |
1034 | entry->currently_executing = true; |
1035 | ++entry->num_tries; |
1036 | entry->last_attempt_time = time(nullptr); |
1037 | |
1038 | for (const String & new_part_name : entry->getBlockingPartNames()) |
1039 | { |
1040 | if (!queue.future_parts.emplace(new_part_name, entry).second) |
1041 | throw Exception("Tagging already tagged future part " + new_part_name + ". This is a bug." , ErrorCodes::LOGICAL_ERROR); |
1042 | } |
1043 | } |
1044 | |
1045 | |
1046 | void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry, |
1047 | const String & actual_part_name, ReplicatedMergeTreeQueue & queue) |
1048 | { |
1049 | if (!entry.actual_new_part_name.empty()) |
1050 | throw Exception("Entry actual part isn't empty yet. This is a bug." , ErrorCodes::LOGICAL_ERROR); |
1051 | |
1052 | entry.actual_new_part_name = actual_part_name; |
1053 | |
1054 | /// Check if it is the same (and already added) part. |
1055 | if (entry.actual_new_part_name == entry.new_part_name) |
1056 | return; |
1057 | |
1058 | if (!queue.future_parts.emplace(entry.actual_new_part_name, entry.shared_from_this()).second) |
1059 | throw Exception("Attaching already existing future part " + entry.actual_new_part_name + ". This is a bug." , ErrorCodes::LOGICAL_ERROR); |
1060 | } |
1061 | |
1062 | |
1063 | ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting() |
1064 | { |
1065 | std::lock_guard lock(queue.state_mutex); |
1066 | |
1067 | entry->currently_executing = false; |
1068 | entry->execution_complete.notify_all(); |
1069 | |
1070 | for (const String & new_part_name : entry->getBlockingPartNames()) |
1071 | { |
1072 | if (!queue.future_parts.erase(new_part_name)) |
1073 | LOG_ERROR(queue.log, "Untagging already untagged future part " + new_part_name + ". This is a bug." ); |
1074 | } |
1075 | |
1076 | if (!entry->actual_new_part_name.empty()) |
1077 | { |
1078 | if (entry->actual_new_part_name != entry->new_part_name && !queue.future_parts.erase(entry->actual_new_part_name)) |
1079 | LOG_ERROR(queue.log, "Untagging already untagged future part " + entry->actual_new_part_name + ". This is a bug." ); |
1080 | |
1081 | entry->actual_new_part_name.clear(); |
1082 | } |
1083 | } |
1084 | |
1085 | |
1086 | ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data) |
1087 | { |
1088 | LogEntryPtr entry; |
1089 | |
1090 | std::lock_guard lock(state_mutex); |
1091 | |
1092 | for (auto it = queue.begin(); it != queue.end(); ++it) |
1093 | { |
1094 | if ((*it)->currently_executing) |
1095 | continue; |
1096 | |
1097 | if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger_mutator, data, lock)) |
1098 | { |
1099 | entry = *it; |
1100 | queue.splice(queue.end(), queue, it); |
1101 | break; |
1102 | } |
1103 | else |
1104 | { |
1105 | ++(*it)->num_postponed; |
1106 | (*it)->last_postpone_time = time(nullptr); |
1107 | } |
1108 | } |
1109 | |
1110 | if (entry) |
1111 | return { entry, std::unique_ptr<CurrentlyExecuting>{ new CurrentlyExecuting(entry, *this) } }; |
1112 | else |
1113 | return {}; |
1114 | } |
1115 | |
1116 | |
1117 | bool ReplicatedMergeTreeQueue::processEntry( |
1118 | std::function<zkutil::ZooKeeperPtr()> get_zookeeper, |
1119 | LogEntryPtr & entry, |
1120 | const std::function<bool(LogEntryPtr &)> func) |
1121 | { |
1122 | std::exception_ptr saved_exception; |
1123 | |
1124 | try |
1125 | { |
1126 | if (func(entry)) |
1127 | removeProcessedEntry(get_zookeeper(), entry); |
1128 | } |
1129 | catch (...) |
1130 | { |
1131 | saved_exception = std::current_exception(); |
1132 | } |
1133 | |
1134 | if (saved_exception) |
1135 | { |
1136 | std::lock_guard lock(state_mutex); |
1137 | |
1138 | entry->exception = saved_exception; |
1139 | |
1140 | if (entry->type == ReplicatedMergeTreeLogEntryData::MUTATE_PART) |
1141 | { |
1142 | /// Record the exception in the system.mutations table. |
1143 | Int64 result_data_version = MergeTreePartInfo::fromPartName(entry->new_part_name, format_version) |
1144 | .getDataVersion(); |
1145 | auto source_part_info = MergeTreePartInfo::fromPartName( |
1146 | entry->source_parts.at(0), format_version); |
1147 | |
1148 | auto in_partition = mutations_by_partition.find(source_part_info.partition_id); |
1149 | if (in_partition != mutations_by_partition.end()) |
1150 | { |
1151 | auto mutations_begin_it = in_partition->second.upper_bound(source_part_info.getDataVersion()); |
1152 | auto mutations_end_it = in_partition->second.upper_bound(result_data_version); |
1153 | for (auto it = mutations_begin_it; it != mutations_end_it; ++it) |
1154 | { |
1155 | MutationStatus & status = *it->second; |
1156 | status.latest_failed_part = entry->source_parts.at(0); |
1157 | status.latest_failed_part_info = source_part_info; |
1158 | status.latest_fail_time = time(nullptr); |
1159 | status.latest_fail_reason = getExceptionMessage(saved_exception, false); |
1160 | } |
1161 | } |
1162 | } |
1163 | |
1164 | return false; |
1165 | } |
1166 | |
1167 | return true; |
1168 | } |
1169 | |
1170 | |
1171 | std::pair<size_t, size_t> ReplicatedMergeTreeQueue::countMergesAndPartMutations() const |
1172 | { |
1173 | std::lock_guard lock(state_mutex); |
1174 | |
1175 | size_t count_merges = 0; |
1176 | size_t count_mutations = 0; |
1177 | for (const auto & entry : queue) |
1178 | { |
1179 | if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS) |
1180 | ++count_merges; |
1181 | else if (entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART) |
1182 | ++count_mutations; |
1183 | } |
1184 | |
1185 | return std::make_pair(count_merges, count_mutations); |
1186 | } |
1187 | |
1188 | |
1189 | size_t ReplicatedMergeTreeQueue::countMutations() const |
1190 | { |
1191 | std::lock_guard lock(state_mutex); |
1192 | return mutations_by_znode.size(); |
1193 | } |
1194 | |
1195 | |
1196 | size_t ReplicatedMergeTreeQueue::countFinishedMutations() const |
1197 | { |
1198 | std::lock_guard lock(state_mutex); |
1199 | |
1200 | size_t count = 0; |
1201 | for (const auto & pair : mutations_by_znode) |
1202 | { |
1203 | const auto & mutation = pair.second; |
1204 | if (!mutation.is_done) |
1205 | break; |
1206 | |
1207 | ++count; |
1208 | } |
1209 | |
1210 | return count; |
1211 | } |
1212 | |
1213 | |
1214 | ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper) |
1215 | { |
1216 | return ReplicatedMergeTreeMergePredicate(*this, zookeeper); |
1217 | } |
1218 | |
1219 | |
1220 | MutationCommands ReplicatedMergeTreeQueue::getMutationCommands( |
1221 | const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const |
1222 | { |
1223 | /// NOTE: If the corresponding mutation is not found, the error is logged (and not thrown as an exception) |
1224 | /// to allow recovering from a mutation that cannot be executed. This way you can delete the mutation entry |
1225 | /// from /mutations in ZK and the replicas will simply skip the mutation. |
1226 | |
1227 | if (part->info.getDataVersion() > desired_mutation_version) |
1228 | { |
1229 | LOG_WARNING(log, "Data version of part " << part->name << " is already greater than " |
1230 | "desired mutation version " << desired_mutation_version); |
1231 | return MutationCommands{}; |
1232 | } |
1233 | |
1234 | std::lock_guard lock(state_mutex); |
1235 | |
1236 | auto in_partition = mutations_by_partition.find(part->info.partition_id); |
1237 | if (in_partition == mutations_by_partition.end()) |
1238 | { |
1239 | LOG_WARNING(log, "There are no mutations for partition ID " << part->info.partition_id |
1240 | << " (trying to mutate part " << part->name << " to " << toString(desired_mutation_version) << ")" ); |
1241 | return MutationCommands{}; |
1242 | } |
1243 | |
1244 | auto begin = in_partition->second.upper_bound(part->info.getDataVersion()); |
1245 | |
1246 | auto end = in_partition->second.lower_bound(desired_mutation_version); |
1247 | if (end == in_partition->second.end() || end->first != desired_mutation_version) |
1248 | LOG_WARNING(log, "Mutation with version " << desired_mutation_version |
1249 | << " not found in partition ID " << part->info.partition_id |
1250 | << " (trying to mutate part " << part->name + ")" ); |
1251 | else |
1252 | ++end; |
1253 | |
1254 | MutationCommands commands; |
1255 | for (auto it = begin; it != end; ++it) |
1256 | commands.insert(commands.end(), it->second->entry->commands.begin(), it->second->entry->commands.end()); |
1257 | |
1258 | return commands; |
1259 | } |
1260 | |
1261 | |
1262 | bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeeper) |
1263 | { |
1264 | std::vector<ReplicatedMergeTreeMutationEntryPtr> candidates; |
1265 | { |
1266 | std::lock_guard lock(state_mutex); |
1267 | |
1268 | for (auto & kv : mutations_by_znode) |
1269 | { |
1270 | const String & znode = kv.first; |
1271 | MutationStatus & mutation = kv.second; |
1272 | |
1273 | if (mutation.is_done) |
1274 | continue; |
1275 | |
1276 | if (znode <= mutation_pointer) |
1277 | { |
1278 | LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")" ); |
1279 | mutation.is_done = true; |
1280 | } |
1281 | else if (mutation.parts_to_do == 0) |
1282 | { |
1283 | LOG_TRACE(log, "Will check if mutation " << mutation.entry->znode_name << " is done" ); |
1284 | candidates.push_back(mutation.entry); |
1285 | } |
1286 | } |
1287 | } |
1288 | |
1289 | if (candidates.empty()) |
1290 | return false; |
1291 | |
1292 | auto merge_pred = getMergePredicate(zookeeper); |
1293 | |
1294 | std::vector<const ReplicatedMergeTreeMutationEntry *> finished; |
1295 | for (const ReplicatedMergeTreeMutationEntryPtr & candidate : candidates) |
1296 | { |
1297 | if (merge_pred.isMutationFinished(*candidate)) |
1298 | finished.push_back(candidate.get()); |
1299 | } |
1300 | |
1301 | if (!finished.empty()) |
1302 | { |
1303 | zookeeper->set(replica_path + "/mutation_pointer" , finished.back()->znode_name); |
1304 | |
1305 | std::lock_guard lock(state_mutex); |
1306 | |
1307 | mutation_pointer = finished.back()->znode_name; |
1308 | |
1309 | for (const ReplicatedMergeTreeMutationEntry * entry : finished) |
1310 | { |
1311 | auto it = mutations_by_znode.find(entry->znode_name); |
1312 | if (it != mutations_by_znode.end()) |
1313 | { |
1314 | LOG_TRACE(log, "Mutation " << entry->znode_name << " is done" ); |
1315 | it->second.is_done = true; |
1316 | } |
1317 | } |
1318 | } |
1319 | |
1320 | return candidates.size() != finished.size(); |
1321 | } |
1322 | |
1323 | |
1324 | void ReplicatedMergeTreeQueue::disableMergesInBlockRange(const String & part_name) |
1325 | { |
1326 | std::lock_guard lock(state_mutex); |
1327 | virtual_parts.add(part_name); |
1328 | } |
1329 | |
1330 | |
1331 | ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const |
1332 | { |
1333 | std::lock_guard lock(state_mutex); |
1334 | |
1335 | Status res; |
1336 | |
1337 | res.future_parts = future_parts.size(); |
1338 | res.queue_size = queue.size(); |
1339 | res.last_queue_update = last_queue_update; |
1340 | |
1341 | res.inserts_in_queue = 0; |
1342 | res.merges_in_queue = 0; |
1343 | res.part_mutations_in_queue = 0; |
1344 | res.queue_oldest_time = 0; |
1345 | res.inserts_oldest_time = 0; |
1346 | res.merges_oldest_time = 0; |
1347 | res.part_mutations_oldest_time = 0; |
1348 | |
1349 | for (const LogEntryPtr & entry : queue) |
1350 | { |
1351 | if (entry->create_time && (!res.queue_oldest_time || entry->create_time < res.queue_oldest_time)) |
1352 | res.queue_oldest_time = entry->create_time; |
1353 | |
1354 | if (entry->type == LogEntry::GET_PART) |
1355 | { |
1356 | ++res.inserts_in_queue; |
1357 | |
1358 | if (entry->create_time && (!res.inserts_oldest_time || entry->create_time < res.inserts_oldest_time)) |
1359 | { |
1360 | res.inserts_oldest_time = entry->create_time; |
1361 | res.oldest_part_to_get = entry->new_part_name; |
1362 | } |
1363 | } |
1364 | |
1365 | if (entry->type == LogEntry::MERGE_PARTS) |
1366 | { |
1367 | ++res.merges_in_queue; |
1368 | |
1369 | if (entry->create_time && (!res.merges_oldest_time || entry->create_time < res.merges_oldest_time)) |
1370 | { |
1371 | res.merges_oldest_time = entry->create_time; |
1372 | res.oldest_part_to_merge_to = entry->new_part_name; |
1373 | } |
1374 | } |
1375 | |
1376 | if (entry->type == LogEntry::MUTATE_PART) |
1377 | { |
1378 | ++res.part_mutations_in_queue; |
1379 | |
1380 | if (entry->create_time && (!res.part_mutations_oldest_time || entry->create_time < res.part_mutations_oldest_time)) |
1381 | { |
1382 | res.part_mutations_oldest_time = entry->create_time; |
1383 | res.oldest_part_to_mutate_to = entry->new_part_name; |
1384 | } |
1385 | } |
1386 | } |
1387 | |
1388 | return res; |
1389 | } |
1390 | |
1391 | |
1392 | void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res) const |
1393 | { |
1394 | res.clear(); |
1395 | std::lock_guard lock(state_mutex); |
1396 | |
1397 | res.reserve(queue.size()); |
1398 | for (const auto & entry : queue) |
1399 | res.emplace_back(*entry); |
1400 | } |
1401 | |
1402 | |
1403 | void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const |
1404 | { |
1405 | std::lock_guard lock(state_mutex); |
1406 | out_min_unprocessed_insert_time = min_unprocessed_insert_time; |
1407 | out_max_processed_insert_time = max_processed_insert_time; |
1408 | } |
1409 | |
1410 | |
1411 | std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatus() const |
1412 | { |
1413 | std::lock_guard lock(state_mutex); |
1414 | |
1415 | std::vector<MergeTreeMutationStatus> result; |
1416 | for (const auto & pair : mutations_by_znode) |
1417 | { |
1418 | const MutationStatus & status = pair.second; |
1419 | const ReplicatedMergeTreeMutationEntry & entry = *status.entry; |
1420 | const Names parts_to_mutate = getPartNamesToMutate(entry, current_parts); |
1421 | |
1422 | for (const MutationCommand & command : entry.commands) |
1423 | { |
1424 | std::stringstream ss; |
1425 | formatAST(*command.ast, ss, false, true); |
1426 | result.push_back(MergeTreeMutationStatus |
1427 | { |
1428 | entry.znode_name, |
1429 | ss.str(), |
1430 | entry.create_time, |
1431 | entry.block_numbers, |
1432 | parts_to_mutate, |
1433 | status.is_done, |
1434 | status.latest_failed_part, |
1435 | status.latest_fail_time, |
1436 | status.latest_fail_reason, |
1437 | }); |
1438 | } |
1439 | } |
1440 | |
1441 | return result; |
1442 | } |
1443 | |
1444 | |
1445 | ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( |
1446 | ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper) |
1447 | : queue(queue_) |
1448 | , prev_virtual_parts(queue.format_version) |
1449 | { |
1450 | { |
1451 | std::lock_guard lock(queue.state_mutex); |
1452 | prev_virtual_parts = queue.virtual_parts; |
1453 | } |
1454 | |
1455 | /// Load current quorum status. |
1456 | auto quorum_last_part_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/last_part" ); |
1457 | auto quorum_status_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/status" ); |
1458 | |
1459 | /// Load current inserts |
1460 | std::unordered_set<String> lock_holder_paths; |
1461 | for (const String & entry : zookeeper->getChildren(queue.zookeeper_path + "/temp" )) |
1462 | { |
1463 | if (startsWith(entry, "abandonable_lock-" )) |
1464 | lock_holder_paths.insert(queue.zookeeper_path + "/temp/" + entry); |
1465 | } |
1466 | |
1467 | if (!lock_holder_paths.empty()) |
1468 | { |
1469 | Strings partitions = zookeeper->getChildren(queue.zookeeper_path + "/block_numbers" ); |
1470 | std::vector<std::future<Coordination::ListResponse>> lock_futures; |
1471 | for (const String & partition : partitions) |
1472 | lock_futures.push_back(zookeeper->asyncGetChildren(queue.zookeeper_path + "/block_numbers/" + partition)); |
1473 | |
1474 | struct BlockInfo_ |
1475 | { |
1476 | String partition; |
1477 | Int64 number; |
1478 | String zk_path; |
1479 | std::future<Coordination::GetResponse> contents_future; |
1480 | }; |
1481 | |
1482 | std::vector<BlockInfo_> block_infos; |
1483 | for (size_t i = 0; i < partitions.size(); ++i) |
1484 | { |
1485 | Strings partition_block_numbers = lock_futures[i].get().names; |
1486 | for (const String & entry : partition_block_numbers) |
1487 | { |
1488 | /// TODO: cache block numbers that are abandoned. |
1489 | /// We won't need to check them on the next iteration. |
1490 | if (startsWith(entry, "block-" )) |
1491 | { |
1492 | Int64 block_number = parse<Int64>(entry.substr(strlen("block-" ))); |
1493 | String zk_path = queue.zookeeper_path + "/block_numbers/" + partitions[i] + "/" + entry; |
1494 | block_infos.emplace_back( |
1495 | BlockInfo_{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)}); |
1496 | } |
1497 | } |
1498 | } |
1499 | |
1500 | for (auto & block : block_infos) |
1501 | { |
1502 | Coordination::GetResponse resp = block.contents_future.get(); |
1503 | if (!resp.error && lock_holder_paths.count(resp.data)) |
1504 | committing_blocks[block.partition].insert(block.number); |
1505 | } |
1506 | } |
1507 | |
1508 | queue_.pullLogsToQueue(zookeeper); |
1509 | |
1510 | Coordination::GetResponse quorum_last_part_response = quorum_last_part_future.get(); |
1511 | if (!quorum_last_part_response.error) |
1512 | { |
1513 | ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(queue.format_version); |
1514 | if (!quorum_last_part_response.data.empty()) |
1515 | { |
1516 | parts_with_quorum.fromString(quorum_last_part_response.data); |
1517 | last_quorum_parts.clear(); |
1518 | for (const auto & added_part : parts_with_quorum.added_parts) |
1519 | last_quorum_parts.emplace(added_part.second); |
1520 | } |
1521 | } |
1522 | |
1523 | Coordination::GetResponse quorum_status_response = quorum_status_future.get(); |
1524 | if (!quorum_status_response.error) |
1525 | { |
1526 | ReplicatedMergeTreeQuorumEntry quorum_status; |
1527 | quorum_status.fromString(quorum_status_response.data); |
1528 | inprogress_quorum_part = quorum_status.part_name; |
1529 | } |
1530 | else |
1531 | inprogress_quorum_part.clear(); |
1532 | } |
1533 | |
1534 | bool ReplicatedMergeTreeMergePredicate::operator()( |
1535 | const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, |
1536 | String * out_reason) const |
1537 | { |
1538 | /// A sketch of a proof of why this method actually works: |
1539 | /// |
1540 | /// The trickiest part is to ensure that no new parts will ever appear in the range of blocks between left and right. |
1541 | /// Inserted parts get their block numbers by acquiring an ephemeral lock (see EphemeralLockInZooKeeper.h). |
1542 | /// These block numbers are monotonically increasing in a partition. |
1543 | /// |
1544 | /// Because there is a window between the moment the inserted part gets its block number and |
1545 | /// the moment it is committed (appears in the replication log), we can't get the name of all parts up to the given |
1546 | /// block number just by looking at the replication log - some parts with smaller block numbers may be currently committing |
1547 | /// and will appear in the log later than the parts with bigger block numbers. |
1548 | /// |
1549 | /// We also can't take a consistent snapshot of parts that are already committed plus parts that are about to commit |
1550 | /// due to limitations of ZooKeeper transactions. |
1551 | /// |
1552 | /// So we do the following (see the constructor): |
1553 | /// * copy virtual_parts from queue to prev_virtual_parts |
1554 | /// (a set of parts which corresponds to executing the replication log up to a certain point) |
1555 | /// * load committing_blocks (inserts and mutations that have already acquired a block number but haven't appeared in the log yet) |
1556 | /// * do pullLogsToQueue() again to load fresh queue.virtual_parts and mutations. |
1557 | /// |
1558 | /// Now we have an invariant: if some part is in prev_virtual_parts then: |
1559 | /// * all parts with smaller block numbers are either in committing_blocks or in queue.virtual_parts |
1560 | /// (those that managed to commit before we loaded committing_blocks). |
1561 | /// * all mutations with smaller block numbers are either in committing_blocks or in queue.mutations_by_partition |
1562 | /// |
1563 | /// So to check that no new parts will ever appear in the range of blocks between left and right we first check that |
1564 | /// left and right are already present in prev_virtual_parts (we can't give a definite answer for parts that were committed later) |
1565 | /// and then check that there are no blocks between them in committing_blocks and no parts in queue.virtual_parts. |
1566 | /// |
1567 | /// Similarly, to check that there will be no mutation with a block number between two parts from prev_virtual_parts |
1568 | /// (only then we can merge them without mutating the left part), we first check committing_blocks |
1569 | /// and then check that these two parts have the same mutation version according to queue.mutations_by_partition. |
1570 | |
1571 | if (left->info.partition_id != right->info.partition_id) |
1572 | { |
1573 | if (out_reason) |
1574 | *out_reason = "Parts " + left->name + " and " + right->name + " belong to different partitions" ; |
1575 | return false; |
1576 | } |
1577 | |
1578 | for (const MergeTreeData::DataPartPtr & part : {left, right}) |
1579 | { |
1580 | if (last_quorum_parts.find(part->name) != last_quorum_parts.end()) |
1581 | { |
1582 | if (out_reason) |
1583 | *out_reason = "Part " + part->name + " is the most recent part with a satisfied quorum" ; |
1584 | return false; |
1585 | } |
1586 | |
1587 | if (part->name == inprogress_quorum_part) |
1588 | { |
1589 | if (out_reason) |
1590 | *out_reason = "Quorum insert for part " + part->name + " is currently in progress" ; |
1591 | return false; |
1592 | } |
1593 | |
1594 | if (prev_virtual_parts.getContainingPart(part->info).empty()) |
1595 | { |
1596 | if (out_reason) |
1597 | *out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet" ; |
1598 | return false; |
1599 | } |
1600 | } |
1601 | |
1602 | Int64 left_max_block = left->info.max_block; |
1603 | Int64 right_min_block = right->info.min_block; |
1604 | if (left_max_block > right_min_block) |
1605 | std::swap(left_max_block, right_min_block); |
1606 | |
1607 | if (left_max_block + 1 < right_min_block) |
1608 | { |
1609 | auto committing_blocks_in_partition = committing_blocks.find(left->info.partition_id); |
1610 | if (committing_blocks_in_partition != committing_blocks.end()) |
1611 | { |
1612 | const std::set<Int64> & block_numbers = committing_blocks_in_partition->second; |
1613 | |
1614 | auto block_it = block_numbers.upper_bound(left_max_block); |
1615 | if (block_it != block_numbers.end() && *block_it < right_min_block) |
1616 | { |
1617 | if (out_reason) |
1618 | *out_reason = "Block number " + toString(*block_it) + " is still being inserted between parts " |
1619 | + left->name + " and " + right->name; |
1620 | |
1621 | return false; |
1622 | } |
1623 | } |
1624 | } |
1625 | |
1626 | std::lock_guard lock(queue.state_mutex); |
1627 | |
1628 | for (const MergeTreeData::DataPartPtr & part : {left, right}) |
1629 | { |
1630 | /// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer |
1631 | /// and it is guaranteed that it will contain all merges assigned before this object is constructed. |
1632 | String containing_part = queue.virtual_parts.getContainingPart(part->info); |
1633 | if (containing_part != part->name) |
1634 | { |
1635 | if (out_reason) |
1636 | *out_reason = "Part " + part->name + " has already been assigned a merge into " + containing_part; |
1637 | return false; |
1638 | } |
1639 | } |
1640 | |
1641 | if (left_max_block + 1 < right_min_block) |
1642 | { |
1643 | /// Fake part which will appear as merge result |
1644 | MergeTreePartInfo gap_part_info( |
1645 | left->info.partition_id, left_max_block + 1, right_min_block - 1, |
1646 | MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER); |
1647 | |
1648 | /// We don't select parts if any smaller part covered by our merge must exist after |
1649 | /// processing replication log up to log_pointer. |
1650 | Strings covered = queue.virtual_parts.getPartsCoveredBy(gap_part_info); |
1651 | if (!covered.empty()) |
1652 | { |
1653 | if (out_reason) |
1654 | *out_reason = "There are " + toString(covered.size()) + " parts (from " + covered.front() |
1655 | + " to " + covered.back() + ") that are still not present or beeing processed by " |
1656 | + " other background process on this replica between " + left->name + " and " + right->name; |
1657 | return false; |
1658 | } |
1659 | } |
1660 | |
1661 | Int64 left_mutation_ver = queue.getCurrentMutationVersionImpl( |
1662 | left->info.partition_id, left->info.getDataVersion(), lock); |
1663 | |
1664 | Int64 right_mutation_ver = queue.getCurrentMutationVersionImpl( |
1665 | left->info.partition_id, right->info.getDataVersion(), lock); |
1666 | |
1667 | if (left_mutation_ver != right_mutation_ver) |
1668 | { |
1669 | if (out_reason) |
1670 | *out_reason = "Current mutation versions of parts " + left->name + " and " + right->name + " differ: " |
1671 | + toString(left_mutation_ver) + " and " + toString(right_mutation_ver) + " respectively" ; |
1672 | return false; |
1673 | } |
1674 | |
1675 | return true; |
1676 | } |
1677 | |
1678 | |
1679 | std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const |
1680 | { |
1681 | /// Assigning mutations is easier than assigning merges because mutations appear in the same order as |
1682 | /// the order of their version numbers (see StorageReplicatedMergeTree::mutate). |
1683 | /// This means that if we have loaded the mutation with version number X then all mutations with |
1684 | /// the version numbers less than X are also loaded and if there is no merge or mutation assigned to |
1685 | /// the part (checked by querying queue.virtual_parts), we can confidently assign a mutation to |
1686 | /// version X for this part. |
1687 | |
1688 | if (last_quorum_parts.find(part->name) != last_quorum_parts.end() |
1689 | || part->name == inprogress_quorum_part) |
1690 | return {}; |
1691 | |
1692 | std::lock_guard lock(queue.state_mutex); |
1693 | |
1694 | if (queue.virtual_parts.getContainingPart(part->info) != part->name) |
1695 | return {}; |
1696 | |
1697 | auto in_partition = queue.mutations_by_partition.find(part->info.partition_id); |
1698 | if (in_partition == queue.mutations_by_partition.end()) |
1699 | return {}; |
1700 | |
1701 | Int64 current_version = queue.getCurrentMutationVersionImpl(part->info.partition_id, part->info.getDataVersion(), lock); |
1702 | Int64 max_version = in_partition->second.rbegin()->first; |
1703 | if (current_version >= max_version) |
1704 | return {}; |
1705 | |
1706 | return max_version; |
1707 | } |
1708 | |
1709 | |
1710 | bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const |
1711 | { |
1712 | for (const auto & kv : mutation.block_numbers) |
1713 | { |
1714 | const String & partition_id = kv.first; |
1715 | Int64 block_num = kv.second; |
1716 | |
1717 | auto partition_it = committing_blocks.find(partition_id); |
1718 | if (partition_it != committing_blocks.end()) |
1719 | { |
1720 | size_t blocks_count = std::distance( |
1721 | partition_it->second.begin(), partition_it->second.lower_bound(block_num)); |
1722 | if (blocks_count) |
1723 | { |
1724 | LOG_TRACE(queue.log, "Mutation " << mutation.znode_name << " is not done yet because " |
1725 | << "in partition ID " << partition_id << " there are still " |
1726 | << blocks_count << " uncommitted blocks." ); |
1727 | return false; |
1728 | } |
1729 | } |
1730 | } |
1731 | |
1732 | { |
1733 | std::lock_guard lock(queue.state_mutex); |
1734 | |
1735 | size_t suddenly_appeared_parts = getPartNamesToMutate(mutation, queue.virtual_parts).size(); |
1736 | if (suddenly_appeared_parts) |
1737 | { |
1738 | LOG_TRACE(queue.log, "Mutation " << mutation.znode_name << " is not done yet because " |
1739 | << suddenly_appeared_parts << " parts to mutate suddenly appeared." ); |
1740 | return false; |
1741 | } |
1742 | } |
1743 | |
1744 | return true; |
1745 | } |
1746 | |
1747 | |
1748 | ReplicatedMergeTreeQueue::SubscriberHandler |
1749 | ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback) |
1750 | { |
1751 | std::lock_guard lock(state_mutex); |
1752 | std::lock_guard lock_subscribers(subscribers_mutex); |
1753 | |
1754 | auto it = subscribers.emplace(subscribers.end(), std::move(callback)); |
1755 | |
1756 | /// Atomically notify about current size |
1757 | (*it)(queue.size()); |
1758 | |
1759 | return SubscriberHandler(it, *this); |
1760 | } |
1761 | |
1762 | ReplicatedMergeTreeQueue::SubscriberHandler::~SubscriberHandler() |
1763 | { |
1764 | std::lock_guard lock(queue.subscribers_mutex); |
1765 | queue.subscribers.erase(it); |
1766 | } |
1767 | |
1768 | void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size) |
1769 | { |
1770 | std::lock_guard lock_subscribers(subscribers_mutex); |
1771 | for (auto & subscriber_callback : subscribers) |
1772 | subscriber_callback(new_queue_size); |
1773 | } |
1774 | |
1775 | ReplicatedMergeTreeQueue::~ReplicatedMergeTreeQueue() |
1776 | { |
1777 | notifySubscribers(0); |
1778 | } |
1779 | |
1780 | String padIndex(Int64 index) |
1781 | { |
1782 | String index_str = toString(index); |
1783 | return std::string(10 - index_str.size(), '0') + index_str; |
1784 | } |
1785 | |
1786 | } |
1787 | |