#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <IO/Operators.h>


namespace ProfileEvents
{
    extern const Event DuplicatedInsertedBlocks;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_FEW_LIVE_REPLICAS;
    extern const int UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int UNEXPECTED_ZOOKEEPER_ERROR;
    extern const int NO_ZOOKEEPER;
    extern const int READONLY;
    extern const int UNKNOWN_STATUS_OF_INSERT;
    extern const int INSERT_WAS_DEDUPLICATED;
    extern const int KEEPER_EXCEPTION;
    extern const int TIMEOUT_EXCEEDED;
    extern const int NO_ACTIVE_REPLICAS;
}


ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
    StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_, bool deduplicate_)
    : storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block_), deduplicate(deduplicate_),
    log(&Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
{
    /// The quorum value `1` has the same meaning as if it is disabled.
    if (quorum == 1)
        quorum = 0;
}

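/// The header describes the structure of blocks accepted by this stream: the table's sample block (columns without data).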
Block ReplicatedMergeTreeBlockOutputStream::getHeader() const
{
    return storage.getSampleBlock();
}


/// Verifies that the ZooKeeper session is still alive.
static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
{
    if (!zookeeper)
        throw Exception("No ZooKeeper session.", ErrorCodes::NO_ZOOKEEPER);

    if (zookeeper->expired())
        throw Exception("ZooKeeper session has expired.", ErrorCodes::NO_ZOOKEEPER);
}

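/// Checks that a quorum insert can start: enough replicas are alive and the previous quorum insert (if any)
/// has completed. Also remembers the current state of this replica's `is_active` and `host` nodes, so that the
/// commit can later verify the replica was not restarted or disabled in the meantime.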
void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
{
    quorum_info.status_path = storage.zookeeper_path + "/quorum/status";

    std::future<Coordination::GetResponse> quorum_status_future = zookeeper->asyncTryGet(quorum_info.status_path);
    std::future<Coordination::GetResponse> is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
    std::future<Coordination::GetResponse> host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");

    /// List of live replicas. Each of them registers an ephemeral node for leader_election.

    Coordination::Stat leader_election_stat;
    zookeeper->get(storage.zookeeper_path + "/leader_election", &leader_election_stat);

    if (leader_election_stat.numChildren < static_cast<int32_t>(quorum))
        throw Exception("Number of alive replicas ("
            + toString(leader_election_stat.numChildren) + ") is less than requested quorum (" + toString(quorum) + ").",
            ErrorCodes::TOO_FEW_LIVE_REPLICAS);

    /** Is there a quorum for the last part for which a quorum was needed?
      * Writes of all parts with a quorum enabled are linearly ordered.
      * This means that at any moment there is at most one part
      * for which a quorum is needed but not yet reached.
      * Information about that part is stored in the `/quorum/status` node.
      * Once the quorum is reached, the node is deleted.
      */

    auto quorum_status = quorum_status_future.get();
    if (quorum_status.error != Coordination::ZNONODE)
        throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);

    /// Both checks are also made implicitly later (otherwise there would be a race condition).

    auto is_active = is_active_future.get();
    auto host = host_future.get();

    if (is_active.error == Coordination::ZNONODE || host.error == Coordination::ZNONODE)
        throw Exception("Replica is not active right now", ErrorCodes::READONLY);

    quorum_info.is_active_node_value = is_active.data;
    quorum_info.is_active_node_version = is_active.stat.version;
    quorum_info.host_node_version = host.stat.version;
}

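/// Splits the inserted block into parts (one per partition), writes each part to the local filesystem
/// under a temporary name and commits it to ZooKeeper, optionally deduplicating by a hash of the part data.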
void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
{
    last_block_is_duplicate = false;

    /// TODO Is it possible to not lock the table structure here?
    storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    /** If the write uses a quorum, check that the required number of replicas is currently alive,
      * and that the quorum has been reached for all previous parts that required one.
      * Also check that the replica was not reinitialized or disabled during the insertion (by the value of the `is_active` node).
      * TODO The logic is too complex; it can be done better.
      */
    if (quorum)
        checkQuorumPrecondition(zookeeper);

    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);

    for (auto & current_block : part_blocks)
    {
        Stopwatch watch;

        /// Write the part to the filesystem under a temporary name. Calculate the checksum.

        MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);

        String block_id;

        if (deduplicate)
        {
            SipHash hash;
            part->checksums.computeTotalChecksumDataOnly(hash);
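            /// The 128-bit SipHash is read as two 64-bit words and formatted into the deduplication ID below.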
            union
            {
                char bytes[16];
                UInt64 words[2];
            } hash_value;
            hash.get128(hash_value.bytes);

            /// The deduplication ID is built from the partition identifier and the hash of the data,
            /// so that the same data is not inserted into the same partition twice.
            block_id = part->info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);

            LOG_DEBUG(log, "Wrote block with ID '" << block_id << "', " << block.rows() << " rows");
        }
        else
        {
            LOG_DEBUG(log, "Wrote block with " << block.rows() << " rows");
        }

        try
        {
            commitPart(zookeeper, part, block_id);

            /// Set a special error code if the block is a duplicate.
            int error = (deduplicate && last_block_is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus(error));
        }
        catch (...)
        {
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
            throw;
        }
    }
}

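/// Commits a part that has already been written to the local filesystem. The empty block_id disables deduplication.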
void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
    last_block_is_duplicate = false;

    /// NOTE: No delay in this case. That's Ok.

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    if (quorum)
        checkQuorumPrecondition(zookeeper);

    Stopwatch watch;

    try
    {
        commitPart(zookeeper, part, "");
        PartLog::addNewPart(storage.global_context, part, watch.elapsed());
    }
    catch (...)
    {
        PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
        throw;
    }
}

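/// Commits a locally written part: allocates a block number (with an optional deduplication check), then in a single
/// ZooKeeper multi-request registers the part, appends a GET_PART entry to the replication log and releases the
/// block number lock. For a quorum insert it also creates the quorum tracking node and waits for the quorum.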
void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
{
    storage.check(part->columns);
    assertSessionIsNotExpired(zookeeper);

    /// Obtain an incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
    /// We remove the lock just after renaming the part. In case of an exception, the block number will be marked as abandoned.
    /// A deduplication check is also made: if a duplicate is detected, no nodes are created.

    /// Allocate a new block number and check for duplicates.
    bool deduplicate_block = !block_id.empty();
    String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
    auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);

    if (!block_number_lock)
    {
        LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it.");
        part->is_duplicate = true;
        last_block_is_duplicate = true;
        ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
        return;
    }

    Int64 block_number = block_number_lock->getNumber();

    /// Set part attributes according to the allocated block number. Prepare an entry for the replication log.

    part->info.min_block = block_number;
    part->info.max_block = block_number;
    part->info.level = 0;

    String part_name = part->getNewName(part->info);
    part->name = part_name;

    StorageReplicatedMergeTree::LogEntry log_entry;
    log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
    log_entry.create_time = time(nullptr);
    log_entry.source_replica = storage.replica_name;
    log_entry.new_part_name = part_name;
    log_entry.quorum = quorum;
    log_entry.block_id = block_id;

    /// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.

    /// Information about the part.
    Coordination::Requests ops;

    storage.getCommitPartOps(ops, part, block_id_path);

    /// Replication log.
    ops.emplace_back(zkutil::makeCreateRequest(
        storage.zookeeper_path + "/log/log-",
        log_entry.toString(),
        zkutil::CreateMode::PersistentSequential));

    /// Delete the information that the block number is in use for writing.
    block_number_lock->getUnlockOps(ops);

    /** If a quorum is needed, create a node in which the quorum is tracked.
      * (If such a node already exists, another quorum insert has been started concurrently but its quorum
      * has not yet been reached. A new quorum insert cannot be made until then.)
      */
    if (quorum)
    {
        ReplicatedMergeTreeQuorumEntry quorum_entry;
        quorum_entry.part_name = part_name;
        quorum_entry.required_number_of_replicas = quorum;
        quorum_entry.replicas.insert(storage.replica_name);

        /** Initially this node records that the current replica has received the part.
          * When other replicas receive this part (in the usual way, by processing the replication log),
          * they will add themselves to the contents of this node.
          * Once it contains information about `quorum` replicas, the node is deleted,
          * which indicates that the quorum has been reached.
          */

        ops.emplace_back(
            zkutil::makeCreateRequest(
                quorum_info.status_path,
                quorum_entry.toString(),
                zkutil::CreateMode::Persistent));

        /// Make sure that during the insert the replica was not reinitialized or disabled (e.g. on server shutdown).
        ops.emplace_back(
            zkutil::makeCheckRequest(
                storage.replica_path + "/is_active",
                quorum_info.is_active_node_version));

        /// Unfortunately, the check above alone is not enough, because the `is_active` node can be deleted and reappear with the same version.
        /// But then the `host` value will change, so we check that as well.
        /// It is convenient that these two nodes change in the same transaction (see MergeTreeRestartingThread).
        ops.emplace_back(
            zkutil::makeCheckRequest(
                storage.replica_path + "/host",
                quorum_info.host_node_version));
    }

    MergeTreeData::Transaction transaction(storage); /// If we cannot add the part to ZooKeeper, we will remove it from the working set.
    storage.renameTempPartAndAdd(part, nullptr, &transaction);

    Coordination::Responses responses;
    int32_t multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT

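    /// Classify the outcome: success; an indeterminate result (connection loss / timeout, the request may or may not
    /// have been applied); a user-level error (duplicate block, concurrent quorum insert); or an unexpected error.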
    if (multi_code == Coordination::ZOK)
    {
        transaction.commit();
        storage.merge_selecting_task->schedule();

        /// The lock nodes have already been deleted; do not delete them in the destructor.
        block_number_lock->assumeUnlocked();
    }
    else if (multi_code == Coordination::ZCONNECTIONLOSS
        || multi_code == Coordination::ZOPERATIONTIMEOUT)
    {
        /** If the connection is lost and we do not know whether the changes were applied, we cannot delete the local part:
          * if the changes were applied, the inserted block has appeared in `/blocks/` and cannot be inserted again.
          */
        transaction.commit();
        storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);

        /// We do not know whether or not the data has been inserted.
        throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)),
            ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
    }
    else if (Coordination::isUserError(multi_code))
    {
        String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();

        if (multi_code == Coordination::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
        {
            /// A block with the same ID has just appeared in the table (or on another replica); roll back the insertion.
            LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");

            part->is_duplicate = true;
            transaction.rollback();
            last_block_is_duplicate = true;
            ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
        }
        else if (multi_code == Coordination::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
        {
            transaction.rollback();

            throw Exception("Another quorum insert has already been started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
        }
        else
        {
            /// NOTE: We could get here if the quorum node existed but was quickly removed.
            transaction.rollback();
            throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
                + zkutil::ZooKeeper::error2string(multi_code) + ", path " + failed_op_path,
                ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
        }
    }
    else if (Coordination::isHardwareError(multi_code))
    {
        transaction.rollback();
        throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
            + zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
    }
    else
    {
        transaction.rollback();
        throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
            + zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
    }

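    /// The part is now committed locally and in ZooKeeper. For a quorum insert, additionally block until enough
    /// replicas have fetched the part (the `/quorum/status` node disappears) or the timeout expires.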
    if (quorum)
    {
        /// We are waiting for the quorum to be satisfied.
        LOG_TRACE(log, "Waiting for quorum");

        String quorum_status_path = storage.zookeeper_path + "/quorum/status";

        try
        {
            while (true)
            {
                zkutil::EventPtr event = std::make_shared<Poco::Event>();

                std::string value;
                /// `get` instead of `exists` so that the watch does not leak if the node is no longer there.
                if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event))
                    break;

                ReplicatedMergeTreeQuorumEntry quorum_entry(value);

                /// The node could have disappeared and then reappeared for the next insert.
                if (quorum_entry.part_name != part_name)
                    break;

                if (!event->tryWait(quorum_timeout_ms))
                    throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
            }

            /// What if the current replica has ceased to be active in the meantime, and the quorum has been marked as failed and deleted?
            String value;
            if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
                || value != quorum_info.is_active_node_value)
                throw Exception("Replica became inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
        }
        catch (...)
        {
            /// We do not know whether or not the data has been inserted,
            /// i.e. whether the other replicas have had time to download the part and mark the quorum as done.
            throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
                ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
        }

        LOG_TRACE(log, "Quorum satisfied");
    }
}

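/// Called once before the first block is written; gives the storage a chance to reject the insert up front.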
void ReplicatedMergeTreeBlockOutputStream::writePrefix()
{
    storage.throwInsertIfNeeded();
}


}