| 1 | #include <Storages/MergeTree/ReplicatedMergeTreeQueue.h> |
| 2 | #include <Storages/StorageReplicatedMergeTree.h> |
| 3 | #include <IO/ReadHelpers.h> |
| 4 | #include <IO/WriteHelpers.h> |
| 5 | #include <Storages/MergeTree/MergeTreeDataPart.h> |
| 6 | #include <Storages/MergeTree/MergeTreeDataMergerMutator.h> |
| 7 | #include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h> |
| 8 | #include <Common/StringUtils/StringUtils.h> |
| 9 | |
| 10 | |
| 11 | namespace DB |
| 12 | { |
| 13 | |
| 14 | namespace ErrorCodes |
| 15 | { |
| 16 | extern const int UNEXPECTED_NODE_IN_ZOOKEEPER; |
| 17 | extern const int UNFINISHED; |
| 18 | extern const int PART_IS_TEMPORARILY_LOCKED; |
| 19 | } |
| 20 | |
| 21 | |
| 22 | ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_) |
| 23 | : storage(storage_) |
| 24 | , format_version(storage.format_version) |
| 25 | , current_parts(format_version) |
| 26 | , virtual_parts(format_version) |
| 27 | {} |
| 28 | |
| 29 | |
| 30 | void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts) |
| 31 | { |
| 32 | std::lock_guard lock(state_mutex); |
| 33 | |
| 34 | for (auto part : parts) |
| 35 | { |
| 36 | current_parts.add(part->name); |
| 37 | virtual_parts.add(part->name); |
| 38 | } |
| 39 | } |
| 40 | |
| 41 | |
| 42 | bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const |
| 43 | { |
| 44 | std::lock_guard lock(state_mutex); |
| 45 | return virtual_parts.getContainingPart(data_part->info) != data_part->name; |
| 46 | } |
| 47 | |
| 48 | bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) |
| 49 | { |
| 50 | auto queue_path = replica_path + "/queue" ; |
| 51 | LOG_DEBUG(log, "Loading queue from " << queue_path); |
| 52 | |
| 53 | bool updated = false; |
| 54 | std::optional<time_t> min_unprocessed_insert_time_changed; |
| 55 | |
| 56 | { |
| 57 | std::lock_guard pull_logs_lock(pull_logs_to_queue_mutex); |
| 58 | |
| 59 | String log_pointer_str = zookeeper->get(replica_path + "/log_pointer" ); |
| 60 | log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str); |
| 61 | |
| 62 | std::unordered_set<String> already_loaded_paths; |
| 63 | { |
| 64 | std::lock_guard lock(state_mutex); |
| 65 | for (const LogEntryPtr & log_entry : queue) |
| 66 | already_loaded_paths.insert(log_entry->znode_name); |
| 67 | } |
| 68 | |
| 69 | Strings children = zookeeper->getChildren(queue_path); |
| 70 | |
| 71 | auto to_remove_it = std::remove_if( |
| 72 | children.begin(), children.end(), [&](const String & path) |
| 73 | { |
| 74 | return already_loaded_paths.count(path); |
| 75 | }); |
| 76 | |
| 77 | LOG_DEBUG(log, |
| 78 | "Having " << (to_remove_it - children.begin()) << " queue entries to load, " |
| 79 | << (children.end() - to_remove_it) << " entries already loaded." ); |
| 80 | children.erase(to_remove_it, children.end()); |
| 81 | |
| 82 | std::sort(children.begin(), children.end()); |
| 83 | |
| 84 | zkutil::AsyncResponses<Coordination::GetResponse> futures; |
| 85 | futures.reserve(children.size()); |
| 86 | |
| 87 | for (const String & child : children) |
| 88 | futures.emplace_back(child, zookeeper->asyncGet(queue_path + "/" + child)); |
| 89 | |
| 90 | for (auto & future : futures) |
| 91 | { |
| 92 | Coordination::GetResponse res = future.second.get(); |
| 93 | LogEntryPtr entry = LogEntry::parse(res.data, res.stat); |
| 94 | entry->znode_name = future.first; |
| 95 | |
| 96 | std::lock_guard lock(state_mutex); |
| 97 | |
| 98 | insertUnlocked(entry, min_unprocessed_insert_time_changed, lock); |
| 99 | |
| 100 | updated = true; |
| 101 | } |
| 102 | |
| 103 | zookeeper->tryGet(replica_path + "/mutation_pointer" , mutation_pointer); |
| 104 | } |
| 105 | |
| 106 | updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {}); |
| 107 | |
| 108 | LOG_TRACE(log, "Loaded queue" ); |
| 109 | return updated; |
| 110 | } |
| 111 | |
| 112 | |
| 113 | void ReplicatedMergeTreeQueue::initialize( |
| 114 | const String & zookeeper_path_, const String & replica_path_, const String & logger_name_, |
| 115 | const MergeTreeData::DataParts & parts) |
| 116 | { |
| 117 | zookeeper_path = zookeeper_path_; |
| 118 | replica_path = replica_path_; |
| 119 | logger_name = logger_name_; |
| 120 | log = &Logger::get(logger_name); |
| 121 | |
| 122 | addVirtualParts(parts); |
| 123 | } |
| 124 | |
| 125 | |
| 126 | void ReplicatedMergeTreeQueue::insertUnlocked( |
| 127 | const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, |
| 128 | std::lock_guard<std::mutex> & /* state_lock */) |
| 129 | { |
| 130 | for (const String & virtual_part_name : entry->getVirtualPartNames()) |
| 131 | { |
| 132 | virtual_parts.add(virtual_part_name); |
| 133 | updateMutationsPartsToDo(virtual_part_name, /* add = */ true); |
| 134 | } |
| 135 | |
| 136 | /// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted |
| 137 | if (entry->type != LogEntry::DROP_RANGE) |
| 138 | queue.push_back(entry); |
| 139 | else |
| 140 | queue.push_front(entry); |
| 141 | |
| 142 | if (entry->type == LogEntry::GET_PART) |
| 143 | { |
| 144 | inserts_by_time.insert(entry); |
| 145 | |
| 146 | if (entry->create_time && (!min_unprocessed_insert_time || entry->create_time < min_unprocessed_insert_time)) |
| 147 | { |
| 148 | min_unprocessed_insert_time = entry->create_time; |
| 149 | min_unprocessed_insert_time_changed = min_unprocessed_insert_time; |
| 150 | } |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | |
| 155 | void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry) |
| 156 | { |
| 157 | std::optional<time_t> min_unprocessed_insert_time_changed; |
| 158 | |
| 159 | { |
| 160 | std::lock_guard lock(state_mutex); |
| 161 | insertUnlocked(entry, min_unprocessed_insert_time_changed, lock); |
| 162 | } |
| 163 | |
| 164 | updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {}); |
| 165 | } |
| 166 | |
| 167 | |
| 168 | void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( |
| 169 | const LogEntryPtr & entry, |
| 170 | bool is_successful, |
| 171 | std::optional<time_t> & min_unprocessed_insert_time_changed, |
| 172 | std::optional<time_t> & max_processed_insert_time_changed, |
| 173 | std::unique_lock<std::mutex> & /* queue_lock */) |
| 174 | { |
| 175 | /// Update insert times. |
| 176 | if (entry->type == LogEntry::GET_PART) |
| 177 | { |
| 178 | inserts_by_time.erase(entry); |
| 179 | |
| 180 | if (inserts_by_time.empty()) |
| 181 | { |
| 182 | min_unprocessed_insert_time = 0; |
| 183 | min_unprocessed_insert_time_changed = min_unprocessed_insert_time; |
| 184 | } |
| 185 | else if ((*inserts_by_time.begin())->create_time > min_unprocessed_insert_time) |
| 186 | { |
| 187 | min_unprocessed_insert_time = (*inserts_by_time.begin())->create_time; |
| 188 | min_unprocessed_insert_time_changed = min_unprocessed_insert_time; |
| 189 | } |
| 190 | |
| 191 | if (entry->create_time > max_processed_insert_time) |
| 192 | { |
| 193 | max_processed_insert_time = entry->create_time; |
| 194 | max_processed_insert_time_changed = max_processed_insert_time; |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | if (is_successful) |
| 199 | { |
| 200 | for (const String & virtual_part_name : entry->getVirtualPartNames()) |
| 201 | { |
| 202 | Strings replaced_parts; |
| 203 | current_parts.add(virtual_part_name, &replaced_parts); |
| 204 | |
| 205 | /// Each part from `replaced_parts` should become Obsolete as a result of executing the entry. |
| 206 | /// So it is one less part to mutate for each mutation with block number greater than part_info.getDataVersion() |
| 207 | for (const String & replaced_part_name : replaced_parts) |
| 208 | updateMutationsPartsToDo(replaced_part_name, /* add = */ false); |
| 209 | } |
| 210 | |
| 211 | String drop_range_part_name; |
| 212 | if (entry->type == LogEntry::DROP_RANGE) |
| 213 | drop_range_part_name = entry->new_part_name; |
| 214 | else if (entry->type == LogEntry::REPLACE_RANGE) |
| 215 | drop_range_part_name = entry->replace_range_entry->drop_range_part_name; |
| 216 | |
| 217 | if (!drop_range_part_name.empty()) |
| 218 | { |
| 219 | current_parts.remove(drop_range_part_name); |
| 220 | virtual_parts.remove(drop_range_part_name); |
| 221 | } |
| 222 | } |
| 223 | else |
| 224 | { |
| 225 | for (const String & virtual_part_name : entry->getVirtualPartNames()) |
| 226 | { |
| 227 | /// Because execution of the entry is unsuccessful, `virtual_part_name` will never appear |
| 228 | /// so we won't need to mutate it. |
| 229 | updateMutationsPartsToDo(virtual_part_name, /* add = */ false); |
| 230 | } |
| 231 | } |
| 232 | } |
| 233 | |
| 234 | |
| 235 | void ReplicatedMergeTreeQueue::updateMutationsPartsToDo(const String & part_name, bool add) |
| 236 | { |
| 237 | auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); |
| 238 | auto in_partition = mutations_by_partition.find(part_info.partition_id); |
| 239 | if (in_partition == mutations_by_partition.end()) |
| 240 | return; |
| 241 | |
| 242 | bool some_mutations_are_probably_done = false; |
| 243 | |
| 244 | auto from_it = in_partition->second.upper_bound(part_info.getDataVersion()); |
| 245 | for (auto it = from_it; it != in_partition->second.end(); ++it) |
| 246 | { |
| 247 | MutationStatus & status = *it->second; |
| 248 | status.parts_to_do += (add ? +1 : -1); |
| 249 | if (status.parts_to_do <= 0) |
| 250 | some_mutations_are_probably_done = true; |
| 251 | |
| 252 | if (!add && !status.latest_failed_part.empty() && part_info.contains(status.latest_failed_part_info)) |
| 253 | { |
| 254 | status.latest_failed_part.clear(); |
| 255 | status.latest_failed_part_info = MergeTreePartInfo(); |
| 256 | status.latest_fail_time = 0; |
| 257 | status.latest_fail_reason.clear(); |
| 258 | } |
| 259 | } |
| 260 | |
| 261 | if (some_mutations_are_probably_done) |
| 262 | storage.mutations_finalizing_task->schedule(); |
| 263 | } |
| 264 | |
| 265 | |
| 266 | void ReplicatedMergeTreeQueue::updateTimesInZooKeeper( |
| 267 | zkutil::ZooKeeperPtr zookeeper, |
| 268 | std::optional<time_t> min_unprocessed_insert_time_changed, |
| 269 | std::optional<time_t> max_processed_insert_time_changed) const |
| 270 | { |
| 271 | /// Here there can be a race condition (with different remove at the same time) |
| 272 | /// because we update times in ZooKeeper with unlocked mutex, while these times may change. |
| 273 | /// Consider it unimportant (for a short time, ZK will have a slightly different time value). |
| 274 | |
| 275 | Coordination::Requests ops; |
| 276 | |
| 277 | if (min_unprocessed_insert_time_changed) |
| 278 | ops.emplace_back(zkutil::makeSetRequest( |
| 279 | replica_path + "/min_unprocessed_insert_time" , toString(*min_unprocessed_insert_time_changed), -1)); |
| 280 | |
| 281 | if (max_processed_insert_time_changed) |
| 282 | ops.emplace_back(zkutil::makeSetRequest( |
| 283 | replica_path + "/max_processed_insert_time" , toString(*max_processed_insert_time_changed), -1)); |
| 284 | |
| 285 | if (!ops.empty()) |
| 286 | { |
| 287 | Coordination::Responses responses; |
| 288 | auto code = zookeeper->tryMulti(ops, responses); |
| 289 | |
| 290 | if (code) |
| 291 | LOG_ERROR(log, "Couldn't set value of nodes for insert times (" |
| 292 | << replica_path << "/min_unprocessed_insert_time, max_processed_insert_time)" << ": " |
| 293 | << zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often." ); |
| 294 | } |
| 295 | } |
| 296 | |
| 297 | |
| 298 | void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry) |
| 299 | { |
| 300 | auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name); |
| 301 | |
| 302 | if (code) |
| 303 | LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": " |
| 304 | << zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often." ); |
| 305 | |
| 306 | std::optional<time_t> min_unprocessed_insert_time_changed; |
| 307 | std::optional<time_t> max_processed_insert_time_changed; |
| 308 | |
| 309 | bool found = false; |
| 310 | size_t queue_size = 0; |
| 311 | |
| 312 | { |
| 313 | std::unique_lock lock(state_mutex); |
| 314 | |
| 315 | /// Remove the job from the queue in the RAM. |
| 316 | /// You can not just refer to a pre-saved iterator, because someone else might be able to delete the task. |
| 317 | /// Why do we view the queue from the end? |
| 318 | /// - because the task for execution first is moved to the end of the queue, so that in case of failure it remains at the end. |
| 319 | for (Queue::iterator it = queue.end(); it != queue.begin();) |
| 320 | { |
| 321 | --it; |
| 322 | if (*it == entry) |
| 323 | { |
| 324 | found = true; |
| 325 | updateStateOnQueueEntryRemoval( |
| 326 | entry, /* is_successful = */ true, |
| 327 | min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); |
| 328 | |
| 329 | queue.erase(it); |
| 330 | queue_size = queue.size(); |
| 331 | break; |
| 332 | } |
| 333 | } |
| 334 | } |
| 335 | |
| 336 | if (!found) |
| 337 | throw Exception("Can't find " + entry->znode_name + " in the memory queue. It is a bug" , ErrorCodes::LOGICAL_ERROR); |
| 338 | |
| 339 | notifySubscribers(queue_size); |
| 340 | |
| 341 | updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); |
| 342 | } |
| 343 | |
| 344 | |
| 345 | bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name) |
| 346 | { |
| 347 | LogEntryPtr found; |
| 348 | size_t queue_size = 0; |
| 349 | |
| 350 | std::optional<time_t> min_unprocessed_insert_time_changed; |
| 351 | std::optional<time_t> max_processed_insert_time_changed; |
| 352 | |
| 353 | { |
| 354 | std::unique_lock lock(state_mutex); |
| 355 | |
| 356 | virtual_parts.remove(part_name); |
| 357 | |
| 358 | for (Queue::iterator it = queue.begin(); it != queue.end();) |
| 359 | { |
| 360 | if ((*it)->new_part_name == part_name) |
| 361 | { |
| 362 | found = *it; |
| 363 | updateStateOnQueueEntryRemoval( |
| 364 | found, /* is_successful = */ false, |
| 365 | min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); |
| 366 | queue.erase(it++); |
| 367 | queue_size = queue.size(); |
| 368 | break; |
| 369 | } |
| 370 | else |
| 371 | ++it; |
| 372 | } |
| 373 | } |
| 374 | |
| 375 | if (!found) |
| 376 | return false; |
| 377 | |
| 378 | notifySubscribers(queue_size); |
| 379 | |
| 380 | zookeeper->tryRemove(replica_path + "/queue/" + found->znode_name); |
| 381 | updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); |
| 382 | |
| 383 | return true; |
| 384 | } |
| 385 | |
| 386 | |
| 387 | bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & part_info) |
| 388 | { |
| 389 | std::lock_guard lock(state_mutex); |
| 390 | return virtual_parts.remove(part_info); |
| 391 | } |
| 392 | |
| 393 | void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback) |
| 394 | { |
| 395 | std::lock_guard lock(pull_logs_to_queue_mutex); |
| 396 | |
| 397 | String index_str = zookeeper->get(replica_path + "/log_pointer" ); |
| 398 | UInt64 index; |
| 399 | |
| 400 | Strings log_entries = zookeeper->getChildrenWatch(zookeeper_path + "/log" , nullptr, watch_callback); |
| 401 | |
| 402 | /// We update mutations after we have loaded the list of log entries, but before we insert them |
| 403 | /// in the queue. |
| 404 | /// With this we ensure that if you read the log state L1 and then the state of mutations M1, |
| 405 | /// then L1 "happened-before" M1. |
| 406 | updateMutations(zookeeper); |
| 407 | |
| 408 | if (index_str.empty()) |
| 409 | { |
| 410 | /// If we do not already have a pointer to the log, put a pointer to the first entry in it. |
| 411 | index = log_entries.empty() ? 0 : parse<UInt64>(std::min_element(log_entries.begin(), log_entries.end())->substr(strlen("log-" ))); |
| 412 | |
| 413 | zookeeper->set(replica_path + "/log_pointer" , toString(index)); |
| 414 | } |
| 415 | else |
| 416 | { |
| 417 | index = parse<UInt64>(index_str); |
| 418 | } |
| 419 | |
| 420 | String min_log_entry = "log-" + padIndex(index); |
| 421 | |
| 422 | /// Multiple log entries that must be copied to the queue. |
| 423 | |
| 424 | log_entries.erase( |
| 425 | std::remove_if(log_entries.begin(), log_entries.end(), [&min_log_entry](const String & entry) { return entry < min_log_entry; }), |
| 426 | log_entries.end()); |
| 427 | |
| 428 | if (!log_entries.empty()) |
| 429 | { |
| 430 | std::sort(log_entries.begin(), log_entries.end()); |
| 431 | |
| 432 | /// ZK contains a limit on the number or total size of operations in a multi-request. |
| 433 | /// If the limit is exceeded, the connection is simply closed. |
| 434 | /// The constant is selected with a margin. The default limit in ZK is 1 MB of data in total. |
| 435 | /// The average size of the node value in this case is less than 10 kilobytes. |
| 436 | static constexpr auto MAX_MULTI_OPS = 100; |
| 437 | |
| 438 | for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries; entry_idx += MAX_MULTI_OPS) |
| 439 | { |
| 440 | auto begin = log_entries.begin() + entry_idx; |
| 441 | auto end = entry_idx + MAX_MULTI_OPS >= log_entries.size() |
| 442 | ? log_entries.end() |
| 443 | : (begin + MAX_MULTI_OPS); |
| 444 | auto last = end - 1; |
| 445 | |
| 446 | String last_entry = *last; |
| 447 | if (!startsWith(last_entry, "log-" )) |
| 448 | throw Exception("Error in zookeeper data: unexpected node " + last_entry + " in " + zookeeper_path + "/log" , |
| 449 | ErrorCodes::UNEXPECTED_NODE_IN_ZOOKEEPER); |
| 450 | |
| 451 | UInt64 last_entry_index = parse<UInt64>(last_entry.substr(strlen("log-" ))); |
| 452 | |
| 453 | LOG_DEBUG(log, "Pulling " << (end - begin) << " entries to queue: " << *begin << " - " << *last); |
| 454 | |
| 455 | zkutil::AsyncResponses<Coordination::GetResponse> futures; |
| 456 | futures.reserve(end - begin); |
| 457 | |
| 458 | for (auto it = begin; it != end; ++it) |
| 459 | futures.emplace_back(*it, zookeeper->asyncGet(zookeeper_path + "/log/" + *it)); |
| 460 | |
| 461 | /// Simultaneously add all new entries to the queue and move the pointer to the log. |
| 462 | |
| 463 | Coordination::Requests ops; |
| 464 | std::vector<LogEntryPtr> copied_entries; |
| 465 | copied_entries.reserve(end - begin); |
| 466 | |
| 467 | std::optional<time_t> min_unprocessed_insert_time_changed; |
| 468 | |
| 469 | for (auto & future : futures) |
| 470 | { |
| 471 | Coordination::GetResponse res = future.second.get(); |
| 472 | |
| 473 | copied_entries.emplace_back(LogEntry::parse(res.data, res.stat)); |
| 474 | |
| 475 | ops.emplace_back(zkutil::makeCreateRequest( |
| 476 | replica_path + "/queue/queue-" , res.data, zkutil::CreateMode::PersistentSequential)); |
| 477 | |
| 478 | const auto & entry = *copied_entries.back(); |
| 479 | if (entry.type == LogEntry::GET_PART) |
| 480 | { |
| 481 | std::lock_guard state_lock(state_mutex); |
| 482 | if (entry.create_time && (!min_unprocessed_insert_time || entry.create_time < min_unprocessed_insert_time)) |
| 483 | { |
| 484 | min_unprocessed_insert_time = entry.create_time; |
| 485 | min_unprocessed_insert_time_changed = min_unprocessed_insert_time; |
| 486 | } |
| 487 | } |
| 488 | } |
| 489 | |
| 490 | ops.emplace_back(zkutil::makeSetRequest( |
| 491 | replica_path + "/log_pointer" , toString(last_entry_index + 1), -1)); |
| 492 | |
| 493 | if (min_unprocessed_insert_time_changed) |
| 494 | ops.emplace_back(zkutil::makeSetRequest( |
| 495 | replica_path + "/min_unprocessed_insert_time" , toString(*min_unprocessed_insert_time_changed), -1)); |
| 496 | |
| 497 | auto responses = zookeeper->multi(ops); |
| 498 | |
| 499 | /// Now we have successfully updated the queue in ZooKeeper. Update it in RAM. |
| 500 | |
| 501 | try |
| 502 | { |
| 503 | std::lock_guard state_lock(state_mutex); |
| 504 | |
| 505 | log_pointer = last_entry_index + 1; |
| 506 | |
| 507 | for (size_t copied_entry_idx = 0, num_copied_entries = copied_entries.size(); copied_entry_idx < num_copied_entries; ++copied_entry_idx) |
| 508 | { |
| 509 | String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses[copied_entry_idx]).path_created; |
| 510 | copied_entries[copied_entry_idx]->znode_name = path_created.substr(path_created.find_last_of('/') + 1); |
| 511 | |
| 512 | std::optional<time_t> unused = false; |
| 513 | insertUnlocked(copied_entries[copied_entry_idx], unused, state_lock); |
| 514 | } |
| 515 | |
| 516 | last_queue_update = time(nullptr); |
| 517 | } |
| 518 | catch (...) |
| 519 | { |
| 520 | /// If it fails, the data in RAM is incorrect. In order to avoid possible further corruption of data in ZK, we will kill ourselves. |
| 521 | /// This is possible only if there is an unknown logical error. |
| 522 | std::terminate(); |
| 523 | } |
| 524 | |
| 525 | if (!copied_entries.empty()) |
| 526 | LOG_DEBUG(log, "Pulled " << copied_entries.size() << " entries to queue." ); |
| 527 | } |
| 528 | |
| 529 | if (storage.queue_task_handle) |
| 530 | storage.queue_task_handle->wake(); |
| 531 | } |
| 532 | } |
| 533 | |
| 534 | |
| 535 | static Names getPartNamesToMutate( |
| 536 | const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & parts) |
| 537 | { |
| 538 | Names result; |
| 539 | for (const auto & pair : mutation.block_numbers) |
| 540 | { |
| 541 | const String & partition_id = pair.first; |
| 542 | Int64 block_num = pair.second; |
| 543 | |
| 544 | /// Note that we cannot simply count all parts to mutate using getPartsCoveredBy(appropriate part_info) |
| 545 | /// because they are not consecutive in `parts`. |
| 546 | MergeTreePartInfo covering_part_info( |
| 547 | partition_id, 0, block_num, MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER); |
| 548 | for (const String & covered_part_name : parts.getPartsCoveredBy(covering_part_info)) |
| 549 | { |
| 550 | auto part_info = MergeTreePartInfo::fromPartName(covered_part_name, parts.getFormatVersion()); |
| 551 | if (part_info.getDataVersion() < block_num) |
| 552 | result.push_back(covered_part_name); |
| 553 | } |
| 554 | } |
| 555 | |
| 556 | return result; |
| 557 | } |
| 558 | |
| 559 | |
| 560 | void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback) |
| 561 | { |
| 562 | std::lock_guard lock(update_mutations_mutex); |
| 563 | |
| 564 | Strings entries_in_zk = zookeeper->getChildrenWatch(zookeeper_path + "/mutations" , nullptr, watch_callback); |
| 565 | StringSet entries_in_zk_set(entries_in_zk.begin(), entries_in_zk.end()); |
| 566 | |
| 567 | /// Compare with the local state, delete obsolete entries and determine which new entries to load. |
| 568 | Strings entries_to_load; |
| 569 | bool some_active_mutations_were_killed = false; |
| 570 | { |
| 571 | std::lock_guard state_lock(state_mutex); |
| 572 | |
| 573 | for (auto it = mutations_by_znode.begin(); it != mutations_by_znode.end();) |
| 574 | { |
| 575 | const ReplicatedMergeTreeMutationEntry & entry = *it->second.entry; |
| 576 | if (!entries_in_zk_set.count(entry.znode_name)) |
| 577 | { |
| 578 | if (!it->second.is_done) |
| 579 | { |
| 580 | LOG_DEBUG(log, "Removing killed mutation " + entry.znode_name + " from local state." ); |
| 581 | some_active_mutations_were_killed = true; |
| 582 | } |
| 583 | else |
| 584 | LOG_DEBUG(log, "Removing obsolete mutation " + entry.znode_name + " from local state." ); |
| 585 | |
| 586 | for (const auto & partition_and_block_num : entry.block_numbers) |
| 587 | { |
| 588 | auto & in_partition = mutations_by_partition[partition_and_block_num.first]; |
| 589 | in_partition.erase(partition_and_block_num.second); |
| 590 | if (in_partition.empty()) |
| 591 | mutations_by_partition.erase(partition_and_block_num.first); |
| 592 | } |
| 593 | |
| 594 | it = mutations_by_znode.erase(it); |
| 595 | } |
| 596 | else |
| 597 | ++it; |
| 598 | } |
| 599 | |
| 600 | for (const String & znode : entries_in_zk_set) |
| 601 | { |
| 602 | if (!mutations_by_znode.count(znode)) |
| 603 | entries_to_load.push_back(znode); |
| 604 | } |
| 605 | } |
| 606 | |
| 607 | if (some_active_mutations_were_killed) |
| 608 | storage.queue_task_handle->wake(); |
| 609 | |
| 610 | if (!entries_to_load.empty()) |
| 611 | { |
| 612 | LOG_INFO(log, "Loading " + toString(entries_to_load.size()) + " mutation entries: " |
| 613 | + entries_to_load.front() + " - " + entries_to_load.back()); |
| 614 | |
| 615 | std::vector<std::future<Coordination::GetResponse>> futures; |
| 616 | for (const String & entry : entries_to_load) |
| 617 | futures.emplace_back(zookeeper->asyncGet(zookeeper_path + "/mutations/" + entry)); |
| 618 | |
| 619 | std::vector<ReplicatedMergeTreeMutationEntryPtr> new_mutations; |
| 620 | for (size_t i = 0; i < entries_to_load.size(); ++i) |
| 621 | { |
| 622 | new_mutations.push_back(std::make_shared<ReplicatedMergeTreeMutationEntry>( |
| 623 | ReplicatedMergeTreeMutationEntry::parse(futures[i].get().data, entries_to_load[i]))); |
| 624 | } |
| 625 | |
| 626 | bool some_mutations_are_probably_done = false; |
| 627 | { |
| 628 | std::lock_guard state_lock(state_mutex); |
| 629 | |
| 630 | for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations) |
| 631 | { |
| 632 | auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry)) |
| 633 | .first->second; |
| 634 | |
| 635 | for (const auto & pair : entry->block_numbers) |
| 636 | { |
| 637 | const String & partition_id = pair.first; |
| 638 | Int64 block_num = pair.second; |
| 639 | mutations_by_partition[partition_id].emplace(block_num, &mutation); |
| 640 | } |
| 641 | |
| 642 | /// Initialize `mutation.parts_to_do`. First we need to mutate all parts in `current_parts`. |
| 643 | mutation.parts_to_do += getPartNamesToMutate(*entry, current_parts).size(); |
| 644 | |
| 645 | /// And next we would need to mutate all parts with getDataVersion() greater than |
| 646 | /// mutation block number that would appear as a result of executing the queue. |
| 647 | for (const auto & queue_entry : queue) |
| 648 | { |
| 649 | for (const String & produced_part_name : queue_entry->getVirtualPartNames()) |
| 650 | { |
| 651 | auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version); |
| 652 | auto it = entry->block_numbers.find(part_info.partition_id); |
| 653 | if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion()) |
| 654 | ++mutation.parts_to_do; |
| 655 | } |
| 656 | } |
| 657 | |
| 658 | if (mutation.parts_to_do == 0) |
| 659 | some_mutations_are_probably_done = true; |
| 660 | } |
| 661 | } |
| 662 | |
| 663 | storage.merge_selecting_task->schedule(); |
| 664 | |
| 665 | if (some_mutations_are_probably_done) |
| 666 | storage.mutations_finalizing_task->schedule(); |
| 667 | } |
| 668 | } |
| 669 | |
| 670 | |
| 671 | ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation( |
| 672 | zkutil::ZooKeeperPtr zookeeper, const String & mutation_id) |
| 673 | { |
| 674 | std::lock_guard lock(update_mutations_mutex); |
| 675 | |
| 676 | auto rc = zookeeper->tryRemove(zookeeper_path + "/mutations/" + mutation_id); |
| 677 | if (rc == Coordination::ZOK) |
| 678 | LOG_DEBUG(log, "Removed mutation " + mutation_id + " from ZooKeeper." ); |
| 679 | |
| 680 | ReplicatedMergeTreeMutationEntryPtr entry; |
| 681 | bool mutation_was_active = false; |
| 682 | { |
| 683 | std::lock_guard state_lock(state_mutex); |
| 684 | |
| 685 | auto it = mutations_by_znode.find(mutation_id); |
| 686 | if (it == mutations_by_znode.end()) |
| 687 | return nullptr; |
| 688 | |
| 689 | mutation_was_active = !it->second.is_done; |
| 690 | |
| 691 | entry = it->second.entry; |
| 692 | for (const auto & partition_and_block_num : entry->block_numbers) |
| 693 | { |
| 694 | auto & in_partition = mutations_by_partition[partition_and_block_num.first]; |
| 695 | in_partition.erase(partition_and_block_num.second); |
| 696 | if (in_partition.empty()) |
| 697 | mutations_by_partition.erase(partition_and_block_num.first); |
| 698 | } |
| 699 | |
| 700 | mutations_by_znode.erase(it); |
| 701 | LOG_DEBUG(log, "Removed mutation " + entry->znode_name + " from local state." ); |
| 702 | } |
| 703 | |
| 704 | if (mutation_was_active) |
| 705 | storage.queue_task_handle->wake(); |
| 706 | |
| 707 | return entry; |
| 708 | } |
| 709 | |
| 710 | |
| 711 | ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsForMergeToEndOfQueue(const String & part_name) |
| 712 | { |
| 713 | std::lock_guard lock(state_mutex); |
| 714 | |
| 715 | /// Let's find the action to merge this part with others. Let's remember others. |
| 716 | StringSet parts_for_merge; |
| 717 | Queue::iterator merge_entry = queue.end(); |
| 718 | for (Queue::iterator it = queue.begin(); it != queue.end(); ++it) |
| 719 | { |
| 720 | if ((*it)->type == LogEntry::MERGE_PARTS || (*it)->type == LogEntry::MUTATE_PART) |
| 721 | { |
| 722 | if (std::find((*it)->source_parts.begin(), (*it)->source_parts.end(), part_name) |
| 723 | != (*it)->source_parts.end()) |
| 724 | { |
| 725 | parts_for_merge = StringSet((*it)->source_parts.begin(), (*it)->source_parts.end()); |
| 726 | merge_entry = it; |
| 727 | break; |
| 728 | } |
| 729 | } |
| 730 | } |
| 731 | |
| 732 | if (!parts_for_merge.empty()) |
| 733 | { |
| 734 | /// Move to the end of queue actions that result in one of the parts in `parts_for_merge`. |
| 735 | for (Queue::iterator it = queue.begin(); it != queue.end();) |
| 736 | { |
| 737 | auto it0 = it; |
| 738 | ++it; |
| 739 | |
| 740 | if (it0 == merge_entry) |
| 741 | break; |
| 742 | |
| 743 | if (((*it0)->type == LogEntry::MERGE_PARTS || (*it0)->type == LogEntry::GET_PART || (*it0)->type == LogEntry::MUTATE_PART) |
| 744 | && parts_for_merge.count((*it0)->new_part_name)) |
| 745 | { |
| 746 | queue.splice(queue.end(), queue, it0, it); |
| 747 | } |
| 748 | } |
| 749 | } |
| 750 | |
| 751 | return parts_for_merge; |
| 752 | } |
| 753 | |
| 754 | bool ReplicatedMergeTreeQueue::checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const |
| 755 | { |
| 756 | if (entry_ptr->type != LogEntry::REPLACE_RANGE) |
| 757 | return false; |
| 758 | |
| 759 | if (current.type != LogEntry::REPLACE_RANGE && current.type != LogEntry::DROP_RANGE) |
| 760 | return false; |
| 761 | |
| 762 | if (entry_ptr->replace_range_entry != nullptr && entry_ptr->replace_range_entry == current.replace_range_entry) /// same partition, don't want to drop ourselves |
| 763 | return false; |
| 764 | |
| 765 | for (const String & new_part_name : entry_ptr->replace_range_entry->new_part_names) |
| 766 | if (!part_info.contains(MergeTreePartInfo::fromPartName(new_part_name, format_version))) |
| 767 | return false; |
| 768 | |
| 769 | return true; |
| 770 | } |
| 771 | |
| 772 | void ReplicatedMergeTreeQueue::removePartProducingOpsInRange( |
| 773 | zkutil::ZooKeeperPtr zookeeper, |
| 774 | const MergeTreePartInfo & part_info, |
| 775 | const ReplicatedMergeTreeLogEntryData & current) |
| 776 | { |
| 777 | Queue to_wait; |
| 778 | size_t removed_entries = 0; |
| 779 | std::optional<time_t> min_unprocessed_insert_time_changed; |
| 780 | std::optional<time_t> max_processed_insert_time_changed; |
| 781 | |
| 782 | /// Remove operations with parts, contained in the range to be deleted, from the queue. |
| 783 | std::unique_lock lock(state_mutex); |
| 784 | for (Queue::iterator it = queue.begin(); it != queue.end();) |
| 785 | { |
| 786 | auto type = (*it)->type; |
| 787 | |
| 788 | if (((type == LogEntry::GET_PART || type == LogEntry::MERGE_PARTS || type == LogEntry::MUTATE_PART) |
| 789 | && part_info.contains(MergeTreePartInfo::fromPartName((*it)->new_part_name, format_version))) |
| 790 | || checkReplaceRangeCanBeRemoved(part_info, *it, current)) |
| 791 | { |
| 792 | if ((*it)->currently_executing) |
| 793 | to_wait.push_back(*it); |
| 794 | auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name); |
| 795 | if (code) |
| 796 | LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": " |
| 797 | << zkutil::ZooKeeper::error2string(code)); |
| 798 | |
| 799 | updateStateOnQueueEntryRemoval( |
| 800 | *it, /* is_successful = */ false, |
| 801 | min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); |
| 802 | queue.erase(it++); |
| 803 | ++removed_entries; |
| 804 | } |
| 805 | else |
| 806 | ++it; |
| 807 | } |
| 808 | |
| 809 | updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); |
| 810 | |
| 811 | LOG_DEBUG(log, "Removed " << removed_entries << " entries from queue. " |
| 812 | "Waiting for " << to_wait.size() << " entries that are currently executing." ); |
| 813 | |
| 814 | /// Let's wait for the operations with the parts contained in the range to be deleted. |
| 815 | for (LogEntryPtr & entry : to_wait) |
| 816 | entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; }); |
| 817 | } |
| 818 | |
| 819 | |
| 820 | size_t ReplicatedMergeTreeQueue::getConflictsCountForRange( |
| 821 | const MergeTreePartInfo & range, const LogEntry & entry, |
| 822 | String * out_description, std::lock_guard<std::mutex> & /* queue_lock */) const |
| 823 | { |
| 824 | std::vector<std::pair<String, LogEntryPtr>> conflicts; |
| 825 | |
| 826 | for (auto & future_part_elem : future_parts) |
| 827 | { |
| 828 | /// Do not check itself log entry |
| 829 | if (future_part_elem.second->znode_name == entry.znode_name) |
| 830 | continue; |
| 831 | |
| 832 | if (!range.isDisjoint(MergeTreePartInfo::fromPartName(future_part_elem.first, format_version))) |
| 833 | { |
| 834 | conflicts.emplace_back(future_part_elem.first, future_part_elem.second); |
| 835 | continue; |
| 836 | } |
| 837 | } |
| 838 | |
| 839 | if (out_description) |
| 840 | { |
| 841 | std::stringstream ss; |
| 842 | ss << "Can't execute command for range " << range.getPartName() << " (entry " << entry.znode_name << "). " ; |
| 843 | ss << "There are " << conflicts.size() << " currently executing entries blocking it: " ; |
| 844 | for (const auto & conflict : conflicts) |
| 845 | ss << conflict.second->typeToString() << " part " << conflict.first << ", " ; |
| 846 | |
| 847 | *out_description = ss.str(); |
| 848 | } |
| 849 | |
| 850 | return conflicts.size(); |
| 851 | } |
| 852 | |
| 853 | |
| 854 | void ReplicatedMergeTreeQueue::checkThereAreNoConflictsInRange(const MergeTreePartInfo & range, const LogEntry & entry) |
| 855 | { |
| 856 | String conflicts_description; |
| 857 | std::lock_guard lock(state_mutex); |
| 858 | |
| 859 | if (0 != getConflictsCountForRange(range, entry, &conflicts_description, lock)) |
| 860 | throw Exception(conflicts_description, ErrorCodes::UNFINISHED); |
| 861 | } |
| 862 | |
| 863 | |
| 864 | bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const |
| 865 | { |
| 866 | /// Let's check if the same part is now being created by another action. |
| 867 | if (future_parts.count(new_part_name)) |
| 868 | { |
| 869 | out_reason = "Not executing log entry for part " + new_part_name |
| 870 | + " because another log entry for the same part is being processed. This shouldn't happen often." ; |
| 871 | return false; |
| 872 | |
| 873 | /** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed, |
| 874 | * and queue element will be processed. |
| 875 | * Immediately in the `executeLogEntry` function it will be found that we already have a part, |
| 876 | * and queue element will be immediately treated as processed. |
| 877 | */ |
| 878 | } |
| 879 | |
| 880 | /// A more complex check is whether another part is currently created by other action that will cover this part. |
| 881 | /// NOTE The above is redundant, but left for a more convenient message in the log. |
| 882 | auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version); |
| 883 | |
| 884 | /// It can slow down when the size of `future_parts` is large. But it can not be large, since `BackgroundProcessingPool` is limited. |
| 885 | for (const auto & future_part_elem : future_parts) |
| 886 | { |
| 887 | auto future_part = MergeTreePartInfo::fromPartName(future_part_elem.first, format_version); |
| 888 | |
| 889 | if (future_part.contains(result_part)) |
| 890 | { |
| 891 | out_reason = "Not executing log entry for part " + new_part_name + " because it is covered by part " |
| 892 | + future_part_elem.first + " that is currently executing" ; |
| 893 | return false; |
| 894 | } |
| 895 | } |
| 896 | |
| 897 | return true; |
| 898 | } |
| 899 | |
| 900 | bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason) |
| 901 | { |
| 902 | std::lock_guard lock(state_mutex); |
| 903 | |
| 904 | if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock)) |
| 905 | { |
| 906 | CurrentlyExecuting::setActualPartName(entry, part_name, *this); |
| 907 | return true; |
| 908 | } |
| 909 | |
| 910 | return false; |
| 911 | } |
| 912 | |
| 913 | |
| 914 | bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( |
| 915 | const LogEntry & entry, |
| 916 | String & out_postpone_reason, |
| 917 | MergeTreeDataMergerMutator & merger_mutator, |
| 918 | MergeTreeData & data, |
| 919 | std::lock_guard<std::mutex> & queue_lock) const |
| 920 | { |
| 921 | if (entry.type == LogEntry::MERGE_PARTS |
| 922 | || entry.type == LogEntry::GET_PART |
| 923 | || entry.type == LogEntry::MUTATE_PART) |
| 924 | { |
| 925 | for (const String & new_part_name : entry.getBlockingPartNames()) |
| 926 | { |
| 927 | if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, queue_lock)) |
| 928 | { |
| 929 | if (!out_postpone_reason.empty()) |
| 930 | LOG_DEBUG(log, out_postpone_reason); |
| 931 | return false; |
| 932 | } |
| 933 | } |
| 934 | } |
| 935 | |
| 936 | if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART) |
| 937 | { |
| 938 | /** If any of the required parts are now fetched or in merge process, wait for the end of this operation. |
| 939 | * Otherwise, even if all the necessary parts for the merge are not present, you should try to make a merge. |
| 940 | * If any parts are missing, instead of merge, there will be an attempt to download a part. |
| 941 | * Such a situation is possible if the receive of a part has failed, and it was moved to the end of the queue. |
| 942 | */ |
| 943 | size_t sum_parts_size_in_bytes = 0; |
| 944 | for (const auto & name : entry.source_parts) |
| 945 | { |
| 946 | if (future_parts.count(name)) |
| 947 | { |
| 948 | String reason = "Not merging into part " + entry.new_part_name |
| 949 | + " because part " + name + " is not ready yet (log entry for that part is being processed)." ; |
| 950 | LOG_TRACE(log, reason); |
| 951 | out_postpone_reason = reason; |
| 952 | return false; |
| 953 | } |
| 954 | |
| 955 | auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated}); |
| 956 | if (part) |
| 957 | sum_parts_size_in_bytes += part->bytes_on_disk; |
| 958 | } |
| 959 | |
| 960 | if (merger_mutator.merges_blocker.isCancelled()) |
| 961 | { |
| 962 | String reason = "Not executing log entry for part " + entry.new_part_name + " because merges and mutations are cancelled now." ; |
| 963 | LOG_DEBUG(log, reason); |
| 964 | out_postpone_reason = reason; |
| 965 | return false; |
| 966 | } |
| 967 | |
| 968 | UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge() |
| 969 | : merger_mutator.getMaxSourcePartSizeForMutation(); |
| 970 | /** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed), |
| 971 | * then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size, |
| 972 | * because it may be ordered by OPTIMIZE or early with different settings. |
| 973 | * Setting max_bytes_to_merge_at_max_space_in_pool still working for regular merges, |
| 974 | * because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL). |
| 975 | */ |
| 976 | const auto data_settings = data.getSettings(); |
| 977 | bool ignore_max_size = (entry.type == LogEntry::MERGE_PARTS) && (max_source_parts_size == data_settings->max_bytes_to_merge_at_max_space_in_pool); |
| 978 | |
| 979 | if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size) |
| 980 | { |
| 981 | String reason = "Not executing log entry " + entry.typeToString() + " for part " + entry.new_part_name |
| 982 | + " because source parts size (" + formatReadableSizeWithBinarySuffix(sum_parts_size_in_bytes) |
| 983 | + ") is greater than the current maximum (" + formatReadableSizeWithBinarySuffix(max_source_parts_size) + ")." ; |
| 984 | LOG_DEBUG(log, reason); |
| 985 | out_postpone_reason = reason; |
| 986 | return false; |
| 987 | } |
| 988 | } |
| 989 | |
| 990 | /// TODO: it makes sense to check DROP_RANGE also |
| 991 | if (entry.type == LogEntry::CLEAR_COLUMN || entry.type == LogEntry::REPLACE_RANGE) |
| 992 | { |
| 993 | String conflicts_description; |
| 994 | String range_name = (entry.type == LogEntry::REPLACE_RANGE) ? entry.replace_range_entry->drop_range_part_name : entry.new_part_name; |
| 995 | auto range = MergeTreePartInfo::fromPartName(range_name, format_version); |
| 996 | |
| 997 | if (0 != getConflictsCountForRange(range, entry, &conflicts_description, queue_lock)) |
| 998 | { |
| 999 | LOG_DEBUG(log, conflicts_description); |
| 1000 | return false; |
| 1001 | } |
| 1002 | } |
| 1003 | |
| 1004 | return true; |
| 1005 | } |
| 1006 | |
| 1007 | |
| 1008 | Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersionImpl( |
| 1009 | const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* state_lock */) const |
| 1010 | { |
| 1011 | auto in_partition = mutations_by_partition.find(partition_id); |
| 1012 | if (in_partition == mutations_by_partition.end()) |
| 1013 | return 0; |
| 1014 | |
| 1015 | auto it = in_partition->second.upper_bound(data_version); |
| 1016 | if (it == in_partition->second.begin()) |
| 1017 | return 0; |
| 1018 | |
| 1019 | --it; |
| 1020 | return it->first; |
| 1021 | } |
| 1022 | |
| 1023 | |
| 1024 | Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const String & partition_id, Int64 data_version) const |
| 1025 | { |
| 1026 | std::lock_guard lock(state_mutex); |
| 1027 | return getCurrentMutationVersionImpl(partition_id, data_version, lock); |
| 1028 | } |
| 1029 | |
| 1030 | |
| 1031 | ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_) |
| 1032 | : entry(entry_), queue(queue_) |
| 1033 | { |
| 1034 | entry->currently_executing = true; |
| 1035 | ++entry->num_tries; |
| 1036 | entry->last_attempt_time = time(nullptr); |
| 1037 | |
| 1038 | for (const String & new_part_name : entry->getBlockingPartNames()) |
| 1039 | { |
| 1040 | if (!queue.future_parts.emplace(new_part_name, entry).second) |
| 1041 | throw Exception("Tagging already tagged future part " + new_part_name + ". This is a bug." , ErrorCodes::LOGICAL_ERROR); |
| 1042 | } |
| 1043 | } |
| 1044 | |
| 1045 | |
| 1046 | void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry, |
| 1047 | const String & actual_part_name, ReplicatedMergeTreeQueue & queue) |
| 1048 | { |
| 1049 | if (!entry.actual_new_part_name.empty()) |
| 1050 | throw Exception("Entry actual part isn't empty yet. This is a bug." , ErrorCodes::LOGICAL_ERROR); |
| 1051 | |
| 1052 | entry.actual_new_part_name = actual_part_name; |
| 1053 | |
| 1054 | /// Check if it is the same (and already added) part. |
| 1055 | if (entry.actual_new_part_name == entry.new_part_name) |
| 1056 | return; |
| 1057 | |
| 1058 | if (!queue.future_parts.emplace(entry.actual_new_part_name, entry.shared_from_this()).second) |
| 1059 | throw Exception("Attaching already existing future part " + entry.actual_new_part_name + ". This is a bug." , ErrorCodes::LOGICAL_ERROR); |
| 1060 | } |
| 1061 | |
| 1062 | |
| 1063 | ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting() |
| 1064 | { |
| 1065 | std::lock_guard lock(queue.state_mutex); |
| 1066 | |
| 1067 | entry->currently_executing = false; |
| 1068 | entry->execution_complete.notify_all(); |
| 1069 | |
| 1070 | for (const String & new_part_name : entry->getBlockingPartNames()) |
| 1071 | { |
| 1072 | if (!queue.future_parts.erase(new_part_name)) |
| 1073 | LOG_ERROR(queue.log, "Untagging already untagged future part " + new_part_name + ". This is a bug." ); |
| 1074 | } |
| 1075 | |
| 1076 | if (!entry->actual_new_part_name.empty()) |
| 1077 | { |
| 1078 | if (entry->actual_new_part_name != entry->new_part_name && !queue.future_parts.erase(entry->actual_new_part_name)) |
| 1079 | LOG_ERROR(queue.log, "Untagging already untagged future part " + entry->actual_new_part_name + ". This is a bug." ); |
| 1080 | |
| 1081 | entry->actual_new_part_name.clear(); |
| 1082 | } |
| 1083 | } |
| 1084 | |
| 1085 | |
| 1086 | ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data) |
| 1087 | { |
| 1088 | LogEntryPtr entry; |
| 1089 | |
| 1090 | std::lock_guard lock(state_mutex); |
| 1091 | |
| 1092 | for (auto it = queue.begin(); it != queue.end(); ++it) |
| 1093 | { |
| 1094 | if ((*it)->currently_executing) |
| 1095 | continue; |
| 1096 | |
| 1097 | if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger_mutator, data, lock)) |
| 1098 | { |
| 1099 | entry = *it; |
| 1100 | queue.splice(queue.end(), queue, it); |
| 1101 | break; |
| 1102 | } |
| 1103 | else |
| 1104 | { |
| 1105 | ++(*it)->num_postponed; |
| 1106 | (*it)->last_postpone_time = time(nullptr); |
| 1107 | } |
| 1108 | } |
| 1109 | |
| 1110 | if (entry) |
| 1111 | return { entry, std::unique_ptr<CurrentlyExecuting>{ new CurrentlyExecuting(entry, *this) } }; |
| 1112 | else |
| 1113 | return {}; |
| 1114 | } |
| 1115 | |
| 1116 | |
| 1117 | bool ReplicatedMergeTreeQueue::processEntry( |
| 1118 | std::function<zkutil::ZooKeeperPtr()> get_zookeeper, |
| 1119 | LogEntryPtr & entry, |
| 1120 | const std::function<bool(LogEntryPtr &)> func) |
| 1121 | { |
| 1122 | std::exception_ptr saved_exception; |
| 1123 | |
| 1124 | try |
| 1125 | { |
| 1126 | if (func(entry)) |
| 1127 | removeProcessedEntry(get_zookeeper(), entry); |
| 1128 | } |
| 1129 | catch (...) |
| 1130 | { |
| 1131 | saved_exception = std::current_exception(); |
| 1132 | } |
| 1133 | |
| 1134 | if (saved_exception) |
| 1135 | { |
| 1136 | std::lock_guard lock(state_mutex); |
| 1137 | |
| 1138 | entry->exception = saved_exception; |
| 1139 | |
| 1140 | if (entry->type == ReplicatedMergeTreeLogEntryData::MUTATE_PART) |
| 1141 | { |
| 1142 | /// Record the exception in the system.mutations table. |
| 1143 | Int64 result_data_version = MergeTreePartInfo::fromPartName(entry->new_part_name, format_version) |
| 1144 | .getDataVersion(); |
| 1145 | auto source_part_info = MergeTreePartInfo::fromPartName( |
| 1146 | entry->source_parts.at(0), format_version); |
| 1147 | |
| 1148 | auto in_partition = mutations_by_partition.find(source_part_info.partition_id); |
| 1149 | if (in_partition != mutations_by_partition.end()) |
| 1150 | { |
| 1151 | auto mutations_begin_it = in_partition->second.upper_bound(source_part_info.getDataVersion()); |
| 1152 | auto mutations_end_it = in_partition->second.upper_bound(result_data_version); |
| 1153 | for (auto it = mutations_begin_it; it != mutations_end_it; ++it) |
| 1154 | { |
| 1155 | MutationStatus & status = *it->second; |
| 1156 | status.latest_failed_part = entry->source_parts.at(0); |
| 1157 | status.latest_failed_part_info = source_part_info; |
| 1158 | status.latest_fail_time = time(nullptr); |
| 1159 | status.latest_fail_reason = getExceptionMessage(saved_exception, false); |
| 1160 | } |
| 1161 | } |
| 1162 | } |
| 1163 | |
| 1164 | return false; |
| 1165 | } |
| 1166 | |
| 1167 | return true; |
| 1168 | } |
| 1169 | |
| 1170 | |
| 1171 | std::pair<size_t, size_t> ReplicatedMergeTreeQueue::countMergesAndPartMutations() const |
| 1172 | { |
| 1173 | std::lock_guard lock(state_mutex); |
| 1174 | |
| 1175 | size_t count_merges = 0; |
| 1176 | size_t count_mutations = 0; |
| 1177 | for (const auto & entry : queue) |
| 1178 | { |
| 1179 | if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS) |
| 1180 | ++count_merges; |
| 1181 | else if (entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART) |
| 1182 | ++count_mutations; |
| 1183 | } |
| 1184 | |
| 1185 | return std::make_pair(count_merges, count_mutations); |
| 1186 | } |
| 1187 | |
| 1188 | |
| 1189 | size_t ReplicatedMergeTreeQueue::countMutations() const |
| 1190 | { |
| 1191 | std::lock_guard lock(state_mutex); |
| 1192 | return mutations_by_znode.size(); |
| 1193 | } |
| 1194 | |
| 1195 | |
| 1196 | size_t ReplicatedMergeTreeQueue::countFinishedMutations() const |
| 1197 | { |
| 1198 | std::lock_guard lock(state_mutex); |
| 1199 | |
| 1200 | size_t count = 0; |
| 1201 | for (const auto & pair : mutations_by_znode) |
| 1202 | { |
| 1203 | const auto & mutation = pair.second; |
| 1204 | if (!mutation.is_done) |
| 1205 | break; |
| 1206 | |
| 1207 | ++count; |
| 1208 | } |
| 1209 | |
| 1210 | return count; |
| 1211 | } |
| 1212 | |
| 1213 | |
| 1214 | ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper) |
| 1215 | { |
| 1216 | return ReplicatedMergeTreeMergePredicate(*this, zookeeper); |
| 1217 | } |
| 1218 | |
| 1219 | |
| 1220 | MutationCommands ReplicatedMergeTreeQueue::getMutationCommands( |
| 1221 | const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const |
| 1222 | { |
| 1223 | /// NOTE: If the corresponding mutation is not found, the error is logged (and not thrown as an exception) |
| 1224 | /// to allow recovering from a mutation that cannot be executed. This way you can delete the mutation entry |
| 1225 | /// from /mutations in ZK and the replicas will simply skip the mutation. |
| 1226 | |
| 1227 | if (part->info.getDataVersion() > desired_mutation_version) |
| 1228 | { |
| 1229 | LOG_WARNING(log, "Data version of part " << part->name << " is already greater than " |
| 1230 | "desired mutation version " << desired_mutation_version); |
| 1231 | return MutationCommands{}; |
| 1232 | } |
| 1233 | |
| 1234 | std::lock_guard lock(state_mutex); |
| 1235 | |
| 1236 | auto in_partition = mutations_by_partition.find(part->info.partition_id); |
| 1237 | if (in_partition == mutations_by_partition.end()) |
| 1238 | { |
| 1239 | LOG_WARNING(log, "There are no mutations for partition ID " << part->info.partition_id |
| 1240 | << " (trying to mutate part " << part->name << " to " << toString(desired_mutation_version) << ")" ); |
| 1241 | return MutationCommands{}; |
| 1242 | } |
| 1243 | |
| 1244 | auto begin = in_partition->second.upper_bound(part->info.getDataVersion()); |
| 1245 | |
| 1246 | auto end = in_partition->second.lower_bound(desired_mutation_version); |
| 1247 | if (end == in_partition->second.end() || end->first != desired_mutation_version) |
| 1248 | LOG_WARNING(log, "Mutation with version " << desired_mutation_version |
| 1249 | << " not found in partition ID " << part->info.partition_id |
| 1250 | << " (trying to mutate part " << part->name + ")" ); |
| 1251 | else |
| 1252 | ++end; |
| 1253 | |
| 1254 | MutationCommands commands; |
| 1255 | for (auto it = begin; it != end; ++it) |
| 1256 | commands.insert(commands.end(), it->second->entry->commands.begin(), it->second->entry->commands.end()); |
| 1257 | |
| 1258 | return commands; |
| 1259 | } |
| 1260 | |
| 1261 | |
| 1262 | bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeeper) |
| 1263 | { |
| 1264 | std::vector<ReplicatedMergeTreeMutationEntryPtr> candidates; |
| 1265 | { |
| 1266 | std::lock_guard lock(state_mutex); |
| 1267 | |
| 1268 | for (auto & kv : mutations_by_znode) |
| 1269 | { |
| 1270 | const String & znode = kv.first; |
| 1271 | MutationStatus & mutation = kv.second; |
| 1272 | |
| 1273 | if (mutation.is_done) |
| 1274 | continue; |
| 1275 | |
| 1276 | if (znode <= mutation_pointer) |
| 1277 | { |
| 1278 | LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")" ); |
| 1279 | mutation.is_done = true; |
| 1280 | } |
| 1281 | else if (mutation.parts_to_do == 0) |
| 1282 | { |
| 1283 | LOG_TRACE(log, "Will check if mutation " << mutation.entry->znode_name << " is done" ); |
| 1284 | candidates.push_back(mutation.entry); |
| 1285 | } |
| 1286 | } |
| 1287 | } |
| 1288 | |
| 1289 | if (candidates.empty()) |
| 1290 | return false; |
| 1291 | |
| 1292 | auto merge_pred = getMergePredicate(zookeeper); |
| 1293 | |
| 1294 | std::vector<const ReplicatedMergeTreeMutationEntry *> finished; |
| 1295 | for (const ReplicatedMergeTreeMutationEntryPtr & candidate : candidates) |
| 1296 | { |
| 1297 | if (merge_pred.isMutationFinished(*candidate)) |
| 1298 | finished.push_back(candidate.get()); |
| 1299 | } |
| 1300 | |
| 1301 | if (!finished.empty()) |
| 1302 | { |
| 1303 | zookeeper->set(replica_path + "/mutation_pointer" , finished.back()->znode_name); |
| 1304 | |
| 1305 | std::lock_guard lock(state_mutex); |
| 1306 | |
| 1307 | mutation_pointer = finished.back()->znode_name; |
| 1308 | |
| 1309 | for (const ReplicatedMergeTreeMutationEntry * entry : finished) |
| 1310 | { |
| 1311 | auto it = mutations_by_znode.find(entry->znode_name); |
| 1312 | if (it != mutations_by_znode.end()) |
| 1313 | { |
| 1314 | LOG_TRACE(log, "Mutation " << entry->znode_name << " is done" ); |
| 1315 | it->second.is_done = true; |
| 1316 | } |
| 1317 | } |
| 1318 | } |
| 1319 | |
| 1320 | return candidates.size() != finished.size(); |
| 1321 | } |
| 1322 | |
| 1323 | |
| 1324 | void ReplicatedMergeTreeQueue::disableMergesInBlockRange(const String & part_name) |
| 1325 | { |
| 1326 | std::lock_guard lock(state_mutex); |
| 1327 | virtual_parts.add(part_name); |
| 1328 | } |
| 1329 | |
| 1330 | |
| 1331 | ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const |
| 1332 | { |
| 1333 | std::lock_guard lock(state_mutex); |
| 1334 | |
| 1335 | Status res; |
| 1336 | |
| 1337 | res.future_parts = future_parts.size(); |
| 1338 | res.queue_size = queue.size(); |
| 1339 | res.last_queue_update = last_queue_update; |
| 1340 | |
| 1341 | res.inserts_in_queue = 0; |
| 1342 | res.merges_in_queue = 0; |
| 1343 | res.part_mutations_in_queue = 0; |
| 1344 | res.queue_oldest_time = 0; |
| 1345 | res.inserts_oldest_time = 0; |
| 1346 | res.merges_oldest_time = 0; |
| 1347 | res.part_mutations_oldest_time = 0; |
| 1348 | |
| 1349 | for (const LogEntryPtr & entry : queue) |
| 1350 | { |
| 1351 | if (entry->create_time && (!res.queue_oldest_time || entry->create_time < res.queue_oldest_time)) |
| 1352 | res.queue_oldest_time = entry->create_time; |
| 1353 | |
| 1354 | if (entry->type == LogEntry::GET_PART) |
| 1355 | { |
| 1356 | ++res.inserts_in_queue; |
| 1357 | |
| 1358 | if (entry->create_time && (!res.inserts_oldest_time || entry->create_time < res.inserts_oldest_time)) |
| 1359 | { |
| 1360 | res.inserts_oldest_time = entry->create_time; |
| 1361 | res.oldest_part_to_get = entry->new_part_name; |
| 1362 | } |
| 1363 | } |
| 1364 | |
| 1365 | if (entry->type == LogEntry::MERGE_PARTS) |
| 1366 | { |
| 1367 | ++res.merges_in_queue; |
| 1368 | |
| 1369 | if (entry->create_time && (!res.merges_oldest_time || entry->create_time < res.merges_oldest_time)) |
| 1370 | { |
| 1371 | res.merges_oldest_time = entry->create_time; |
| 1372 | res.oldest_part_to_merge_to = entry->new_part_name; |
| 1373 | } |
| 1374 | } |
| 1375 | |
| 1376 | if (entry->type == LogEntry::MUTATE_PART) |
| 1377 | { |
| 1378 | ++res.part_mutations_in_queue; |
| 1379 | |
| 1380 | if (entry->create_time && (!res.part_mutations_oldest_time || entry->create_time < res.part_mutations_oldest_time)) |
| 1381 | { |
| 1382 | res.part_mutations_oldest_time = entry->create_time; |
| 1383 | res.oldest_part_to_mutate_to = entry->new_part_name; |
| 1384 | } |
| 1385 | } |
| 1386 | } |
| 1387 | |
| 1388 | return res; |
| 1389 | } |
| 1390 | |
| 1391 | |
| 1392 | void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res) const |
| 1393 | { |
| 1394 | res.clear(); |
| 1395 | std::lock_guard lock(state_mutex); |
| 1396 | |
| 1397 | res.reserve(queue.size()); |
| 1398 | for (const auto & entry : queue) |
| 1399 | res.emplace_back(*entry); |
| 1400 | } |
| 1401 | |
| 1402 | |
| 1403 | void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const |
| 1404 | { |
| 1405 | std::lock_guard lock(state_mutex); |
| 1406 | out_min_unprocessed_insert_time = min_unprocessed_insert_time; |
| 1407 | out_max_processed_insert_time = max_processed_insert_time; |
| 1408 | } |
| 1409 | |
| 1410 | |
| 1411 | std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatus() const |
| 1412 | { |
| 1413 | std::lock_guard lock(state_mutex); |
| 1414 | |
| 1415 | std::vector<MergeTreeMutationStatus> result; |
| 1416 | for (const auto & pair : mutations_by_znode) |
| 1417 | { |
| 1418 | const MutationStatus & status = pair.second; |
| 1419 | const ReplicatedMergeTreeMutationEntry & entry = *status.entry; |
| 1420 | const Names parts_to_mutate = getPartNamesToMutate(entry, current_parts); |
| 1421 | |
| 1422 | for (const MutationCommand & command : entry.commands) |
| 1423 | { |
| 1424 | std::stringstream ss; |
| 1425 | formatAST(*command.ast, ss, false, true); |
| 1426 | result.push_back(MergeTreeMutationStatus |
| 1427 | { |
| 1428 | entry.znode_name, |
| 1429 | ss.str(), |
| 1430 | entry.create_time, |
| 1431 | entry.block_numbers, |
| 1432 | parts_to_mutate, |
| 1433 | status.is_done, |
| 1434 | status.latest_failed_part, |
| 1435 | status.latest_fail_time, |
| 1436 | status.latest_fail_reason, |
| 1437 | }); |
| 1438 | } |
| 1439 | } |
| 1440 | |
| 1441 | return result; |
| 1442 | } |
| 1443 | |
| 1444 | |
| 1445 | ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( |
| 1446 | ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper) |
| 1447 | : queue(queue_) |
| 1448 | , prev_virtual_parts(queue.format_version) |
| 1449 | { |
| 1450 | { |
| 1451 | std::lock_guard lock(queue.state_mutex); |
| 1452 | prev_virtual_parts = queue.virtual_parts; |
| 1453 | } |
| 1454 | |
| 1455 | /// Load current quorum status. |
| 1456 | auto quorum_last_part_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/last_part" ); |
| 1457 | auto quorum_status_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/status" ); |
| 1458 | |
| 1459 | /// Load current inserts |
| 1460 | std::unordered_set<String> lock_holder_paths; |
| 1461 | for (const String & entry : zookeeper->getChildren(queue.zookeeper_path + "/temp" )) |
| 1462 | { |
| 1463 | if (startsWith(entry, "abandonable_lock-" )) |
| 1464 | lock_holder_paths.insert(queue.zookeeper_path + "/temp/" + entry); |
| 1465 | } |
| 1466 | |
| 1467 | if (!lock_holder_paths.empty()) |
| 1468 | { |
| 1469 | Strings partitions = zookeeper->getChildren(queue.zookeeper_path + "/block_numbers" ); |
| 1470 | std::vector<std::future<Coordination::ListResponse>> lock_futures; |
| 1471 | for (const String & partition : partitions) |
| 1472 | lock_futures.push_back(zookeeper->asyncGetChildren(queue.zookeeper_path + "/block_numbers/" + partition)); |
| 1473 | |
| 1474 | struct BlockInfo_ |
| 1475 | { |
| 1476 | String partition; |
| 1477 | Int64 number; |
| 1478 | String zk_path; |
| 1479 | std::future<Coordination::GetResponse> contents_future; |
| 1480 | }; |
| 1481 | |
| 1482 | std::vector<BlockInfo_> block_infos; |
| 1483 | for (size_t i = 0; i < partitions.size(); ++i) |
| 1484 | { |
| 1485 | Strings partition_block_numbers = lock_futures[i].get().names; |
| 1486 | for (const String & entry : partition_block_numbers) |
| 1487 | { |
| 1488 | /// TODO: cache block numbers that are abandoned. |
| 1489 | /// We won't need to check them on the next iteration. |
| 1490 | if (startsWith(entry, "block-" )) |
| 1491 | { |
| 1492 | Int64 block_number = parse<Int64>(entry.substr(strlen("block-" ))); |
| 1493 | String zk_path = queue.zookeeper_path + "/block_numbers/" + partitions[i] + "/" + entry; |
| 1494 | block_infos.emplace_back( |
| 1495 | BlockInfo_{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)}); |
| 1496 | } |
| 1497 | } |
| 1498 | } |
| 1499 | |
| 1500 | for (auto & block : block_infos) |
| 1501 | { |
| 1502 | Coordination::GetResponse resp = block.contents_future.get(); |
| 1503 | if (!resp.error && lock_holder_paths.count(resp.data)) |
| 1504 | committing_blocks[block.partition].insert(block.number); |
| 1505 | } |
| 1506 | } |
| 1507 | |
| 1508 | queue_.pullLogsToQueue(zookeeper); |
| 1509 | |
| 1510 | Coordination::GetResponse quorum_last_part_response = quorum_last_part_future.get(); |
| 1511 | if (!quorum_last_part_response.error) |
| 1512 | { |
| 1513 | ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(queue.format_version); |
| 1514 | if (!quorum_last_part_response.data.empty()) |
| 1515 | { |
| 1516 | parts_with_quorum.fromString(quorum_last_part_response.data); |
| 1517 | last_quorum_parts.clear(); |
| 1518 | for (const auto & added_part : parts_with_quorum.added_parts) |
| 1519 | last_quorum_parts.emplace(added_part.second); |
| 1520 | } |
| 1521 | } |
| 1522 | |
| 1523 | Coordination::GetResponse quorum_status_response = quorum_status_future.get(); |
| 1524 | if (!quorum_status_response.error) |
| 1525 | { |
| 1526 | ReplicatedMergeTreeQuorumEntry quorum_status; |
| 1527 | quorum_status.fromString(quorum_status_response.data); |
| 1528 | inprogress_quorum_part = quorum_status.part_name; |
| 1529 | } |
| 1530 | else |
| 1531 | inprogress_quorum_part.clear(); |
| 1532 | } |
| 1533 | |
| 1534 | bool ReplicatedMergeTreeMergePredicate::operator()( |
| 1535 | const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, |
| 1536 | String * out_reason) const |
| 1537 | { |
| 1538 | /// A sketch of a proof of why this method actually works: |
| 1539 | /// |
| 1540 | /// The trickiest part is to ensure that no new parts will ever appear in the range of blocks between left and right. |
| 1541 | /// Inserted parts get their block numbers by acquiring an ephemeral lock (see EphemeralLockInZooKeeper.h). |
| 1542 | /// These block numbers are monotonically increasing in a partition. |
| 1543 | /// |
| 1544 | /// Because there is a window between the moment the inserted part gets its block number and |
| 1545 | /// the moment it is committed (appears in the replication log), we can't get the name of all parts up to the given |
| 1546 | /// block number just by looking at the replication log - some parts with smaller block numbers may be currently committing |
| 1547 | /// and will appear in the log later than the parts with bigger block numbers. |
| 1548 | /// |
| 1549 | /// We also can't take a consistent snapshot of parts that are already committed plus parts that are about to commit |
| 1550 | /// due to limitations of ZooKeeper transactions. |
| 1551 | /// |
| 1552 | /// So we do the following (see the constructor): |
| 1553 | /// * copy virtual_parts from queue to prev_virtual_parts |
| 1554 | /// (a set of parts which corresponds to executing the replication log up to a certain point) |
| 1555 | /// * load committing_blocks (inserts and mutations that have already acquired a block number but haven't appeared in the log yet) |
| 1556 | /// * do pullLogsToQueue() again to load fresh queue.virtual_parts and mutations. |
| 1557 | /// |
| 1558 | /// Now we have an invariant: if some part is in prev_virtual_parts then: |
| 1559 | /// * all parts with smaller block numbers are either in committing_blocks or in queue.virtual_parts |
| 1560 | /// (those that managed to commit before we loaded committing_blocks). |
| 1561 | /// * all mutations with smaller block numbers are either in committing_blocks or in queue.mutations_by_partition |
| 1562 | /// |
| 1563 | /// So to check that no new parts will ever appear in the range of blocks between left and right we first check that |
| 1564 | /// left and right are already present in prev_virtual_parts (we can't give a definite answer for parts that were committed later) |
| 1565 | /// and then check that there are no blocks between them in committing_blocks and no parts in queue.virtual_parts. |
| 1566 | /// |
| 1567 | /// Similarly, to check that there will be no mutation with a block number between two parts from prev_virtual_parts |
| 1568 | /// (only then we can merge them without mutating the left part), we first check committing_blocks |
| 1569 | /// and then check that these two parts have the same mutation version according to queue.mutations_by_partition. |
| 1570 | |
| 1571 | if (left->info.partition_id != right->info.partition_id) |
| 1572 | { |
| 1573 | if (out_reason) |
| 1574 | *out_reason = "Parts " + left->name + " and " + right->name + " belong to different partitions" ; |
| 1575 | return false; |
| 1576 | } |
| 1577 | |
| 1578 | for (const MergeTreeData::DataPartPtr & part : {left, right}) |
| 1579 | { |
| 1580 | if (last_quorum_parts.find(part->name) != last_quorum_parts.end()) |
| 1581 | { |
| 1582 | if (out_reason) |
| 1583 | *out_reason = "Part " + part->name + " is the most recent part with a satisfied quorum" ; |
| 1584 | return false; |
| 1585 | } |
| 1586 | |
| 1587 | if (part->name == inprogress_quorum_part) |
| 1588 | { |
| 1589 | if (out_reason) |
| 1590 | *out_reason = "Quorum insert for part " + part->name + " is currently in progress" ; |
| 1591 | return false; |
| 1592 | } |
| 1593 | |
| 1594 | if (prev_virtual_parts.getContainingPart(part->info).empty()) |
| 1595 | { |
| 1596 | if (out_reason) |
| 1597 | *out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet" ; |
| 1598 | return false; |
| 1599 | } |
| 1600 | } |
| 1601 | |
| 1602 | Int64 left_max_block = left->info.max_block; |
| 1603 | Int64 right_min_block = right->info.min_block; |
| 1604 | if (left_max_block > right_min_block) |
| 1605 | std::swap(left_max_block, right_min_block); |
| 1606 | |
| 1607 | if (left_max_block + 1 < right_min_block) |
| 1608 | { |
| 1609 | auto committing_blocks_in_partition = committing_blocks.find(left->info.partition_id); |
| 1610 | if (committing_blocks_in_partition != committing_blocks.end()) |
| 1611 | { |
| 1612 | const std::set<Int64> & block_numbers = committing_blocks_in_partition->second; |
| 1613 | |
| 1614 | auto block_it = block_numbers.upper_bound(left_max_block); |
| 1615 | if (block_it != block_numbers.end() && *block_it < right_min_block) |
| 1616 | { |
| 1617 | if (out_reason) |
| 1618 | *out_reason = "Block number " + toString(*block_it) + " is still being inserted between parts " |
| 1619 | + left->name + " and " + right->name; |
| 1620 | |
| 1621 | return false; |
| 1622 | } |
| 1623 | } |
| 1624 | } |
| 1625 | |
| 1626 | std::lock_guard lock(queue.state_mutex); |
| 1627 | |
| 1628 | for (const MergeTreeData::DataPartPtr & part : {left, right}) |
| 1629 | { |
| 1630 | /// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer |
| 1631 | /// and it is guaranteed that it will contain all merges assigned before this object is constructed. |
| 1632 | String containing_part = queue.virtual_parts.getContainingPart(part->info); |
| 1633 | if (containing_part != part->name) |
| 1634 | { |
| 1635 | if (out_reason) |
| 1636 | *out_reason = "Part " + part->name + " has already been assigned a merge into " + containing_part; |
| 1637 | return false; |
| 1638 | } |
| 1639 | } |
| 1640 | |
| 1641 | if (left_max_block + 1 < right_min_block) |
| 1642 | { |
| 1643 | /// Fake part which will appear as merge result |
| 1644 | MergeTreePartInfo gap_part_info( |
| 1645 | left->info.partition_id, left_max_block + 1, right_min_block - 1, |
| 1646 | MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER); |
| 1647 | |
| 1648 | /// We don't select parts if any smaller part covered by our merge must exist after |
| 1649 | /// processing replication log up to log_pointer. |
| 1650 | Strings covered = queue.virtual_parts.getPartsCoveredBy(gap_part_info); |
| 1651 | if (!covered.empty()) |
| 1652 | { |
| 1653 | if (out_reason) |
| 1654 | *out_reason = "There are " + toString(covered.size()) + " parts (from " + covered.front() |
| 1655 | + " to " + covered.back() + ") that are still not present or beeing processed by " |
| 1656 | + " other background process on this replica between " + left->name + " and " + right->name; |
| 1657 | return false; |
| 1658 | } |
| 1659 | } |
| 1660 | |
| 1661 | Int64 left_mutation_ver = queue.getCurrentMutationVersionImpl( |
| 1662 | left->info.partition_id, left->info.getDataVersion(), lock); |
| 1663 | |
| 1664 | Int64 right_mutation_ver = queue.getCurrentMutationVersionImpl( |
| 1665 | left->info.partition_id, right->info.getDataVersion(), lock); |
| 1666 | |
| 1667 | if (left_mutation_ver != right_mutation_ver) |
| 1668 | { |
| 1669 | if (out_reason) |
| 1670 | *out_reason = "Current mutation versions of parts " + left->name + " and " + right->name + " differ: " |
| 1671 | + toString(left_mutation_ver) + " and " + toString(right_mutation_ver) + " respectively" ; |
| 1672 | return false; |
| 1673 | } |
| 1674 | |
| 1675 | return true; |
| 1676 | } |
| 1677 | |
| 1678 | |
| 1679 | std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const |
| 1680 | { |
| 1681 | /// Assigning mutations is easier than assigning merges because mutations appear in the same order as |
| 1682 | /// the order of their version numbers (see StorageReplicatedMergeTree::mutate). |
| 1683 | /// This means that if we have loaded the mutation with version number X then all mutations with |
| 1684 | /// the version numbers less than X are also loaded and if there is no merge or mutation assigned to |
| 1685 | /// the part (checked by querying queue.virtual_parts), we can confidently assign a mutation to |
| 1686 | /// version X for this part. |
| 1687 | |
| 1688 | if (last_quorum_parts.find(part->name) != last_quorum_parts.end() |
| 1689 | || part->name == inprogress_quorum_part) |
| 1690 | return {}; |
| 1691 | |
| 1692 | std::lock_guard lock(queue.state_mutex); |
| 1693 | |
| 1694 | if (queue.virtual_parts.getContainingPart(part->info) != part->name) |
| 1695 | return {}; |
| 1696 | |
| 1697 | auto in_partition = queue.mutations_by_partition.find(part->info.partition_id); |
| 1698 | if (in_partition == queue.mutations_by_partition.end()) |
| 1699 | return {}; |
| 1700 | |
| 1701 | Int64 current_version = queue.getCurrentMutationVersionImpl(part->info.partition_id, part->info.getDataVersion(), lock); |
| 1702 | Int64 max_version = in_partition->second.rbegin()->first; |
| 1703 | if (current_version >= max_version) |
| 1704 | return {}; |
| 1705 | |
| 1706 | return max_version; |
| 1707 | } |
| 1708 | |
| 1709 | |
| 1710 | bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const |
| 1711 | { |
| 1712 | for (const auto & kv : mutation.block_numbers) |
| 1713 | { |
| 1714 | const String & partition_id = kv.first; |
| 1715 | Int64 block_num = kv.second; |
| 1716 | |
| 1717 | auto partition_it = committing_blocks.find(partition_id); |
| 1718 | if (partition_it != committing_blocks.end()) |
| 1719 | { |
| 1720 | size_t blocks_count = std::distance( |
| 1721 | partition_it->second.begin(), partition_it->second.lower_bound(block_num)); |
| 1722 | if (blocks_count) |
| 1723 | { |
| 1724 | LOG_TRACE(queue.log, "Mutation " << mutation.znode_name << " is not done yet because " |
| 1725 | << "in partition ID " << partition_id << " there are still " |
| 1726 | << blocks_count << " uncommitted blocks." ); |
| 1727 | return false; |
| 1728 | } |
| 1729 | } |
| 1730 | } |
| 1731 | |
| 1732 | { |
| 1733 | std::lock_guard lock(queue.state_mutex); |
| 1734 | |
| 1735 | size_t suddenly_appeared_parts = getPartNamesToMutate(mutation, queue.virtual_parts).size(); |
| 1736 | if (suddenly_appeared_parts) |
| 1737 | { |
| 1738 | LOG_TRACE(queue.log, "Mutation " << mutation.znode_name << " is not done yet because " |
| 1739 | << suddenly_appeared_parts << " parts to mutate suddenly appeared." ); |
| 1740 | return false; |
| 1741 | } |
| 1742 | } |
| 1743 | |
| 1744 | return true; |
| 1745 | } |
| 1746 | |
| 1747 | |
| 1748 | ReplicatedMergeTreeQueue::SubscriberHandler |
| 1749 | ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback) |
| 1750 | { |
| 1751 | std::lock_guard lock(state_mutex); |
| 1752 | std::lock_guard lock_subscribers(subscribers_mutex); |
| 1753 | |
| 1754 | auto it = subscribers.emplace(subscribers.end(), std::move(callback)); |
| 1755 | |
| 1756 | /// Atomically notify about current size |
| 1757 | (*it)(queue.size()); |
| 1758 | |
| 1759 | return SubscriberHandler(it, *this); |
| 1760 | } |
| 1761 | |
| 1762 | ReplicatedMergeTreeQueue::SubscriberHandler::~SubscriberHandler() |
| 1763 | { |
| 1764 | std::lock_guard lock(queue.subscribers_mutex); |
| 1765 | queue.subscribers.erase(it); |
| 1766 | } |
| 1767 | |
| 1768 | void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size) |
| 1769 | { |
| 1770 | std::lock_guard lock_subscribers(subscribers_mutex); |
| 1771 | for (auto & subscriber_callback : subscribers) |
| 1772 | subscriber_callback(new_queue_size); |
| 1773 | } |
| 1774 | |
| 1775 | ReplicatedMergeTreeQueue::~ReplicatedMergeTreeQueue() |
| 1776 | { |
| 1777 | notifySubscribers(0); |
| 1778 | } |
| 1779 | |
| 1780 | String padIndex(Int64 index) |
| 1781 | { |
| 1782 | String index_str = toString(index); |
| 1783 | return std::string(10 - index_str.size(), '0') + index_str; |
| 1784 | } |
| 1785 | |
| 1786 | } |
| 1787 | |