| 1 | #include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h> | 
|---|
| 2 | #include <Storages/MergeTree/checkDataPart.h> | 
|---|
| 3 | #include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h> | 
|---|
| 4 | #include <Storages/StorageReplicatedMergeTree.h> | 
|---|
| 5 |  | 
|---|
| 6 |  | 
|---|
| 7 | namespace ProfileEvents | 
|---|
| 8 | { | 
|---|
| 9 | extern const Event ReplicatedPartChecks; | 
|---|
| 10 | extern const Event ReplicatedPartChecksFailed; | 
|---|
| 11 | extern const Event ReplicatedDataLoss; | 
|---|
| 12 | } | 
|---|
| 13 |  | 
|---|
| 14 | namespace DB | 
|---|
| 15 | { | 
|---|
| 16 |  | 
|---|
/// Error code used in this file when the columns of a local part differ from those recorded in ZooKeeper.
/// It is only declared here (extern).
namespace ErrorCodes
{
    extern const int TABLE_DIFFERS_TOO_MUCH;
}
|---|
| 21 |  | 
|---|
/// Delay, in milliseconds, before the check task is run again after an exception (5 seconds).
static const int PART_CHECK_ERROR_SLEEP_MS = 5 * 1000;
|---|
| 23 |  | 
|---|
| 24 |  | 
|---|
| 25 | ReplicatedMergeTreePartCheckThread::ReplicatedMergeTreePartCheckThread(StorageReplicatedMergeTree & storage_) | 
|---|
| 26 | : storage(storage_) | 
|---|
| 27 | , log_name(storage.database_name + "."+ storage.table_name + " (ReplicatedMergeTreePartCheckThread)") | 
|---|
| 28 | , log(&Logger::get(log_name)) | 
|---|
| 29 | { | 
|---|
| 30 | task = storage.global_context.getSchedulePool().createTask(log_name, [this] { run(); }); | 
|---|
| 31 | task->schedule(); | 
|---|
| 32 | } | 
|---|
| 33 |  | 
|---|
| 34 | ReplicatedMergeTreePartCheckThread::~ReplicatedMergeTreePartCheckThread() | 
|---|
| 35 | { | 
|---|
| 36 | stop(); | 
|---|
| 37 | } | 
|---|
| 38 |  | 
|---|
| 39 | void ReplicatedMergeTreePartCheckThread::start() | 
|---|
| 40 | { | 
|---|
| 41 | std::lock_guard lock(start_stop_mutex); | 
|---|
| 42 | need_stop = false; | 
|---|
| 43 | task->activateAndSchedule(); | 
|---|
| 44 | } | 
|---|
| 45 |  | 
|---|
| 46 | void ReplicatedMergeTreePartCheckThread::stop() | 
|---|
| 47 | { | 
|---|
| 48 | //based on discussion on https://github.com/ClickHouse/ClickHouse/pull/1489#issuecomment-344756259 | 
|---|
| 49 | //using the schedule pool there is no problem in case stop is called two time in row and the start multiple times | 
|---|
| 50 |  | 
|---|
| 51 | std::lock_guard lock(start_stop_mutex); | 
|---|
| 52 | need_stop = true; | 
|---|
| 53 | task->deactivate(); | 
|---|
| 54 | } | 
|---|
| 55 |  | 
|---|
| 56 | void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t delay_to_check_seconds) | 
|---|
| 57 | { | 
|---|
| 58 | std::lock_guard lock(parts_mutex); | 
|---|
| 59 |  | 
|---|
| 60 | if (parts_set.count(name)) | 
|---|
| 61 | return; | 
|---|
| 62 |  | 
|---|
| 63 | parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds); | 
|---|
| 64 | parts_set.insert(name); | 
|---|
| 65 | task->schedule(); | 
|---|
| 66 | } | 
|---|
| 67 |  | 
|---|
| 68 |  | 
|---|
| 69 | size_t ReplicatedMergeTreePartCheckThread::size() const | 
|---|
| 70 | { | 
|---|
| 71 | std::lock_guard lock(parts_mutex); | 
|---|
| 72 | return parts_set.size(); | 
|---|
| 73 | } | 
|---|
| 74 |  | 
|---|
| 75 |  | 
|---|
| 76 | void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & part_name) | 
|---|
| 77 | { | 
|---|
| 78 | auto zookeeper = storage.getZooKeeper(); | 
|---|
| 79 | String part_path = storage.replica_path + "/parts/"+ part_name; | 
|---|
| 80 |  | 
|---|
| 81 | /// If the part is in ZooKeeper, remove it from there and add the task to download it to the queue. | 
|---|
| 82 | if (zookeeper->exists(part_path)) | 
|---|
| 83 | { | 
|---|
| 84 | LOG_WARNING(log, "Part "<< part_name << " exists in ZooKeeper but not locally. " | 
|---|
| 85 | "Removing from ZooKeeper and queueing a fetch."); | 
|---|
| 86 | ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); | 
|---|
| 87 |  | 
|---|
| 88 | storage.removePartAndEnqueueFetch(part_name); | 
|---|
| 89 | return; | 
|---|
| 90 | } | 
|---|
| 91 |  | 
|---|
| 92 | /// If the part is not in ZooKeeper, we'll check if it's at least somewhere. | 
|---|
| 93 | auto part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version); | 
|---|
| 94 |  | 
|---|
| 95 | /** The logic is as follows: | 
|---|
| 96 | * - if some live or inactive replica has such a part, or a part covering it | 
|---|
| 97 | *   - it is Ok, nothing is needed, it is then downloaded when processing the queue, when the replica comes to life; | 
|---|
| 98 | *   - or, if the replica never comes to life, then the administrator will delete or create a new replica with the same address and see everything from the beginning; | 
|---|
| 99 | * - if no one has such part or a part covering it, then | 
|---|
| 100 | *   - if there are two smaller parts, one with the same min block and the other with the same | 
|---|
| 101 | *     max block, we hope that all parts in between are present too and the needed part | 
|---|
| 102 | *     will appear on other replicas as a result of a merge. | 
|---|
| 103 | *   - otherwise, consider the part lost and delete the entry from the queue. | 
|---|
| 104 | * | 
|---|
| 105 | *   Note that this logic is not perfect - some part in the interior may be missing and the | 
|---|
| 106 | *   needed part will never appear. But precisely determining whether the part will appear as | 
|---|
| 107 | *   a result of a merge is complicated - we can't just check if all block numbers covered | 
|---|
| 108 | *   by the missing part are present somewhere (because gaps between blocks are possible) | 
|---|
| 109 | *   and to determine the constituent parts of the merge we need to query the replication log | 
|---|
| 110 | *   (both the common log and the queues of the individual replicas) and then, if the | 
|---|
| 111 | *   constituent parts are in turn not found, solve the problem recursively for them. | 
|---|
| 112 | * | 
|---|
| 113 | *   Considering the part lost when it is not in fact lost is very dangerous because it leads | 
|---|
| 114 | *   to divergent replicas and intersecting parts. So we err on the side of caution | 
|---|
| 115 | *   and don't delete the queue entry when in doubt. | 
|---|
| 116 | */ | 
|---|
| 117 |  | 
|---|
| 118 | LOG_WARNING(log, "Checking if anyone has a part covering "<< part_name << "."); | 
|---|
| 119 |  | 
|---|
| 120 | bool found_part_with_the_same_min_block = false; | 
|---|
| 121 | bool found_part_with_the_same_max_block = false; | 
|---|
| 122 |  | 
|---|
| 123 | Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas"); | 
|---|
| 124 | for (const String & replica : replicas) | 
|---|
| 125 | { | 
|---|
| 126 | Strings parts = zookeeper->getChildren(storage.zookeeper_path + "/replicas/"+ replica + "/parts"); | 
|---|
| 127 | for (const String & part_on_replica : parts) | 
|---|
| 128 | { | 
|---|
| 129 | auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica, storage.format_version); | 
|---|
| 130 |  | 
|---|
| 131 | if (part_on_replica_info.contains(part_info)) | 
|---|
| 132 | { | 
|---|
| 133 | LOG_WARNING(log, "Found part "<< part_on_replica << " on "<< replica << " that covers the missing part "<< part_name); | 
|---|
| 134 | return; | 
|---|
| 135 | } | 
|---|
| 136 |  | 
|---|
| 137 | if (part_info.contains(part_on_replica_info)) | 
|---|
| 138 | { | 
|---|
| 139 | if (part_on_replica_info.min_block == part_info.min_block) | 
|---|
| 140 | found_part_with_the_same_min_block = true; | 
|---|
| 141 | if (part_on_replica_info.max_block == part_info.max_block) | 
|---|
| 142 | found_part_with_the_same_max_block = true; | 
|---|
| 143 |  | 
|---|
| 144 | if (found_part_with_the_same_min_block && found_part_with_the_same_max_block) | 
|---|
| 145 | { | 
|---|
| 146 | LOG_WARNING(log, | 
|---|
| 147 | "Found parts with the same min block and with the same max block as the missing part " | 
|---|
| 148 | << part_name << ". Hoping that it will eventually appear as a result of a merge."); | 
|---|
| 149 | return; | 
|---|
| 150 | } | 
|---|
| 151 | } | 
|---|
| 152 | } | 
|---|
| 153 | } | 
|---|
| 154 |  | 
|---|
| 155 | /// No one has such a part and the merge is impossible. | 
|---|
| 156 | String not_found_msg; | 
|---|
| 157 | if (found_part_with_the_same_max_block) | 
|---|
| 158 | not_found_msg = "a smaller part with the same max block."; | 
|---|
| 159 | else if (found_part_with_the_same_min_block) | 
|---|
| 160 | not_found_msg = "a smaller part with the same min block."; | 
|---|
| 161 | else | 
|---|
| 162 | not_found_msg = "smaller parts with either the same min block or the same max block."; | 
|---|
| 163 | LOG_ERROR(log, "No replica has part covering "<< part_name | 
|---|
| 164 | << " and a merge is impossible: we didn't find "<< not_found_msg); | 
|---|
| 165 |  | 
|---|
| 166 | ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); | 
|---|
| 167 |  | 
|---|
| 168 | /// Is it in the replication queue? If there is - delete, because the task can not be processed. | 
|---|
| 169 | if (!storage.queue.remove(zookeeper, part_name)) | 
|---|
| 170 | { | 
|---|
| 171 | /// The part was not in our queue. Why did it happen? | 
|---|
| 172 | LOG_ERROR(log, "Missing part "<< part_name << " is not in our queue."); | 
|---|
| 173 | return; | 
|---|
| 174 | } | 
|---|
| 175 |  | 
|---|
| 176 | /** This situation is possible if on all the replicas where the part was, it deteriorated. | 
|---|
| 177 | * For example, a replica that has just written it has power turned off and the data has not been written from cache to disk. | 
|---|
| 178 | */ | 
|---|
| 179 | LOG_ERROR(log, "Part "<< part_name << " is lost forever."); | 
|---|
| 180 | ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss); | 
|---|
| 181 | } | 
|---|
| 182 |  | 
|---|
| 183 |  | 
|---|
| 184 | CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name) | 
|---|
| 185 | { | 
|---|
| 186 | LOG_WARNING(log, "Checking part "<< part_name); | 
|---|
| 187 | ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks); | 
|---|
| 188 |  | 
|---|
| 189 | /// If the part is still in the PreCommitted -> Committed transition, it is not lost | 
|---|
| 190 | /// and there is no need to go searching for it on other replicas. To definitely find the needed part | 
|---|
| 191 | /// if it exists (or a part containing it) we first search among the PreCommitted parts. | 
|---|
| 192 | auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::PreCommitted}); | 
|---|
| 193 | if (!part) | 
|---|
| 194 | part = storage.getActiveContainingPart(part_name); | 
|---|
| 195 |  | 
|---|
| 196 | /// We do not have this or a covering part. | 
|---|
| 197 | if (!part) | 
|---|
| 198 | { | 
|---|
| 199 | searchForMissingPart(part_name); | 
|---|
| 200 | return {part_name, false, "Part is missing, will search for it"}; | 
|---|
| 201 | } | 
|---|
| 202 | /// We have this part, and it's active. We will check whether we need this part and whether it has the right data. | 
|---|
| 203 | else if (part->name == part_name) | 
|---|
| 204 | { | 
|---|
| 205 | auto zookeeper = storage.getZooKeeper(); | 
|---|
| 206 | auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY); | 
|---|
| 207 |  | 
|---|
| 208 | auto  = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums( | 
|---|
| 209 | part->columns, part->checksums); | 
|---|
| 210 |  | 
|---|
| 211 | String part_path = storage.replica_path + "/parts/"+ part_name; | 
|---|
| 212 | String part_znode; | 
|---|
| 213 | /// If the part is in ZooKeeper, check its data with its checksums, and them with ZooKeeper. | 
|---|
| 214 | if (zookeeper->tryGet(part_path, part_znode)) | 
|---|
| 215 | { | 
|---|
| 216 | LOG_WARNING(log, "Checking data of part "<< part_name << "."); | 
|---|
| 217 |  | 
|---|
| 218 | try | 
|---|
| 219 | { | 
|---|
| 220 | ReplicatedMergeTreePartHeader ; | 
|---|
| 221 | if (!part_znode.empty()) | 
|---|
| 222 | zk_part_header = ReplicatedMergeTreePartHeader::fromString(part_znode); | 
|---|
| 223 | else | 
|---|
| 224 | { | 
|---|
| 225 | String columns_znode = zookeeper->get(part_path + "/columns"); | 
|---|
| 226 | String checksums_znode = zookeeper->get(part_path + "/checksums"); | 
|---|
| 227 | zk_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes( | 
|---|
| 228 | columns_znode, checksums_znode); | 
|---|
| 229 | } | 
|---|
| 230 |  | 
|---|
| 231 | if (local_part_header.getColumnsHash() != zk_part_header.getColumnsHash()) | 
|---|
| 232 | throw Exception( "Columns of local part "+ part_name + " are different from ZooKeeper", ErrorCodes::TABLE_DIFFERS_TOO_MUCH); | 
|---|
| 233 |  | 
|---|
| 234 | zk_part_header.getChecksums().checkEqual(local_part_header.getChecksums(), true); | 
|---|
| 235 |  | 
|---|
| 236 | checkDataPart( | 
|---|
| 237 | part, | 
|---|
| 238 | true, | 
|---|
| 239 | storage.primary_key_data_types, | 
|---|
| 240 | storage.skip_indices, | 
|---|
| 241 | [this] { return need_stop.load(); }); | 
|---|
| 242 |  | 
|---|
| 243 | if (need_stop) | 
|---|
| 244 | { | 
|---|
| 245 | LOG_INFO(log, "Checking part was cancelled."); | 
|---|
| 246 | return {part_name, false, "Checking part was cancelled"}; | 
|---|
| 247 | } | 
|---|
| 248 |  | 
|---|
| 249 | LOG_INFO(log, "Part "<< part_name << " looks good."); | 
|---|
| 250 | } | 
|---|
| 251 | catch (const Exception &) | 
|---|
| 252 | { | 
|---|
| 253 | /// TODO Better to check error code. | 
|---|
| 254 |  | 
|---|
| 255 | tryLogCurrentException(log, __PRETTY_FUNCTION__); | 
|---|
| 256 |  | 
|---|
| 257 | String message = "Part "+ part_name + " looks broken. Removing it and queueing a fetch."; | 
|---|
| 258 | LOG_ERROR(log, message); | 
|---|
| 259 | ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); | 
|---|
| 260 |  | 
|---|
| 261 | storage.removePartAndEnqueueFetch(part_name); | 
|---|
| 262 |  | 
|---|
| 263 | /// Delete part locally. | 
|---|
| 264 | storage.forgetPartAndMoveToDetached(part, "broken"); | 
|---|
| 265 | return {part_name, false, message}; | 
|---|
| 266 | } | 
|---|
| 267 | } | 
|---|
| 268 | else if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr)) | 
|---|
| 269 | { | 
|---|
| 270 | /// If the part is not in ZooKeeper, delete it locally. | 
|---|
| 271 | /// Probably, someone just wrote down the part, and has not yet added to ZK. | 
|---|
| 272 | /// Therefore, delete only if the part is old (not very reliable). | 
|---|
| 273 | ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); | 
|---|
| 274 |  | 
|---|
| 275 | String message = "Unexpected part "+ part_name + " in filesystem. Removing."; | 
|---|
| 276 | LOG_ERROR(log, message); | 
|---|
| 277 | storage.forgetPartAndMoveToDetached(part, "unexpected"); | 
|---|
| 278 | return {part_name, false, message}; | 
|---|
| 279 | } | 
|---|
| 280 | else | 
|---|
| 281 | { | 
|---|
| 282 | /// TODO You need to make sure that the part is still checked after a while. | 
|---|
| 283 | /// Otherwise, it's possible that the part was not added to ZK, | 
|---|
| 284 | ///  but remained in the filesystem and in a number of active parts. | 
|---|
| 285 | /// And then for a long time (before restarting), the data on the replicas will be different. | 
|---|
| 286 |  | 
|---|
| 287 | LOG_TRACE(log, "Young part "<< part_name | 
|---|
| 288 | << " with age "<< (time(nullptr) - part->modification_time) | 
|---|
| 289 | << " seconds hasn't been added to ZooKeeper yet. It's ok."); | 
|---|
| 290 | } | 
|---|
| 291 | } | 
|---|
| 292 | else | 
|---|
| 293 | { | 
|---|
| 294 | /// If we have a covering part, ignore all the problems with this part. | 
|---|
| 295 | /// In the worst case, errors will still appear `old_parts_lifetime` seconds in error log until the part is removed as the old one. | 
|---|
| 296 | LOG_WARNING(log, "We have part "<< part->name << " covering part "<< part_name); | 
|---|
| 297 | } | 
|---|
| 298 |  | 
|---|
| 299 | return {part_name, true, ""}; | 
|---|
| 300 | } | 
|---|
| 301 |  | 
|---|
| 302 |  | 
|---|
| 303 | void ReplicatedMergeTreePartCheckThread::run() | 
|---|
| 304 | { | 
|---|
| 305 | if (need_stop) | 
|---|
| 306 | return; | 
|---|
| 307 |  | 
|---|
| 308 | try | 
|---|
| 309 | { | 
|---|
| 310 | time_t current_time = time(nullptr); | 
|---|
| 311 |  | 
|---|
| 312 | /// Take part from the queue for verification. | 
|---|
| 313 | PartsToCheckQueue::iterator selected = parts_queue.end();    /// end from std::list is not get invalidated | 
|---|
| 314 | time_t min_check_time = std::numeric_limits<time_t>::max(); | 
|---|
| 315 |  | 
|---|
| 316 | { | 
|---|
| 317 | std::lock_guard lock(parts_mutex); | 
|---|
| 318 |  | 
|---|
| 319 | if (parts_queue.empty()) | 
|---|
| 320 | { | 
|---|
| 321 | if (!parts_set.empty()) | 
|---|
| 322 | { | 
|---|
| 323 | LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug."); | 
|---|
| 324 | parts_set.clear(); | 
|---|
| 325 | } | 
|---|
| 326 | } | 
|---|
| 327 | else | 
|---|
| 328 | { | 
|---|
| 329 | for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it) | 
|---|
| 330 | { | 
|---|
| 331 | if (it->second <= current_time) | 
|---|
| 332 | { | 
|---|
| 333 | selected = it; | 
|---|
| 334 | break; | 
|---|
| 335 | } | 
|---|
| 336 |  | 
|---|
| 337 | if (it->second < min_check_time) | 
|---|
| 338 | min_check_time = it->second; | 
|---|
| 339 | } | 
|---|
| 340 | } | 
|---|
| 341 | } | 
|---|
| 342 |  | 
|---|
| 343 | if (selected == parts_queue.end()) | 
|---|
| 344 | return; | 
|---|
| 345 |  | 
|---|
| 346 | checkPart(selected->first); | 
|---|
| 347 |  | 
|---|
| 348 | if (need_stop) | 
|---|
| 349 | return; | 
|---|
| 350 |  | 
|---|
| 351 | /// Remove the part from check queue. | 
|---|
| 352 | { | 
|---|
| 353 | std::lock_guard lock(parts_mutex); | 
|---|
| 354 |  | 
|---|
| 355 | if (parts_queue.empty()) | 
|---|
| 356 | { | 
|---|
| 357 | LOG_ERROR(log, "Someone erased cheking part from parts_queue. This is a bug."); | 
|---|
| 358 | } | 
|---|
| 359 | else | 
|---|
| 360 | { | 
|---|
| 361 | parts_set.erase(selected->first); | 
|---|
| 362 | parts_queue.erase(selected); | 
|---|
| 363 | } | 
|---|
| 364 | } | 
|---|
| 365 |  | 
|---|
| 366 | task->schedule(); | 
|---|
| 367 | } | 
|---|
| 368 | catch (const Coordination::Exception & e) | 
|---|
| 369 | { | 
|---|
| 370 | tryLogCurrentException(log, __PRETTY_FUNCTION__); | 
|---|
| 371 |  | 
|---|
| 372 | if (e.code == Coordination::ZSESSIONEXPIRED) | 
|---|
| 373 | return; | 
|---|
| 374 |  | 
|---|
| 375 | task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS); | 
|---|
| 376 | } | 
|---|
| 377 | catch (...) | 
|---|
| 378 | { | 
|---|
| 379 | tryLogCurrentException(log, __PRETTY_FUNCTION__); | 
|---|
| 380 | task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS); | 
|---|
| 381 | } | 
|---|
| 382 | } | 
|---|
| 383 |  | 
|---|
| 384 | } | 
|---|
| 385 |  | 
|---|