1#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
2#include <Storages/StorageReplicatedMergeTree.h>
3#include <IO/ReadHelpers.h>
4#include <IO/WriteHelpers.h>
5#include <Storages/MergeTree/MergeTreeDataPart.h>
6#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
7#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
8#include <Common/StringUtils/StringUtils.h>
9
10
11namespace DB
12{
13
namespace ErrorCodes
{
    extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
    extern const int UNFINISHED;
    extern const int LOGICAL_ERROR;
    extern const int PART_IS_TEMPORARILY_LOCKED;
}
20
21
22ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_)
23 : storage(storage_)
24 , format_version(storage.format_version)
25 , current_parts(format_version)
26 , virtual_parts(format_version)
27{}
28
29
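/// Registers existing data parts: each part is added both as a current part and as a virtual part,
/// i.e. a part that is expected to exist once the queue has been fully executed.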
30void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts)
31{
32 std::lock_guard lock(state_mutex);
33
    for (const auto & part : parts)
35 {
36 current_parts.add(part->name);
37 virtual_parts.add(part->name);
38 }
39}
40
41
42bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const
43{
44 std::lock_guard lock(state_mutex);
45 return virtual_parts.getContainingPart(data_part->info) != data_part->name;
46}
47
48bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
49{
50 auto queue_path = replica_path + "/queue";
51 LOG_DEBUG(log, "Loading queue from " << queue_path);
52
53 bool updated = false;
54 std::optional<time_t> min_unprocessed_insert_time_changed;
55
56 {
57 std::lock_guard pull_logs_lock(pull_logs_to_queue_mutex);
58
59 String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
60 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
61
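        /// load() may be called more than once (for example, when the queue is reinitialized after a
        /// ZooKeeper session loss), so entries that are already in the in-memory queue are skipped below.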
62 std::unordered_set<String> already_loaded_paths;
63 {
64 std::lock_guard lock(state_mutex);
65 for (const LogEntryPtr & log_entry : queue)
66 already_loaded_paths.insert(log_entry->znode_name);
67 }
68
69 Strings children = zookeeper->getChildren(queue_path);
70
71 auto to_remove_it = std::remove_if(
72 children.begin(), children.end(), [&](const String & path)
73 {
74 return already_loaded_paths.count(path);
75 });
76
77 LOG_DEBUG(log,
78 "Having " << (to_remove_it - children.begin()) << " queue entries to load, "
79 << (children.end() - to_remove_it) << " entries already loaded.");
80 children.erase(to_remove_it, children.end());
81
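        /// Queue znode names ("queue-0000000001", ...) are created as sequential nodes, so sorting them
        /// restores the order in which the entries were added.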
82 std::sort(children.begin(), children.end());
83
84 zkutil::AsyncResponses<Coordination::GetResponse> futures;
85 futures.reserve(children.size());
86
87 for (const String & child : children)
88 futures.emplace_back(child, zookeeper->asyncGet(queue_path + "/" + child));
89
90 for (auto & future : futures)
91 {
92 Coordination::GetResponse res = future.second.get();
93 LogEntryPtr entry = LogEntry::parse(res.data, res.stat);
94 entry->znode_name = future.first;
95
96 std::lock_guard lock(state_mutex);
97
98 insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);
99
100 updated = true;
101 }
102
103 zookeeper->tryGet(replica_path + "/mutation_pointer", mutation_pointer);
104 }
105
106 updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});
107
108 LOG_TRACE(log, "Loaded queue");
109 return updated;
110}
111
112
113void ReplicatedMergeTreeQueue::initialize(
114 const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
115 const MergeTreeData::DataParts & parts)
116{
117 zookeeper_path = zookeeper_path_;
118 replica_path = replica_path_;
119 logger_name = logger_name_;
120 log = &Logger::get(logger_name);
121
122 addVirtualParts(parts);
123}
124
125
126void ReplicatedMergeTreeQueue::insertUnlocked(
127 const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
128 std::lock_guard<std::mutex> & /* state_lock */)
129{
130 for (const String & virtual_part_name : entry->getVirtualPartNames())
131 {
132 virtual_parts.add(virtual_part_name);
133 updateMutationsPartsToDo(virtual_part_name, /* add = */ true);
134 }
135
    /// Put 'DROP PARTITION' entries at the beginning of the queue, so as not to make superfluous fetches of parts that will eventually be deleted.
137 if (entry->type != LogEntry::DROP_RANGE)
138 queue.push_back(entry);
139 else
140 queue.push_front(entry);
141
142 if (entry->type == LogEntry::GET_PART)
143 {
144 inserts_by_time.insert(entry);
145
146 if (entry->create_time && (!min_unprocessed_insert_time || entry->create_time < min_unprocessed_insert_time))
147 {
148 min_unprocessed_insert_time = entry->create_time;
149 min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
150 }
151 }
152}
153
154
155void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
156{
157 std::optional<time_t> min_unprocessed_insert_time_changed;
158
159 {
160 std::lock_guard lock(state_mutex);
161 insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);
162 }
163
164 updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});
165}
166
167
168void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
169 const LogEntryPtr & entry,
170 bool is_successful,
171 std::optional<time_t> & min_unprocessed_insert_time_changed,
172 std::optional<time_t> & max_processed_insert_time_changed,
173 std::unique_lock<std::mutex> & /* queue_lock */)
174{
175 /// Update insert times.
176 if (entry->type == LogEntry::GET_PART)
177 {
178 inserts_by_time.erase(entry);
179
180 if (inserts_by_time.empty())
181 {
182 min_unprocessed_insert_time = 0;
183 min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
184 }
185 else if ((*inserts_by_time.begin())->create_time > min_unprocessed_insert_time)
186 {
187 min_unprocessed_insert_time = (*inserts_by_time.begin())->create_time;
188 min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
189 }
190
191 if (entry->create_time > max_processed_insert_time)
192 {
193 max_processed_insert_time = entry->create_time;
194 max_processed_insert_time_changed = max_processed_insert_time;
195 }
196 }
197
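    /// On success, the parts produced by the entry become current; on failure they will never appear,
    /// so the mutations that would have covered them have one part less to do.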
198 if (is_successful)
199 {
200 for (const String & virtual_part_name : entry->getVirtualPartNames())
201 {
202 Strings replaced_parts;
203 current_parts.add(virtual_part_name, &replaced_parts);
204
            /// Each part from `replaced_parts` should become Obsolete as a result of executing the entry.
            /// So for each mutation with a block number greater than the replaced part's data version, there is one part less to mutate.
207 for (const String & replaced_part_name : replaced_parts)
208 updateMutationsPartsToDo(replaced_part_name, /* add = */ false);
209 }
210
211 String drop_range_part_name;
212 if (entry->type == LogEntry::DROP_RANGE)
213 drop_range_part_name = entry->new_part_name;
214 else if (entry->type == LogEntry::REPLACE_RANGE)
215 drop_range_part_name = entry->replace_range_entry->drop_range_part_name;
216
217 if (!drop_range_part_name.empty())
218 {
219 current_parts.remove(drop_range_part_name);
220 virtual_parts.remove(drop_range_part_name);
221 }
222 }
223 else
224 {
225 for (const String & virtual_part_name : entry->getVirtualPartNames())
226 {
            /// Because execution of the entry was unsuccessful, `virtual_part_name` will never appear,
            /// so we won't need to mutate it.
229 updateMutationsPartsToDo(virtual_part_name, /* add = */ false);
230 }
231 }
232}
233
234
235void ReplicatedMergeTreeQueue::updateMutationsPartsToDo(const String & part_name, bool add)
236{
237 auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
238 auto in_partition = mutations_by_partition.find(part_info.partition_id);
239 if (in_partition == mutations_by_partition.end())
240 return;
241
242 bool some_mutations_are_probably_done = false;
243
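    /// Only mutations with a block number greater than the part's data version still have this part to do.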
244 auto from_it = in_partition->second.upper_bound(part_info.getDataVersion());
245 for (auto it = from_it; it != in_partition->second.end(); ++it)
246 {
247 MutationStatus & status = *it->second;
248 status.parts_to_do += (add ? +1 : -1);
249 if (status.parts_to_do <= 0)
250 some_mutations_are_probably_done = true;
251
252 if (!add && !status.latest_failed_part.empty() && part_info.contains(status.latest_failed_part_info))
253 {
254 status.latest_failed_part.clear();
255 status.latest_failed_part_info = MergeTreePartInfo();
256 status.latest_fail_time = 0;
257 status.latest_fail_reason.clear();
258 }
259 }
260
261 if (some_mutations_are_probably_done)
262 storage.mutations_finalizing_task->schedule();
263}
264
265
266void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
267 zkutil::ZooKeeperPtr zookeeper,
268 std::optional<time_t> min_unprocessed_insert_time_changed,
269 std::optional<time_t> max_processed_insert_time_changed) const
270{
    /// There can be a race condition here (with concurrent removals), because we update the times in ZooKeeper
    /// without holding the mutex, so they may change in the meantime.
    /// Consider this unimportant (ZooKeeper will hold a slightly stale value for a short time).
274
275 Coordination::Requests ops;
276
277 if (min_unprocessed_insert_time_changed)
278 ops.emplace_back(zkutil::makeSetRequest(
279 replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
280
281 if (max_processed_insert_time_changed)
282 ops.emplace_back(zkutil::makeSetRequest(
283 replica_path + "/max_processed_insert_time", toString(*max_processed_insert_time_changed), -1));
284
285 if (!ops.empty())
286 {
287 Coordination::Responses responses;
288 auto code = zookeeper->tryMulti(ops, responses);
289
290 if (code)
291 LOG_ERROR(log, "Couldn't set value of nodes for insert times ("
292 << replica_path << "/min_unprocessed_insert_time, max_processed_insert_time)" << ": "
293 << zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often.");
294 }
295}
296
297
298void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
299{
300 auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);
301
302 if (code)
303 LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": "
304 << zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often.");
305
306 std::optional<time_t> min_unprocessed_insert_time_changed;
307 std::optional<time_t> max_processed_insert_time_changed;
308
309 bool found = false;
310 size_t queue_size = 0;
311
312 {
313 std::unique_lock lock(state_mutex);
314
        /// Remove the entry from the in-memory queue.
        /// We cannot simply use a previously saved iterator, because someone else may have removed the entry in the meantime.
        /// Why do we scan the queue from the end?
        /// - because the entry selected for execution is first moved to the end of the queue, so that on failure it stays at the end.
319 for (Queue::iterator it = queue.end(); it != queue.begin();)
320 {
321 --it;
322 if (*it == entry)
323 {
324 found = true;
325 updateStateOnQueueEntryRemoval(
326 entry, /* is_successful = */ true,
327 min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
328
329 queue.erase(it);
330 queue_size = queue.size();
331 break;
332 }
333 }
334 }
335
336 if (!found)
337 throw Exception("Can't find " + entry->znode_name + " in the memory queue. It is a bug", ErrorCodes::LOGICAL_ERROR);
338
339 notifySubscribers(queue_size);
340
341 updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
342}
343
344
345bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name)
346{
347 LogEntryPtr found;
348 size_t queue_size = 0;
349
350 std::optional<time_t> min_unprocessed_insert_time_changed;
351 std::optional<time_t> max_processed_insert_time_changed;
352
353 {
354 std::unique_lock lock(state_mutex);
355
356 virtual_parts.remove(part_name);
357
358 for (Queue::iterator it = queue.begin(); it != queue.end();)
359 {
360 if ((*it)->new_part_name == part_name)
361 {
362 found = *it;
363 updateStateOnQueueEntryRemoval(
364 found, /* is_successful = */ false,
365 min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
366 queue.erase(it++);
367 queue_size = queue.size();
368 break;
369 }
370 else
371 ++it;
372 }
373 }
374
375 if (!found)
376 return false;
377
378 notifySubscribers(queue_size);
379
380 zookeeper->tryRemove(replica_path + "/queue/" + found->znode_name);
381 updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
382
383 return true;
384}
385
386
387bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & part_info)
388{
389 std::lock_guard lock(state_mutex);
390 return virtual_parts.remove(part_info);
391}
392
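/// Copies new entries from the shared replication log (<zookeeper_path>/log/log-XXXXXXXXXX) into this
/// replica's own queue (<replica_path>/queue/queue-XXXXXXXXXX) and advances <replica_path>/log_pointer,
/// which stores the index of the next log entry to pull.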
393void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
394{
395 std::lock_guard lock(pull_logs_to_queue_mutex);
396
397 String index_str = zookeeper->get(replica_path + "/log_pointer");
398 UInt64 index;
399
400 Strings log_entries = zookeeper->getChildrenWatch(zookeeper_path + "/log", nullptr, watch_callback);
401
402 /// We update mutations after we have loaded the list of log entries, but before we insert them
403 /// in the queue.
404 /// With this we ensure that if you read the log state L1 and then the state of mutations M1,
405 /// then L1 "happened-before" M1.
406 updateMutations(zookeeper);
407
408 if (index_str.empty())
409 {
410 /// If we do not already have a pointer to the log, put a pointer to the first entry in it.
411 index = log_entries.empty() ? 0 : parse<UInt64>(std::min_element(log_entries.begin(), log_entries.end())->substr(strlen("log-")));
412
413 zookeeper->set(replica_path + "/log_pointer", toString(index));
414 }
415 else
416 {
417 index = parse<UInt64>(index_str);
418 }
419
420 String min_log_entry = "log-" + padIndex(index);
421
    /// There may be multiple log entries that must be copied to the queue.
423
424 log_entries.erase(
425 std::remove_if(log_entries.begin(), log_entries.end(), [&min_log_entry](const String & entry) { return entry < min_log_entry; }),
426 log_entries.end());
427
428 if (!log_entries.empty())
429 {
430 std::sort(log_entries.begin(), log_entries.end());
431
432 /// ZK contains a limit on the number or total size of operations in a multi-request.
433 /// If the limit is exceeded, the connection is simply closed.
434 /// The constant is selected with a margin. The default limit in ZK is 1 MB of data in total.
435 /// The average size of the node value in this case is less than 10 kilobytes.
436 static constexpr auto MAX_MULTI_OPS = 100;
437
438 for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries; entry_idx += MAX_MULTI_OPS)
439 {
440 auto begin = log_entries.begin() + entry_idx;
441 auto end = entry_idx + MAX_MULTI_OPS >= log_entries.size()
442 ? log_entries.end()
443 : (begin + MAX_MULTI_OPS);
444 auto last = end - 1;
445
446 String last_entry = *last;
447 if (!startsWith(last_entry, "log-"))
448 throw Exception("Error in zookeeper data: unexpected node " + last_entry + " in " + zookeeper_path + "/log",
449 ErrorCodes::UNEXPECTED_NODE_IN_ZOOKEEPER);
450
451 UInt64 last_entry_index = parse<UInt64>(last_entry.substr(strlen("log-")));
452
453 LOG_DEBUG(log, "Pulling " << (end - begin) << " entries to queue: " << *begin << " - " << *last);
454
455 zkutil::AsyncResponses<Coordination::GetResponse> futures;
456 futures.reserve(end - begin);
457
458 for (auto it = begin; it != end; ++it)
459 futures.emplace_back(*it, zookeeper->asyncGet(zookeeper_path + "/log/" + *it));
460
461 /// Simultaneously add all new entries to the queue and move the pointer to the log.
462
463 Coordination::Requests ops;
464 std::vector<LogEntryPtr> copied_entries;
465 copied_entries.reserve(end - begin);
466
467 std::optional<time_t> min_unprocessed_insert_time_changed;
468
469 for (auto & future : futures)
470 {
471 Coordination::GetResponse res = future.second.get();
472
473 copied_entries.emplace_back(LogEntry::parse(res.data, res.stat));
474
475 ops.emplace_back(zkutil::makeCreateRequest(
476 replica_path + "/queue/queue-", res.data, zkutil::CreateMode::PersistentSequential));
477
478 const auto & entry = *copied_entries.back();
479 if (entry.type == LogEntry::GET_PART)
480 {
481 std::lock_guard state_lock(state_mutex);
482 if (entry.create_time && (!min_unprocessed_insert_time || entry.create_time < min_unprocessed_insert_time))
483 {
484 min_unprocessed_insert_time = entry.create_time;
485 min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
486 }
487 }
488 }
489
490 ops.emplace_back(zkutil::makeSetRequest(
491 replica_path + "/log_pointer", toString(last_entry_index + 1), -1));
492
493 if (min_unprocessed_insert_time_changed)
494 ops.emplace_back(zkutil::makeSetRequest(
495 replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
496
497 auto responses = zookeeper->multi(ops);
498
499 /// Now we have successfully updated the queue in ZooKeeper. Update it in RAM.
500
501 try
502 {
503 std::lock_guard state_lock(state_mutex);
504
505 log_pointer = last_entry_index + 1;
506
507 for (size_t copied_entry_idx = 0, num_copied_entries = copied_entries.size(); copied_entry_idx < num_copied_entries; ++copied_entry_idx)
508 {
509 String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses[copied_entry_idx]).path_created;
510 copied_entries[copied_entry_idx]->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
511
                    std::optional<time_t> unused;
513 insertUnlocked(copied_entries[copied_entry_idx], unused, state_lock);
514 }
515
516 last_queue_update = time(nullptr);
517 }
518 catch (...)
519 {
520 /// If it fails, the data in RAM is incorrect. In order to avoid possible further corruption of data in ZK, we will kill ourselves.
521 /// This is possible only if there is an unknown logical error.
522 std::terminate();
523 }
524
525 if (!copied_entries.empty())
526 LOG_DEBUG(log, "Pulled " << copied_entries.size() << " entries to queue.");
527 }
528
529 if (storage.queue_task_handle)
530 storage.queue_task_handle->wake();
531 }
532}
533
534
535static Names getPartNamesToMutate(
536 const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & parts)
537{
538 Names result;
539 for (const auto & pair : mutation.block_numbers)
540 {
541 const String & partition_id = pair.first;
542 Int64 block_num = pair.second;
543
544 /// Note that we cannot simply count all parts to mutate using getPartsCoveredBy(appropriate part_info)
545 /// because they are not consecutive in `parts`.
546 MergeTreePartInfo covering_part_info(
547 partition_id, 0, block_num, MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER);
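        /// `covering_part_info` spans the whole partition up to the mutation's block number; every existing
        /// part inside it whose data version is below that block number still has to be mutated.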
548 for (const String & covered_part_name : parts.getPartsCoveredBy(covering_part_info))
549 {
550 auto part_info = MergeTreePartInfo::fromPartName(covered_part_name, parts.getFormatVersion());
551 if (part_info.getDataVersion() < block_num)
552 result.push_back(covered_part_name);
553 }
554 }
555
556 return result;
557}
558
559
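/// Synchronizes the in-memory set of mutations with <zookeeper_path>/mutations: entries that are gone from
/// ZooKeeper (e.g. killed mutations) are removed from the local state, and new entries are loaded.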
560void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
561{
562 std::lock_guard lock(update_mutations_mutex);
563
564 Strings entries_in_zk = zookeeper->getChildrenWatch(zookeeper_path + "/mutations", nullptr, watch_callback);
565 StringSet entries_in_zk_set(entries_in_zk.begin(), entries_in_zk.end());
566
567 /// Compare with the local state, delete obsolete entries and determine which new entries to load.
568 Strings entries_to_load;
569 bool some_active_mutations_were_killed = false;
570 {
571 std::lock_guard state_lock(state_mutex);
572
573 for (auto it = mutations_by_znode.begin(); it != mutations_by_znode.end();)
574 {
575 const ReplicatedMergeTreeMutationEntry & entry = *it->second.entry;
576 if (!entries_in_zk_set.count(entry.znode_name))
577 {
578 if (!it->second.is_done)
579 {
580 LOG_DEBUG(log, "Removing killed mutation " + entry.znode_name + " from local state.");
581 some_active_mutations_were_killed = true;
582 }
583 else
584 LOG_DEBUG(log, "Removing obsolete mutation " + entry.znode_name + " from local state.");
585
586 for (const auto & partition_and_block_num : entry.block_numbers)
587 {
588 auto & in_partition = mutations_by_partition[partition_and_block_num.first];
589 in_partition.erase(partition_and_block_num.second);
590 if (in_partition.empty())
591 mutations_by_partition.erase(partition_and_block_num.first);
592 }
593
594 it = mutations_by_znode.erase(it);
595 }
596 else
597 ++it;
598 }
599
600 for (const String & znode : entries_in_zk_set)
601 {
602 if (!mutations_by_znode.count(znode))
603 entries_to_load.push_back(znode);
604 }
605 }
606
607 if (some_active_mutations_were_killed)
608 storage.queue_task_handle->wake();
609
610 if (!entries_to_load.empty())
611 {
612 LOG_INFO(log, "Loading " + toString(entries_to_load.size()) + " mutation entries: "
613 + entries_to_load.front() + " - " + entries_to_load.back());
614
615 std::vector<std::future<Coordination::GetResponse>> futures;
616 for (const String & entry : entries_to_load)
617 futures.emplace_back(zookeeper->asyncGet(zookeeper_path + "/mutations/" + entry));
618
619 std::vector<ReplicatedMergeTreeMutationEntryPtr> new_mutations;
620 for (size_t i = 0; i < entries_to_load.size(); ++i)
621 {
622 new_mutations.push_back(std::make_shared<ReplicatedMergeTreeMutationEntry>(
623 ReplicatedMergeTreeMutationEntry::parse(futures[i].get().data, entries_to_load[i])));
624 }
625
626 bool some_mutations_are_probably_done = false;
627 {
628 std::lock_guard state_lock(state_mutex);
629
630 for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations)
631 {
632 auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry))
633 .first->second;
634
635 for (const auto & pair : entry->block_numbers)
636 {
637 const String & partition_id = pair.first;
638 Int64 block_num = pair.second;
639 mutations_by_partition[partition_id].emplace(block_num, &mutation);
640 }
641
642 /// Initialize `mutation.parts_to_do`. First we need to mutate all parts in `current_parts`.
643 mutation.parts_to_do += getPartNamesToMutate(*entry, current_parts).size();
644
                /// And then we need to mutate all parts that would appear as a result of executing the queue
                /// and whose getDataVersion() is less than the mutation block number.
647 for (const auto & queue_entry : queue)
648 {
649 for (const String & produced_part_name : queue_entry->getVirtualPartNames())
650 {
651 auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version);
652 auto it = entry->block_numbers.find(part_info.partition_id);
653 if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion())
654 ++mutation.parts_to_do;
655 }
656 }
657
658 if (mutation.parts_to_do == 0)
659 some_mutations_are_probably_done = true;
660 }
661 }
662
663 storage.merge_selecting_task->schedule();
664
665 if (some_mutations_are_probably_done)
666 storage.mutations_finalizing_task->schedule();
667 }
668}
669
670
671ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
672 zkutil::ZooKeeperPtr zookeeper, const String & mutation_id)
673{
674 std::lock_guard lock(update_mutations_mutex);
675
676 auto rc = zookeeper->tryRemove(zookeeper_path + "/mutations/" + mutation_id);
677 if (rc == Coordination::ZOK)
678 LOG_DEBUG(log, "Removed mutation " + mutation_id + " from ZooKeeper.");
679
680 ReplicatedMergeTreeMutationEntryPtr entry;
681 bool mutation_was_active = false;
682 {
683 std::lock_guard state_lock(state_mutex);
684
685 auto it = mutations_by_znode.find(mutation_id);
686 if (it == mutations_by_znode.end())
687 return nullptr;
688
689 mutation_was_active = !it->second.is_done;
690
691 entry = it->second.entry;
692 for (const auto & partition_and_block_num : entry->block_numbers)
693 {
694 auto & in_partition = mutations_by_partition[partition_and_block_num.first];
695 in_partition.erase(partition_and_block_num.second);
696 if (in_partition.empty())
697 mutations_by_partition.erase(partition_and_block_num.first);
698 }
699
700 mutations_by_znode.erase(it);
701 LOG_DEBUG(log, "Removed mutation " + entry->znode_name + " from local state.");
702 }
703
704 if (mutation_was_active)
705 storage.queue_task_handle->wake();
706
707 return entry;
708}
709
710
711ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsForMergeToEndOfQueue(const String & part_name)
712{
713 std::lock_guard lock(state_mutex);
714
715 /// Let's find the action to merge this part with others. Let's remember others.
716 StringSet parts_for_merge;
717 Queue::iterator merge_entry = queue.end();
718 for (Queue::iterator it = queue.begin(); it != queue.end(); ++it)
719 {
720 if ((*it)->type == LogEntry::MERGE_PARTS || (*it)->type == LogEntry::MUTATE_PART)
721 {
722 if (std::find((*it)->source_parts.begin(), (*it)->source_parts.end(), part_name)
723 != (*it)->source_parts.end())
724 {
725 parts_for_merge = StringSet((*it)->source_parts.begin(), (*it)->source_parts.end());
726 merge_entry = it;
727 break;
728 }
729 }
730 }
731
732 if (!parts_for_merge.empty())
733 {
        /// Move to the end of the queue the actions that produce one of the parts in `parts_for_merge`.
735 for (Queue::iterator it = queue.begin(); it != queue.end();)
736 {
737 auto it0 = it;
738 ++it;
739
740 if (it0 == merge_entry)
741 break;
742
743 if (((*it0)->type == LogEntry::MERGE_PARTS || (*it0)->type == LogEntry::GET_PART || (*it0)->type == LogEntry::MUTATE_PART)
744 && parts_for_merge.count((*it0)->new_part_name))
745 {
746 queue.splice(queue.end(), queue, it0, it);
747 }
748 }
749 }
750
751 return parts_for_merge;
752}
753
754bool ReplicatedMergeTreeQueue::checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const
755{
756 if (entry_ptr->type != LogEntry::REPLACE_RANGE)
757 return false;
758
759 if (current.type != LogEntry::REPLACE_RANGE && current.type != LogEntry::DROP_RANGE)
760 return false;
761
762 if (entry_ptr->replace_range_entry != nullptr && entry_ptr->replace_range_entry == current.replace_range_entry) /// same partition, don't want to drop ourselves
763 return false;
764
765 for (const String & new_part_name : entry_ptr->replace_range_entry->new_part_names)
766 if (!part_info.contains(MergeTreePartInfo::fromPartName(new_part_name, format_version)))
767 return false;
768
769 return true;
770}
771
772void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
773 zkutil::ZooKeeperPtr zookeeper,
774 const MergeTreePartInfo & part_info,
775 const ReplicatedMergeTreeLogEntryData & current)
776{
777 Queue to_wait;
778 size_t removed_entries = 0;
779 std::optional<time_t> min_unprocessed_insert_time_changed;
780 std::optional<time_t> max_processed_insert_time_changed;
781
782 /// Remove operations with parts, contained in the range to be deleted, from the queue.
783 std::unique_lock lock(state_mutex);
784 for (Queue::iterator it = queue.begin(); it != queue.end();)
785 {
786 auto type = (*it)->type;
787
788 if (((type == LogEntry::GET_PART || type == LogEntry::MERGE_PARTS || type == LogEntry::MUTATE_PART)
789 && part_info.contains(MergeTreePartInfo::fromPartName((*it)->new_part_name, format_version)))
790 || checkReplaceRangeCanBeRemoved(part_info, *it, current))
791 {
792 if ((*it)->currently_executing)
793 to_wait.push_back(*it);
794 auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
795 if (code)
796 LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": "
797 << zkutil::ZooKeeper::error2string(code));
798
799 updateStateOnQueueEntryRemoval(
800 *it, /* is_successful = */ false,
801 min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
802 queue.erase(it++);
803 ++removed_entries;
804 }
805 else
806 ++it;
807 }
808
809 updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
810
811 LOG_DEBUG(log, "Removed " << removed_entries << " entries from queue. "
812 "Waiting for " << to_wait.size() << " entries that are currently executing.");
813
814 /// Let's wait for the operations with the parts contained in the range to be deleted.
815 for (LogEntryPtr & entry : to_wait)
816 entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; });
817}
818
819
820size_t ReplicatedMergeTreeQueue::getConflictsCountForRange(
821 const MergeTreePartInfo & range, const LogEntry & entry,
822 String * out_description, std::lock_guard<std::mutex> & /* queue_lock */) const
823{
824 std::vector<std::pair<String, LogEntryPtr>> conflicts;
825
826 for (auto & future_part_elem : future_parts)
827 {
        /// Do not check the log entry against itself.
829 if (future_part_elem.second->znode_name == entry.znode_name)
830 continue;
831
832 if (!range.isDisjoint(MergeTreePartInfo::fromPartName(future_part_elem.first, format_version)))
833 {
834 conflicts.emplace_back(future_part_elem.first, future_part_elem.second);
835 continue;
836 }
837 }
838
839 if (out_description)
840 {
841 std::stringstream ss;
842 ss << "Can't execute command for range " << range.getPartName() << " (entry " << entry.znode_name << "). ";
843 ss << "There are " << conflicts.size() << " currently executing entries blocking it: ";
844 for (const auto & conflict : conflicts)
845 ss << conflict.second->typeToString() << " part " << conflict.first << ", ";
846
847 *out_description = ss.str();
848 }
849
850 return conflicts.size();
851}
852
853
854void ReplicatedMergeTreeQueue::checkThereAreNoConflictsInRange(const MergeTreePartInfo & range, const LogEntry & entry)
855{
856 String conflicts_description;
857 std::lock_guard lock(state_mutex);
858
859 if (0 != getConflictsCountForRange(range, entry, &conflicts_description, lock))
860 throw Exception(conflicts_description, ErrorCodes::UNFINISHED);
861}
862
863
864bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const
865{
866 /// Let's check if the same part is now being created by another action.
867 if (future_parts.count(new_part_name))
868 {
869 out_reason = "Not executing log entry for part " + new_part_name
870 + " because another log entry for the same part is being processed. This shouldn't happen often.";
871 return false;
872
        /** When the corresponding action completes, `isNotCoveredByFuturePartsImpl` will succeed the next time,
          * and the queue entry will be processed.
          * Then, right in the `executeLogEntry` function, it will be found that we already have the part,
          * and the queue entry will immediately be treated as processed.
          */
878 }
879
    /// A more complex check: is another action currently creating a part that will cover this part?
    /// NOTE The check above is a special case of this one, but it is kept for a more convenient log message.
882 auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version);
883
    /// This can be slow when `future_parts` is large. But it cannot be large, since the size of `BackgroundProcessingPool` is limited.
885 for (const auto & future_part_elem : future_parts)
886 {
887 auto future_part = MergeTreePartInfo::fromPartName(future_part_elem.first, format_version);
888
889 if (future_part.contains(result_part))
890 {
891 out_reason = "Not executing log entry for part " + new_part_name + " because it is covered by part "
892 + future_part_elem.first + " that is currently executing";
893 return false;
894 }
895 }
896
897 return true;
898}
899
900bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason)
901{
902 std::lock_guard lock(state_mutex);
903
904 if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock))
905 {
906 CurrentlyExecuting::setActualPartName(entry, part_name, *this);
907 return true;
908 }
909
910 return false;
911}
912
913
914bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
915 const LogEntry & entry,
916 String & out_postpone_reason,
917 MergeTreeDataMergerMutator & merger_mutator,
918 MergeTreeData & data,
919 std::lock_guard<std::mutex> & queue_lock) const
920{
921 if (entry.type == LogEntry::MERGE_PARTS
922 || entry.type == LogEntry::GET_PART
923 || entry.type == LogEntry::MUTATE_PART)
924 {
925 for (const String & new_part_name : entry.getBlockingPartNames())
926 {
927 if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, queue_lock))
928 {
929 if (!out_postpone_reason.empty())
930 LOG_DEBUG(log, out_postpone_reason);
931 return false;
932 }
933 }
934 }
935
936 if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART)
937 {
        /** If any of the required parts is currently being fetched or merged, wait for that operation to finish.
          * Otherwise, even if not all parts needed for the merge are present, we should still attempt the merge.
          * If some parts are missing, instead of the merge there will be an attempt to fetch the resulting part.
          * Such a situation is possible if fetching a part has failed, and the entry was moved to the end of the queue.
          */
943 size_t sum_parts_size_in_bytes = 0;
944 for (const auto & name : entry.source_parts)
945 {
946 if (future_parts.count(name))
947 {
948 String reason = "Not merging into part " + entry.new_part_name
949 + " because part " + name + " is not ready yet (log entry for that part is being processed).";
950 LOG_TRACE(log, reason);
951 out_postpone_reason = reason;
952 return false;
953 }
954
955 auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
956 if (part)
957 sum_parts_size_in_bytes += part->bytes_on_disk;
958 }
959
960 if (merger_mutator.merges_blocker.isCancelled())
961 {
962 String reason = "Not executing log entry for part " + entry.new_part_name + " because merges and mutations are cancelled now.";
963 LOG_DEBUG(log, reason);
964 out_postpone_reason = reason;
965 return false;
966 }
967
968 UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge()
969 : merger_mutator.getMaxSourcePartSizeForMutation();
        /** If there are enough free threads in the background pool to do large merges (the maximal merge size is allowed),
          * then ignore the value returned by getMaxSourcePartsSizeForMerge() and execute a merge of any size,
          * because it may have been ordered by OPTIMIZE or assigned earlier with different settings.
          * The max_bytes_to_merge_at_max_space_in_pool setting still applies to regular merges,
          * because the leader replica does not assign merges of greater size (except for OPTIMIZE PARTITION and OPTIMIZE FINAL).
          */
976 const auto data_settings = data.getSettings();
977 bool ignore_max_size = (entry.type == LogEntry::MERGE_PARTS) && (max_source_parts_size == data_settings->max_bytes_to_merge_at_max_space_in_pool);
978
979 if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
980 {
981 String reason = "Not executing log entry " + entry.typeToString() + " for part " + entry.new_part_name
982 + " because source parts size (" + formatReadableSizeWithBinarySuffix(sum_parts_size_in_bytes)
983 + ") is greater than the current maximum (" + formatReadableSizeWithBinarySuffix(max_source_parts_size) + ").";
984 LOG_DEBUG(log, reason);
985 out_postpone_reason = reason;
986 return false;
987 }
988 }
989
990 /// TODO: it makes sense to check DROP_RANGE also
991 if (entry.type == LogEntry::CLEAR_COLUMN || entry.type == LogEntry::REPLACE_RANGE)
992 {
993 String conflicts_description;
994 String range_name = (entry.type == LogEntry::REPLACE_RANGE) ? entry.replace_range_entry->drop_range_part_name : entry.new_part_name;
995 auto range = MergeTreePartInfo::fromPartName(range_name, format_version);
996
997 if (0 != getConflictsCountForRange(range, entry, &conflicts_description, queue_lock))
998 {
999 LOG_DEBUG(log, conflicts_description);
1000 return false;
1001 }
1002 }
1003
1004 return true;
1005}
1006
1007
1008Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersionImpl(
1009 const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* state_lock */) const
1010{
1011 auto in_partition = mutations_by_partition.find(partition_id);
1012 if (in_partition == mutations_by_partition.end())
1013 return 0;
1014
1015 auto it = in_partition->second.upper_bound(data_version);
1016 if (it == in_partition->second.begin())
1017 return 0;
1018
1019 --it;
1020 return it->first;
1021}
1022
1023
1024Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const String & partition_id, Int64 data_version) const
1025{
1026 std::lock_guard lock(state_mutex);
1027 return getCurrentMutationVersionImpl(partition_id, data_version, lock);
1028}
1029
1030
1031ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_)
1032 : entry(entry_), queue(queue_)
1033{
1034 entry->currently_executing = true;
1035 ++entry->num_tries;
1036 entry->last_attempt_time = time(nullptr);
1037
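    /// Reserve the names of the parts this entry will produce, so that no other entry producing
    /// an overlapping part can be selected for execution at the same time.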
1038 for (const String & new_part_name : entry->getBlockingPartNames())
1039 {
1040 if (!queue.future_parts.emplace(new_part_name, entry).second)
1041 throw Exception("Tagging already tagged future part " + new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
1042 }
1043}
1044
1045
1046void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry,
1047 const String & actual_part_name, ReplicatedMergeTreeQueue & queue)
1048{
1049 if (!entry.actual_new_part_name.empty())
        throw Exception("Actual part name is already set for this entry. This is a bug.", ErrorCodes::LOGICAL_ERROR);
1051
1052 entry.actual_new_part_name = actual_part_name;
1053
1054 /// Check if it is the same (and already added) part.
1055 if (entry.actual_new_part_name == entry.new_part_name)
1056 return;
1057
1058 if (!queue.future_parts.emplace(entry.actual_new_part_name, entry.shared_from_this()).second)
1059 throw Exception("Attaching already existing future part " + entry.actual_new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
1060}
1061
1062
1063ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
1064{
1065 std::lock_guard lock(queue.state_mutex);
1066
1067 entry->currently_executing = false;
1068 entry->execution_complete.notify_all();
1069
1070 for (const String & new_part_name : entry->getBlockingPartNames())
1071 {
1072 if (!queue.future_parts.erase(new_part_name))
1073 LOG_ERROR(queue.log, "Untagging already untagged future part " + new_part_name + ". This is a bug.");
1074 }
1075
1076 if (!entry->actual_new_part_name.empty())
1077 {
1078 if (entry->actual_new_part_name != entry->new_part_name && !queue.future_parts.erase(entry->actual_new_part_name))
1079 LOG_ERROR(queue.log, "Untagging already untagged future part " + entry->actual_new_part_name + ". This is a bug.");
1080
1081 entry->actual_new_part_name.clear();
1082 }
1083}
1084
1085
1086ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data)
1087{
1088 LogEntryPtr entry;
1089
1090 std::lock_guard lock(state_mutex);
1091
1092 for (auto it = queue.begin(); it != queue.end(); ++it)
1093 {
1094 if ((*it)->currently_executing)
1095 continue;
1096
1097 if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger_mutator, data, lock))
1098 {
1099 entry = *it;
1100 queue.splice(queue.end(), queue, it);
1101 break;
1102 }
1103 else
1104 {
1105 ++(*it)->num_postponed;
1106 (*it)->last_postpone_time = time(nullptr);
1107 }
1108 }
1109
1110 if (entry)
1111 return { entry, std::unique_ptr<CurrentlyExecuting>{ new CurrentlyExecuting(entry, *this) } };
1112 else
1113 return {};
1114}
1115
1116
1117bool ReplicatedMergeTreeQueue::processEntry(
1118 std::function<zkutil::ZooKeeperPtr()> get_zookeeper,
1119 LogEntryPtr & entry,
1120 const std::function<bool(LogEntryPtr &)> func)
1121{
1122 std::exception_ptr saved_exception;
1123
1124 try
1125 {
1126 if (func(entry))
1127 removeProcessedEntry(get_zookeeper(), entry);
1128 }
1129 catch (...)
1130 {
1131 saved_exception = std::current_exception();
1132 }
1133
1134 if (saved_exception)
1135 {
1136 std::lock_guard lock(state_mutex);
1137
1138 entry->exception = saved_exception;
1139
1140 if (entry->type == ReplicatedMergeTreeLogEntryData::MUTATE_PART)
1141 {
1142 /// Record the exception in the system.mutations table.
1143 Int64 result_data_version = MergeTreePartInfo::fromPartName(entry->new_part_name, format_version)
1144 .getDataVersion();
1145 auto source_part_info = MergeTreePartInfo::fromPartName(
1146 entry->source_parts.at(0), format_version);
1147
1148 auto in_partition = mutations_by_partition.find(source_part_info.partition_id);
1149 if (in_partition != mutations_by_partition.end())
1150 {
1151 auto mutations_begin_it = in_partition->second.upper_bound(source_part_info.getDataVersion());
1152 auto mutations_end_it = in_partition->second.upper_bound(result_data_version);
1153 for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
1154 {
1155 MutationStatus & status = *it->second;
1156 status.latest_failed_part = entry->source_parts.at(0);
1157 status.latest_failed_part_info = source_part_info;
1158 status.latest_fail_time = time(nullptr);
1159 status.latest_fail_reason = getExceptionMessage(saved_exception, false);
1160 }
1161 }
1162 }
1163
1164 return false;
1165 }
1166
1167 return true;
1168}
1169
1170
1171std::pair<size_t, size_t> ReplicatedMergeTreeQueue::countMergesAndPartMutations() const
1172{
1173 std::lock_guard lock(state_mutex);
1174
1175 size_t count_merges = 0;
1176 size_t count_mutations = 0;
1177 for (const auto & entry : queue)
1178 {
1179 if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS)
1180 ++count_merges;
1181 else if (entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
1182 ++count_mutations;
1183 }
1184
1185 return std::make_pair(count_merges, count_mutations);
1186}
1187
1188
1189size_t ReplicatedMergeTreeQueue::countMutations() const
1190{
1191 std::lock_guard lock(state_mutex);
1192 return mutations_by_znode.size();
1193}
1194
1195
1196size_t ReplicatedMergeTreeQueue::countFinishedMutations() const
1197{
1198 std::lock_guard lock(state_mutex);
1199
1200 size_t count = 0;
1201 for (const auto & pair : mutations_by_znode)
1202 {
1203 const auto & mutation = pair.second;
1204 if (!mutation.is_done)
1205 break;
1206
1207 ++count;
1208 }
1209
1210 return count;
1211}
1212
1213
1214ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper)
1215{
1216 return ReplicatedMergeTreeMergePredicate(*this, zookeeper);
1217}
1218
1219
1220MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
1221 const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const
1222{
1223 /// NOTE: If the corresponding mutation is not found, the error is logged (and not thrown as an exception)
1224 /// to allow recovering from a mutation that cannot be executed. This way you can delete the mutation entry
1225 /// from /mutations in ZK and the replicas will simply skip the mutation.
1226
1227 if (part->info.getDataVersion() > desired_mutation_version)
1228 {
1229 LOG_WARNING(log, "Data version of part " << part->name << " is already greater than "
1230 "desired mutation version " << desired_mutation_version);
1231 return MutationCommands{};
1232 }
1233
1234 std::lock_guard lock(state_mutex);
1235
1236 auto in_partition = mutations_by_partition.find(part->info.partition_id);
1237 if (in_partition == mutations_by_partition.end())
1238 {
1239 LOG_WARNING(log, "There are no mutations for partition ID " << part->info.partition_id
1240 << " (trying to mutate part " << part->name << " to " << toString(desired_mutation_version) << ")");
1241 return MutationCommands{};
1242 }
1243
1244 auto begin = in_partition->second.upper_bound(part->info.getDataVersion());
1245
1246 auto end = in_partition->second.lower_bound(desired_mutation_version);
1247 if (end == in_partition->second.end() || end->first != desired_mutation_version)
1248 LOG_WARNING(log, "Mutation with version " << desired_mutation_version
1249 << " not found in partition ID " << part->info.partition_id
1250 << " (trying to mutate part " << part->name + ")");
1251 else
1252 ++end;
1253
1254 MutationCommands commands;
1255 for (auto it = begin; it != end; ++it)
1256 commands.insert(commands.end(), it->second->entry->commands.begin(), it->second->entry->commands.end());
1257
1258 return commands;
1259}
1260
1261
1262bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeeper)
1263{
1264 std::vector<ReplicatedMergeTreeMutationEntryPtr> candidates;
1265 {
1266 std::lock_guard lock(state_mutex);
1267
1268 for (auto & kv : mutations_by_znode)
1269 {
1270 const String & znode = kv.first;
1271 MutationStatus & mutation = kv.second;
1272
1273 if (mutation.is_done)
1274 continue;
1275
1276 if (znode <= mutation_pointer)
1277 {
1278 LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")");
1279 mutation.is_done = true;
1280 }
1281 else if (mutation.parts_to_do == 0)
1282 {
1283 LOG_TRACE(log, "Will check if mutation " << mutation.entry->znode_name << " is done");
1284 candidates.push_back(mutation.entry);
1285 }
1286 }
1287 }
1288
1289 if (candidates.empty())
1290 return false;
1291
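    /// The merge predicate is built here because its constructor loads `committing_blocks`, which
    /// isMutationFinished() needs in order to see whether any block numbers below the mutation are still uncommitted.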
1292 auto merge_pred = getMergePredicate(zookeeper);
1293
1294 std::vector<const ReplicatedMergeTreeMutationEntry *> finished;
1295 for (const ReplicatedMergeTreeMutationEntryPtr & candidate : candidates)
1296 {
1297 if (merge_pred.isMutationFinished(*candidate))
1298 finished.push_back(candidate.get());
1299 }
1300
1301 if (!finished.empty())
1302 {
1303 zookeeper->set(replica_path + "/mutation_pointer", finished.back()->znode_name);
1304
1305 std::lock_guard lock(state_mutex);
1306
1307 mutation_pointer = finished.back()->znode_name;
1308
1309 for (const ReplicatedMergeTreeMutationEntry * entry : finished)
1310 {
1311 auto it = mutations_by_znode.find(entry->znode_name);
1312 if (it != mutations_by_znode.end())
1313 {
1314 LOG_TRACE(log, "Mutation " << entry->znode_name << " is done");
1315 it->second.is_done = true;
1316 }
1317 }
1318 }
1319
1320 return candidates.size() != finished.size();
1321}
1322
1323
1324void ReplicatedMergeTreeQueue::disableMergesInBlockRange(const String & part_name)
1325{
1326 std::lock_guard lock(state_mutex);
1327 virtual_parts.add(part_name);
1328}
1329
1330
1331ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
1332{
1333 std::lock_guard lock(state_mutex);
1334
1335 Status res;
1336
1337 res.future_parts = future_parts.size();
1338 res.queue_size = queue.size();
1339 res.last_queue_update = last_queue_update;
1340
1341 res.inserts_in_queue = 0;
1342 res.merges_in_queue = 0;
1343 res.part_mutations_in_queue = 0;
1344 res.queue_oldest_time = 0;
1345 res.inserts_oldest_time = 0;
1346 res.merges_oldest_time = 0;
1347 res.part_mutations_oldest_time = 0;
1348
1349 for (const LogEntryPtr & entry : queue)
1350 {
1351 if (entry->create_time && (!res.queue_oldest_time || entry->create_time < res.queue_oldest_time))
1352 res.queue_oldest_time = entry->create_time;
1353
1354 if (entry->type == LogEntry::GET_PART)
1355 {
1356 ++res.inserts_in_queue;
1357
1358 if (entry->create_time && (!res.inserts_oldest_time || entry->create_time < res.inserts_oldest_time))
1359 {
1360 res.inserts_oldest_time = entry->create_time;
1361 res.oldest_part_to_get = entry->new_part_name;
1362 }
1363 }
1364
1365 if (entry->type == LogEntry::MERGE_PARTS)
1366 {
1367 ++res.merges_in_queue;
1368
1369 if (entry->create_time && (!res.merges_oldest_time || entry->create_time < res.merges_oldest_time))
1370 {
1371 res.merges_oldest_time = entry->create_time;
1372 res.oldest_part_to_merge_to = entry->new_part_name;
1373 }
1374 }
1375
1376 if (entry->type == LogEntry::MUTATE_PART)
1377 {
1378 ++res.part_mutations_in_queue;
1379
1380 if (entry->create_time && (!res.part_mutations_oldest_time || entry->create_time < res.part_mutations_oldest_time))
1381 {
1382 res.part_mutations_oldest_time = entry->create_time;
1383 res.oldest_part_to_mutate_to = entry->new_part_name;
1384 }
1385 }
1386 }
1387
1388 return res;
1389}
1390
1391
1392void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res) const
1393{
1394 res.clear();
1395 std::lock_guard lock(state_mutex);
1396
1397 res.reserve(queue.size());
1398 for (const auto & entry : queue)
1399 res.emplace_back(*entry);
1400}
1401
1402
1403void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const
1404{
1405 std::lock_guard lock(state_mutex);
1406 out_min_unprocessed_insert_time = min_unprocessed_insert_time;
1407 out_max_processed_insert_time = max_processed_insert_time;
1408}
1409
1410
1411std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatus() const
1412{
1413 std::lock_guard lock(state_mutex);
1414
1415 std::vector<MergeTreeMutationStatus> result;
1416 for (const auto & pair : mutations_by_znode)
1417 {
1418 const MutationStatus & status = pair.second;
1419 const ReplicatedMergeTreeMutationEntry & entry = *status.entry;
1420 const Names parts_to_mutate = getPartNamesToMutate(entry, current_parts);
1421
1422 for (const MutationCommand & command : entry.commands)
1423 {
1424 std::stringstream ss;
1425 formatAST(*command.ast, ss, false, true);
1426 result.push_back(MergeTreeMutationStatus
1427 {
1428 entry.znode_name,
1429 ss.str(),
1430 entry.create_time,
1431 entry.block_numbers,
1432 parts_to_mutate,
1433 status.is_done,
1434 status.latest_failed_part,
1435 status.latest_fail_time,
1436 status.latest_fail_reason,
1437 });
1438 }
1439 }
1440
1441 return result;
1442}
1443
1444
1445ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
1446 ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper)
1447 : queue(queue_)
1448 , prev_virtual_parts(queue.format_version)
1449{
1450 {
1451 std::lock_guard lock(queue.state_mutex);
1452 prev_virtual_parts = queue.virtual_parts;
1453 }
1454
1455 /// Load current quorum status.
1456 auto quorum_last_part_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/last_part");
1457 auto quorum_status_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/status");
1458
1459 /// Load current inserts
1460 std::unordered_set<String> lock_holder_paths;
1461 for (const String & entry : zookeeper->getChildren(queue.zookeeper_path + "/temp"))
1462 {
1463 if (startsWith(entry, "abandonable_lock-"))
1464 lock_holder_paths.insert(queue.zookeeper_path + "/temp/" + entry);
1465 }
1466
1467 if (!lock_holder_paths.empty())
1468 {
1469 Strings partitions = zookeeper->getChildren(queue.zookeeper_path + "/block_numbers");
1470 std::vector<std::future<Coordination::ListResponse>> lock_futures;
1471 for (const String & partition : partitions)
1472 lock_futures.push_back(zookeeper->asyncGetChildren(queue.zookeeper_path + "/block_numbers/" + partition));
1473
1474 struct BlockInfo_
1475 {
1476 String partition;
1477 Int64 number;
1478 String zk_path;
1479 std::future<Coordination::GetResponse> contents_future;
1480 };
1481
1482 std::vector<BlockInfo_> block_infos;
1483 for (size_t i = 0; i < partitions.size(); ++i)
1484 {
1485 Strings partition_block_numbers = lock_futures[i].get().names;
1486 for (const String & entry : partition_block_numbers)
1487 {
1488 /// TODO: cache block numbers that are abandoned.
1489 /// We won't need to check them on the next iteration.
1490 if (startsWith(entry, "block-"))
1491 {
1492 Int64 block_number = parse<Int64>(entry.substr(strlen("block-")));
1493 String zk_path = queue.zookeeper_path + "/block_numbers/" + partitions[i] + "/" + entry;
1494 block_infos.emplace_back(
1495 BlockInfo_{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)});
1496 }
1497 }
1498 }
1499
1500 for (auto & block : block_infos)
1501 {
1502 Coordination::GetResponse resp = block.contents_future.get();
1503 if (!resp.error && lock_holder_paths.count(resp.data))
1504 committing_blocks[block.partition].insert(block.number);
1505 }
1506 }
1507
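    /// Pull fresh log entries (and mutations) only after committing_blocks has been loaded;
    /// this ordering establishes the invariant described in operator().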
1508 queue_.pullLogsToQueue(zookeeper);
1509
1510 Coordination::GetResponse quorum_last_part_response = quorum_last_part_future.get();
1511 if (!quorum_last_part_response.error)
1512 {
1513 ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(queue.format_version);
1514 if (!quorum_last_part_response.data.empty())
1515 {
1516 parts_with_quorum.fromString(quorum_last_part_response.data);
1517 last_quorum_parts.clear();
1518 for (const auto & added_part : parts_with_quorum.added_parts)
1519 last_quorum_parts.emplace(added_part.second);
1520 }
1521 }
1522
1523 Coordination::GetResponse quorum_status_response = quorum_status_future.get();
1524 if (!quorum_status_response.error)
1525 {
1526 ReplicatedMergeTreeQuorumEntry quorum_status;
1527 quorum_status.fromString(quorum_status_response.data);
1528 inprogress_quorum_part = quorum_status.part_name;
1529 }
1530 else
1531 inprogress_quorum_part.clear();
1532}
1533
1534bool ReplicatedMergeTreeMergePredicate::operator()(
1535 const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right,
1536 String * out_reason) const
1537{
1538 /// A sketch of a proof of why this method actually works:
1539 ///
1540 /// The trickiest part is to ensure that no new parts will ever appear in the range of blocks between left and right.
1541 /// Inserted parts get their block numbers by acquiring an ephemeral lock (see EphemeralLockInZooKeeper.h).
1542 /// These block numbers are monotonically increasing in a partition.
1543 ///
1544 /// Because there is a window between the moment the inserted part gets its block number and
1545 /// the moment it is committed (appears in the replication log), we can't get the name of all parts up to the given
1546 /// block number just by looking at the replication log - some parts with smaller block numbers may be currently committing
1547 /// and will appear in the log later than the parts with bigger block numbers.
1548 ///
1549 /// We also can't take a consistent snapshot of parts that are already committed plus parts that are about to commit
1550 /// due to limitations of ZooKeeper transactions.
1551 ///
1552 /// So we do the following (see the constructor):
1553 /// * copy virtual_parts from queue to prev_virtual_parts
1554 /// (a set of parts which corresponds to executing the replication log up to a certain point)
1555 /// * load committing_blocks (inserts and mutations that have already acquired a block number but haven't appeared in the log yet)
1556 /// * do pullLogsToQueue() again to load fresh queue.virtual_parts and mutations.
1557 ///
1558 /// Now we have an invariant: if some part is in prev_virtual_parts then:
1559 /// * all parts with smaller block numbers are either in committing_blocks or in queue.virtual_parts
1560 /// (those that managed to commit before we loaded committing_blocks).
1561 /// * all mutations with smaller block numbers are either in committing_blocks or in queue.mutations_by_partition
1562 ///
1563 /// So to check that no new parts will ever appear in the range of blocks between left and right we first check that
1564 /// left and right are already present in prev_virtual_parts (we can't give a definite answer for parts that were committed later)
1565 /// and then check that there are no blocks between them in committing_blocks and no parts in queue.virtual_parts.
1566 ///
1567 /// Similarly, to check that there will be no mutation with a block number between two parts from prev_virtual_parts
1568 /// (only then we can merge them without mutating the left part), we first check committing_blocks
1569 /// and then check that these two parts have the same mutation version according to queue.mutations_by_partition.
1570
1571 if (left->info.partition_id != right->info.partition_id)
1572 {
1573 if (out_reason)
1574 *out_reason = "Parts " + left->name + " and " + right->name + " belong to different partitions";
1575 return false;
1576 }
1577
1578 for (const MergeTreeData::DataPartPtr & part : {left, right})
1579 {
1580 if (last_quorum_parts.find(part->name) != last_quorum_parts.end())
1581 {
1582 if (out_reason)
1583 *out_reason = "Part " + part->name + " is the most recent part with a satisfied quorum";
1584 return false;
1585 }
1586
1587 if (part->name == inprogress_quorum_part)
1588 {
1589 if (out_reason)
1590 *out_reason = "Quorum insert for part " + part->name + " is currently in progress";
1591 return false;
1592 }
1593
1594 if (prev_virtual_parts.getContainingPart(part->info).empty())
1595 {
1596 if (out_reason)
1597 *out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet";
1598 return false;
1599 }
1600 }
1601
1602 Int64 left_max_block = left->info.max_block;
1603 Int64 right_min_block = right->info.min_block;
1604 if (left_max_block > right_min_block)
1605 std::swap(left_max_block, right_min_block);
1606
1607 if (left_max_block + 1 < right_min_block)
1608 {
1609 auto committing_blocks_in_partition = committing_blocks.find(left->info.partition_id);
1610 if (committing_blocks_in_partition != committing_blocks.end())
1611 {
1612 const std::set<Int64> & block_numbers = committing_blocks_in_partition->second;
1613
1614 auto block_it = block_numbers.upper_bound(left_max_block);
1615 if (block_it != block_numbers.end() && *block_it < right_min_block)
1616 {
1617 if (out_reason)
1618 *out_reason = "Block number " + toString(*block_it) + " is still being inserted between parts "
1619 + left->name + " and " + right->name;
1620
1621 return false;
1622 }
1623 }
1624 }
1625
1626 std::lock_guard lock(queue.state_mutex);
1627
1628 for (const MergeTreeData::DataPartPtr & part : {left, right})
1629 {
1630 /// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer
1631 /// and it is guaranteed that it will contain all merges assigned before this object is constructed.
1632 String containing_part = queue.virtual_parts.getContainingPart(part->info);
1633 if (containing_part != part->name)
1634 {
1635 if (out_reason)
1636 *out_reason = "Part " + part->name + " has already been assigned a merge into " + containing_part;
1637 return false;
1638 }
1639 }
1640
1641 if (left_max_block + 1 < right_min_block)
1642 {
1643 /// Fake part which will appear as merge result
1644 MergeTreePartInfo gap_part_info(
1645 left->info.partition_id, left_max_block + 1, right_min_block - 1,
1646 MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER);
1647
1648 /// We don't select parts if any smaller part covered by our merge must exist after
1649 /// processing replication log up to log_pointer.
1650 Strings covered = queue.virtual_parts.getPartsCoveredBy(gap_part_info);
1651 if (!covered.empty())
1652 {
1653 if (out_reason)
                *out_reason = "There are " + toString(covered.size()) + " parts (from " + covered.front()
                    + " to " + covered.back() + ") that are still not present or are being processed by"
                    + " another background process on this replica between " + left->name + " and " + right->name;
1657 return false;
1658 }
1659 }
1660
1661 Int64 left_mutation_ver = queue.getCurrentMutationVersionImpl(
1662 left->info.partition_id, left->info.getDataVersion(), lock);
1663
1664 Int64 right_mutation_ver = queue.getCurrentMutationVersionImpl(
1665 left->info.partition_id, right->info.getDataVersion(), lock);
1666
1667 if (left_mutation_ver != right_mutation_ver)
1668 {
1669 if (out_reason)
1670 *out_reason = "Current mutation versions of parts " + left->name + " and " + right->name + " differ: "
1671 + toString(left_mutation_ver) + " and " + toString(right_mutation_ver) + " respectively";
1672 return false;
1673 }
1674
1675 return true;
1676}
1677
1678
1679std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const
1680{
1681 /// Assigning mutations is easier than assigning merges because mutations appear in the same order as
1682 /// the order of their version numbers (see StorageReplicatedMergeTree::mutate).
1683 /// This means that if we have loaded the mutation with version number X then all mutations with
1684 /// the version numbers less than X are also loaded and if there is no merge or mutation assigned to
1685 /// the part (checked by querying queue.virtual_parts), we can confidently assign a mutation to
1686 /// version X for this part.
1687
1688 if (last_quorum_parts.find(part->name) != last_quorum_parts.end()
1689 || part->name == inprogress_quorum_part)
1690 return {};
1691
1692 std::lock_guard lock(queue.state_mutex);
1693
1694 if (queue.virtual_parts.getContainingPart(part->info) != part->name)
1695 return {};
1696
1697 auto in_partition = queue.mutations_by_partition.find(part->info.partition_id);
1698 if (in_partition == queue.mutations_by_partition.end())
1699 return {};
1700
1701 Int64 current_version = queue.getCurrentMutationVersionImpl(part->info.partition_id, part->info.getDataVersion(), lock);
1702 Int64 max_version = in_partition->second.rbegin()->first;
1703 if (current_version >= max_version)
1704 return {};
1705
1706 return max_version;
1707}
1708
1709
1710bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const
1711{
1712 for (const auto & kv : mutation.block_numbers)
1713 {
1714 const String & partition_id = kv.first;
1715 Int64 block_num = kv.second;
1716
1717 auto partition_it = committing_blocks.find(partition_id);
1718 if (partition_it != committing_blocks.end())
1719 {
1720 size_t blocks_count = std::distance(
1721 partition_it->second.begin(), partition_it->second.lower_bound(block_num));
1722 if (blocks_count)
1723 {
1724 LOG_TRACE(queue.log, "Mutation " << mutation.znode_name << " is not done yet because "
1725 << "in partition ID " << partition_id << " there are still "
1726 << blocks_count << " uncommitted blocks.");
1727 return false;
1728 }
1729 }
1730 }
1731
1732 {
1733 std::lock_guard lock(queue.state_mutex);
1734
1735 size_t suddenly_appeared_parts = getPartNamesToMutate(mutation, queue.virtual_parts).size();
1736 if (suddenly_appeared_parts)
1737 {
1738 LOG_TRACE(queue.log, "Mutation " << mutation.znode_name << " is not done yet because "
1739 << suddenly_appeared_parts << " parts to mutate suddenly appeared.");
1740 return false;
1741 }
1742 }
1743
1744 return true;
1745}
1746
1747
1748ReplicatedMergeTreeQueue::SubscriberHandler
1749ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback)
1750{
1751 std::lock_guard lock(state_mutex);
1752 std::lock_guard lock_subscribers(subscribers_mutex);
1753
1754 auto it = subscribers.emplace(subscribers.end(), std::move(callback));
1755
1756 /// Atomically notify about current size
1757 (*it)(queue.size());
1758
1759 return SubscriberHandler(it, *this);
1760}
1761
1762ReplicatedMergeTreeQueue::SubscriberHandler::~SubscriberHandler()
1763{
1764 std::lock_guard lock(queue.subscribers_mutex);
1765 queue.subscribers.erase(it);
1766}
1767
1768void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size)
1769{
1770 std::lock_guard lock_subscribers(subscribers_mutex);
1771 for (auto & subscriber_callback : subscribers)
1772 subscriber_callback(new_queue_size);
1773}
1774
1775ReplicatedMergeTreeQueue::~ReplicatedMergeTreeQueue()
1776{
1777 notifySubscribers(0);
1778}
1779
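/// Pads the log index with leading zeros to 10 digits, the suffix width of ZooKeeper sequential znodes,
/// so that "log-" + padIndex(index) can be compared with log entry names as a plain string.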
1780String padIndex(Int64 index)
1781{
1782 String index_str = toString(index);
1783 return std::string(10 - index_str.size(), '0') + index_str;
1784}
1785
1786}
1787