#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <IO/Operators.h>

namespace ProfileEvents
{
    extern const Event DuplicatedInsertedBlocks;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_FEW_LIVE_REPLICAS;
    extern const int UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int UNEXPECTED_ZOOKEEPER_ERROR;
    extern const int NO_ZOOKEEPER;
    extern const int READONLY;
    extern const int UNKNOWN_STATUS_OF_INSERT;
    extern const int INSERT_WAS_DEDUPLICATED;
    extern const int KEEPER_EXCEPTION;
    extern const int TIMEOUT_EXCEEDED;
    extern const int NO_ACTIVE_REPLICAS;
}

ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
    StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_, bool deduplicate_)
    : storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block_), deduplicate(deduplicate_),
    log(&Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
{
    /// The quorum value `1` has the same meaning as if it is disabled.
    if (quorum == 1)
        quorum = 0;
}

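/// Header of the stream: the table's sample block (column structure without data).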
Block ReplicatedMergeTreeBlockOutputStream::getHeader() const
{
    return storage.getSampleBlock();
}

/// Verify that the ZooKeeper session is still alive.
static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
{
    if (!zookeeper)
        throw Exception("No ZooKeeper session.", ErrorCodes::NO_ZOOKEEPER);

    if (zookeeper->expired())
        throw Exception("ZooKeeper session has expired.", ErrorCodes::NO_ZOOKEEPER);
}

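/// Before an insert with quorum: check that enough replicas are alive and that the previous quorum insert
/// has completed, and remember the versions of the `is_active` and `host` nodes for later consistency checks.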
void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
{
    quorum_info.status_path = storage.zookeeper_path + "/quorum/status";

    std::future<Coordination::GetResponse> quorum_status_future = zookeeper->asyncTryGet(quorum_info.status_path);
    std::future<Coordination::GetResponse> is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
    std::future<Coordination::GetResponse> host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");

    /// List of live replicas. All of them register an ephemeral node for leader_election.

    Coordination::Stat leader_election_stat;
    zookeeper->get(storage.zookeeper_path + "/leader_election", &leader_election_stat);

    if (leader_election_stat.numChildren < static_cast<int32_t>(quorum))
        throw Exception("Number of alive replicas ("
            + toString(leader_election_stat.numChildren) + ") is less than requested quorum (" + toString(quorum) + ").",
            ErrorCodes::TOO_FEW_LIVE_REPLICAS);

    /** Is there a quorum for the last part for which a quorum is needed?
      * Writes of all parts with quorum enabled are linearly ordered.
      * This means that at any time there can be only one part
      *  for which a quorum is needed but has not yet been reached.
      * Information about this part is located in the `/quorum/status` node.
      * If the quorum is reached, the node is deleted.
      */

    auto quorum_status = quorum_status_future.get();
    if (quorum_status.error != Coordination::ZNONODE)
        throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data,
            ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);

    /// Both checks are implicitly made also later (otherwise there would be a race condition).

    auto is_active = is_active_future.get();
    auto host = host_future.get();

    if (is_active.error == Coordination::ZNONODE || host.error == Coordination::ZNONODE)
        throw Exception("Replica is not active right now", ErrorCodes::READONLY);

    quorum_info.is_active_node_value = is_active.data;
    quorum_info.is_active_node_version = is_active.stat.version;
    quorum_info.host_node_version = host.stat.version;
}

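/// Splits the block into parts by partition, writes each part to the filesystem under a temporary name
/// and commits it, with optional deduplication by block hash and an optional write quorum.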
void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
{
    last_block_is_duplicate = false;

    /// TODO Is it possible to not lock the table structure here?
    storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    /** If the write is with quorum, then we check that the required number of replicas is now live,
      *  and also that the quorum has been reached for all previous parts that required one.
      * We also check that during the insertion the replica was not reinitialized or disabled (by the value of the `is_active` node).
      * TODO Too complex logic, you can do better.
      */
    if (quorum)
        checkQuorumPrecondition(zookeeper);

    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);

    for (auto & current_block : part_blocks)
    {
        Stopwatch watch;

        /// Write the part to the filesystem under a temporary name. Calculate a checksum.

        MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);

        String block_id;

        if (deduplicate)
        {
            SipHash hash;
            part->checksums.computeTotalChecksumDataOnly(hash);
            union
            {
                char bytes[16];
                UInt64 words[2];
            } hash_value;
            hash.get128(hash_value.bytes);

            /// We add the hash of the data and the partition identifier to the deduplication ID.
            /// That is, do not insert the same data into the same partition twice.
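            /// The resulting block_id has the form <partition_id>_<word0>_<word1>, where word0 and word1
            /// are the two 64-bit halves of the 128-bit checksum hash computed above.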
            block_id = part->info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);

            LOG_DEBUG(log, "Wrote block with ID '" << block_id << "', " << block.rows() << " rows");
        }
        else
        {
            LOG_DEBUG(log, "Wrote block with " << block.rows() << " rows");
        }

        try
        {
            commitPart(zookeeper, part, block_id);

            /// Set a special error code if the block is a duplicate.
            int error = (deduplicate && last_block_is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus(error));
        }
        catch (...)
        {
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
            throw;
        }
    }
}

void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
    last_block_is_duplicate = false;

    /// NOTE: No delay in this case. That's Ok.
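    /// NOTE: commitPart() is called with an empty block_id below, so no deduplication is done for existing parts.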

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    if (quorum)
        checkQuorumPrecondition(zookeeper);

    Stopwatch watch;

    try
    {
        commitPart(zookeeper, part, "");
        PartLog::addNewPart(storage.global_context, part, watch.elapsed());
    }
    catch (...)
    {
        PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
        throw;
    }
}

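/// Commits a prepared part: adds it to ZooKeeper (block number lock, deduplication node, replication log entry
/// and, if requested, the quorum node) and to the local working set, then waits for the quorum if one is required.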
void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
{
    storage.check(part->columns);
    assertSessionIsNotExpired(zookeeper);

    /// Obtain an incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
    /// We remove the lock just after renaming the part. In case of an exception, the block number will be marked as abandoned.
    /// Also, make the deduplication check. If a duplicate is detected, no nodes are created.

    /// Allocate a new block number and check for duplicates.
    bool deduplicate_block = !block_id.empty();
    String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
    auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);

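    /// If a node for this block_id already exists under /blocks/, allocateBlockNumber presumably returns an empty lock,
    /// which is treated as a duplicate insert below.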
    if (!block_number_lock)
    {
        LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it.");
        part->is_duplicate = true;
        last_block_is_duplicate = true;
        ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
        return;
    }

    Int64 block_number = block_number_lock->getNumber();

    /// Set part attributes according to part_number. Prepare an entry for log.

    part->info.min_block = block_number;
    part->info.max_block = block_number;
    part->info.level = 0;

    String part_name = part->getNewName(part->info);
    part->name = part_name;

    StorageReplicatedMergeTree::LogEntry log_entry;
    log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
    log_entry.create_time = time(nullptr);
    log_entry.source_replica = storage.replica_name;
    log_entry.new_part_name = part_name;
    log_entry.quorum = quorum;
    log_entry.block_id = block_id;

    /// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.

    /// Information about the part.
    Coordination::Requests ops;

    storage.getCommitPartOps(ops, part, block_id_path);

    /// Replication log.
    ops.emplace_back(zkutil::makeCreateRequest(
        storage.zookeeper_path + "/log/log-",
        log_entry.toString(),
        zkutil::CreateMode::PersistentSequential));

    /// Deletes the information that the block number is used for writing.
    block_number_lock->getUnlockOps(ops);

    /** If a quorum is needed, create a node in which the quorum is tracked.
      * (If such a node already exists, then someone managed to start another quorum insert at the same time,
      *  and its quorum has not yet been reached. A new quorum insert cannot be made at this time.)
      */
    if (quorum)
    {
        ReplicatedMergeTreeQuorumEntry quorum_entry;
        quorum_entry.part_name = part_name;
        quorum_entry.required_number_of_replicas = quorum;
        quorum_entry.replicas.insert(storage.replica_name);

        /** At this point, this node contains information that the current replica received the part.
          * When other replicas receive this part (in the usual way, processing the replication log),
          *  they will add themselves to the contents of this node.
          * When it contains information about `quorum` number of replicas, this node is deleted,
          *  which indicates that the quorum has been reached.
          */

        ops.emplace_back(
            zkutil::makeCreateRequest(
                quorum_info.status_path,
                quorum_entry.toString(),
                zkutil::CreateMode::Persistent));

        /// Make sure that during the insertion the replica was not reinitialized or disabled (when the server is shutting down).
        ops.emplace_back(
            zkutil::makeCheckRequest(
                storage.replica_path + "/is_active",
                quorum_info.is_active_node_version));

        /// Unfortunately, just checking the above is not enough, because the `is_active` node can be deleted and reappear with the same version.
        /// But then the `host` value will change. We will check this.
        /// It's great that these two nodes change in the same transaction (see MergeTreeRestartingThread).
        ops.emplace_back(
            zkutil::makeCheckRequest(
                storage.replica_path + "/host",
                quorum_info.host_node_version));
    }

    MergeTreeData::Transaction transaction(storage); /// If we cannot add the part to ZooKeeper, we will remove it back from the working set.
    storage.renameTempPartAndAdd(part, nullptr, &transaction);

    Coordination::Responses responses;
    int32_t multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT

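    /// The branches below distinguish four outcomes: success (ZOK), an indeterminate result after a connection loss
    /// or operation timeout, user-level errors (e.g. ZNODEEXISTS from deduplication or a concurrent quorum insert),
    /// and other hardware/unexpected ZooKeeper errors.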
    if (multi_code == Coordination::ZOK)
    {
        transaction.commit();
        storage.merge_selecting_task->schedule();

        /// Lock nodes have already been deleted, do not delete them in the destructor.
        block_number_lock->assumeUnlocked();
    }
    else if (multi_code == Coordination::ZCONNECTIONLOSS
        || multi_code == Coordination::ZOPERATIONTIMEOUT)
    {
        /** If the connection is lost, we do not know whether the changes were applied, so we cannot delete the local part:
          *  if the changes were applied, the inserted block appeared in `/blocks/` and cannot be inserted again.
          */
        transaction.commit();
        storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);

        /// We do not know whether or not data has been inserted.
        throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)),
            ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
    }
    else if (Coordination::isUserError(multi_code))
    {
        String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();

        if (multi_code == Coordination::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
        {
            /// A block with the same ID has just appeared in the table (or on another replica), roll back the insertion.
            LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");

            part->is_duplicate = true;
            transaction.rollback();
            last_block_is_duplicate = true;
            ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
        }
        else if (multi_code == Coordination::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
        {
            transaction.rollback();

            throw Exception("Another quorum insert has already been started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
        }
        else
        {
            /// NOTE: We could be here if the node with the quorum existed, but was quickly removed.
            transaction.rollback();
            throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
                + zkutil::ZooKeeper::error2string(multi_code) + ", path " + failed_op_path,
                ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
        }
    }
    else if (Coordination::isHardwareError(multi_code))
    {
        transaction.rollback();
        throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
            + zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
    }
    else
    {
        transaction.rollback();
        throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
            + zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
    }

    if (quorum)
    {
        /// We are waiting for the quorum to be satisfied.
        LOG_TRACE(log, "Waiting for quorum");

        String quorum_status_path = storage.zookeeper_path + "/quorum/status";

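        /// Wait until the quorum status node disappears (the quorum has been reached)
        /// or starts referring to a different part (the node was re-created for the next insert).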
        try
        {
            while (true)
            {
                zkutil::EventPtr event = std::make_shared<Poco::Event>();

                std::string value;
                /// `get` instead of `exists` so that the `watch` does not leak if the node is no longer there.
                if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event))
                    break;

                ReplicatedMergeTreeQuorumEntry quorum_entry(value);

                /// The node could have disappeared and then reappeared for the next insert.
                if (quorum_entry.part_name != part_name)
                    break;

                if (!event->tryWait(quorum_timeout_ms))
                    throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
            }

            /// What if the current replica has ceased to be active in the meantime, and the quorum has been marked as failed and deleted?
            String value;
            if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
                || value != quorum_info.is_active_node_value)
                throw Exception("Replica became inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
        }
        catch (...)
        {
            /// We do not know whether or not the data has been inserted,
            /// i.e. whether other replicas had time to download the part and mark the quorum as done.
            throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
                ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
        }

        LOG_TRACE(log, "Quorum satisfied");
    }
}

void ReplicatedMergeTreeBlockOutputStream::writePrefix()
{
    storage.throwInsertIfNeeded();
}


}