#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <IO/Operators.h>


namespace ProfileEvents
{
    extern const Event DuplicatedInsertedBlocks;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_FEW_LIVE_REPLICAS;
    extern const int UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int UNEXPECTED_ZOOKEEPER_ERROR;
    extern const int NO_ZOOKEEPER;
    extern const int READONLY;
    extern const int UNKNOWN_STATUS_OF_INSERT;
    extern const int INSERT_WAS_DEDUPLICATED;
    extern const int KEEPER_EXCEPTION;
    extern const int TIMEOUT_EXCEEDED;
    extern const int NO_ACTIVE_REPLICAS;
}


ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
    StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_, bool deduplicate_)
    : storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block_), deduplicate(deduplicate_),
    log(&Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
{
    /// The quorum value `1` has the same meaning as if it is disabled.
    if (quorum == 1)
        quorum = 0;
}


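/// The header is the table's sample block (its column structure, without data);
/// it describes the blocks this output stream expects to receive in write().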
Block ReplicatedMergeTreeBlockOutputStream::getHeader() const
{
    return storage.getSampleBlock();
}


/// Verify that the ZooKeeper session is still alive.
static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
{
    if (!zookeeper)
        throw Exception("No ZooKeeper session.", ErrorCodes::NO_ZOOKEEPER);

    if (zookeeper->expired())
        throw Exception("ZooKeeper session has expired.", ErrorCodes::NO_ZOOKEEPER);
}


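/// Pre-flight checks for a quorum insert: make sure enough replicas are currently alive,
/// that the previous quorum insert (if any) has already been satisfied, and remember the
/// versions of this replica's `is_active` and `host` nodes so that the commit below can
/// verify the replica was not restarted or deactivated in the meantime.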
void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
{
    quorum_info.status_path = storage.zookeeper_path + "/quorum/status";

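    /// The three reads below are issued asynchronously so they are in flight in parallel;
    /// their futures are consumed further down, after the synchronous check on `leader_election`.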
    std::future<Coordination::GetResponse> quorum_status_future = zookeeper->asyncTryGet(quorum_info.status_path);
    std::future<Coordination::GetResponse> is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
    std::future<Coordination::GetResponse> host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");

    /// List of live replicas. Each live replica registers an ephemeral node under `leader_election`.

    Coordination::Stat leader_election_stat;
    zookeeper->get(storage.zookeeper_path + "/leader_election", &leader_election_stat);

    if (leader_election_stat.numChildren < static_cast<int32_t>(quorum))
        throw Exception("Number of alive replicas ("
            + toString(leader_election_stat.numChildren) + ") is less than requested quorum (" + toString(quorum) + ").",
            ErrorCodes::TOO_FEW_LIVE_REPLICAS);

    /** Is there a pending quorum for the last part that required one?
      * Writes of all parts with quorum enabled are linearly ordered.
      * This means that at any moment there can be at most one part
      * for which the quorum is required but has not been reached yet.
      * Information about such a part is kept in the `/quorum/status` node.
      * Once the quorum is reached, the node is deleted.
      */

    auto quorum_status = quorum_status_future.get();
    if (quorum_status.error != Coordination::ZNONODE)
        throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);

    /// Both checks are implicitly repeated later during the commit (otherwise there would be a race condition).

    auto is_active = is_active_future.get();
    auto host = host_future.get();

    if (is_active.error == Coordination::ZNONODE || host.error == Coordination::ZNONODE)
        throw Exception("Replica is not active right now", ErrorCodes::READONLY);

    quorum_info.is_active_node_value = is_active.data;
    quorum_info.is_active_node_version = is_active.stat.version;
    quorum_info.host_node_version = host.stat.version;
}


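/// Entry point for an INSERT: the incoming block is split into one chunk per partition; each chunk
/// is written as a temporary part, optionally gets a deduplication block_id, and is then committed
/// to ZooKeeper and to the local working set via commitPart().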
void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
{
    last_block_is_duplicate = false;

    /// TODO Is it possible to not lock the table structure here?
    storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    /** If the write is with quorum, check that the required number of replicas is alive right now,
      * and that the quorum has been reached for all previous parts that required one.
      * Also verify that during the insertion the replica is not reinitialized or disabled
      * (tracked via the value of the `is_active` node).
      * TODO: Too complex logic, this can be done better.
      */
    if (quorum)
        checkQuorumPrecondition(zookeeper);

    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);

    for (auto & current_block : part_blocks)
    {
        Stopwatch watch;

        /// Write the part to the filesystem under a temporary name. Calculate a checksum.

        MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);

        String block_id;

        if (deduplicate)
        {
            SipHash hash;
            part->checksums.computeTotalChecksumDataOnly(hash);
            union
            {
                char bytes[16];
                UInt64 words[2];
            } hash_value;
            hash.get128(hash_value.bytes);

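            /// The resulting block_id below has the form "<partition_id>_<hash_word_0>_<hash_word_1>",
            /// e.g. "201809_1234567890123456789_9876543219876543210" (values are purely illustrative).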
            /// The deduplication ID combines the partition identifier with the hash of the data,
            /// so the same data is not inserted into the same partition twice.
            block_id = part->info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);

            LOG_DEBUG(log, "Wrote block with ID '" << block_id << "', " << block.rows() << " rows");
        }
        else
        {
            LOG_DEBUG(log, "Wrote block with " << block.rows() << " rows");
        }

        try
        {
            commitPart(zookeeper, part, block_id);

            /// Set a special error code if the block is a duplicate.
            int error = (deduplicate && last_block_is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus(error));
        }
        catch (...)
        {
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
            throw;
        }
    }
}


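/// Commit a part that already exists on the filesystem; nothing is written here. Presumably this is
/// used when an already-prepared part (e.g. an attached one) has to be registered in the replicated
/// table. No deduplication block_id is passed, so such parts are never deduplicated.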
void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
    last_block_is_duplicate = false;

    /// NOTE: No delay in this case. That's Ok.

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    if (quorum)
        checkQuorumPrecondition(zookeeper);

    Stopwatch watch;

    try
    {
        commitPart(zookeeper, part, "");
        PartLog::addNewPart(storage.global_context, part, watch.elapsed());
    }
    catch (...)
    {
        PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
        throw;
    }
}


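/// Commits one part: allocates a block number (which also performs the deduplication check when a
/// block_id is given), derives the final part name, prepares a single ZooKeeper multi-request with
/// the part metadata, a GET_PART replication log entry, the block-number unlock and, for quorum
/// inserts, the quorum bookkeeping nodes and checks; then renames the temporary part into the local
/// working set and executes the multi-request. Its result decides whether the local change is
/// committed or rolled back.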
void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
{
    storage.check(part->columns);
    assertSessionIsNotExpired(zookeeper);

    /// Obtain an incremental block number and lock it. The lock expresses our intention to add the block to the filesystem.
    /// We remove the lock just after renaming the part. In case of an exception, the block number will be marked as abandoned.
    /// The same call also performs the deduplication check: if a duplicate is detected, no nodes are created.

    /// Allocate a new block number and check for duplicates.
    bool deduplicate_block = !block_id.empty();
    String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
    auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);

    if (!block_number_lock)
    {
        LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it.");
        part->is_duplicate = true;
        last_block_is_duplicate = true;
        ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
        return;
    }

    Int64 block_number = block_number_lock->getNumber();

    /// Set part attributes according to the allocated block number. Prepare an entry for the replication log.

    part->info.min_block = block_number;
    part->info.max_block = block_number;
    part->info.level = 0;

    String part_name = part->getNewName(part->info);
    part->name = part_name;

    StorageReplicatedMergeTree::LogEntry log_entry;
    log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
    log_entry.create_time = time(nullptr);
    log_entry.source_replica = storage.replica_name;
    log_entry.new_part_name = part_name;
    log_entry.quorum = quorum;
    log_entry.block_id = block_id;

    /// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.

    /// Information about the part.
    Coordination::Requests ops;

    storage.getCommitPartOps(ops, part, block_id_path);

    /// Replication log.
    ops.emplace_back(zkutil::makeCreateRequest(
        storage.zookeeper_path + "/log/log-",
        log_entry.toString(),
        zkutil::CreateMode::PersistentSequential));
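    /// `log-` is a sequential node: ZooKeeper appends a monotonically increasing suffix, so the
    /// position of this entry in the replication log is assigned atomically together with the rest
    /// of the transaction below.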

    /// Release the block number lock: remove the node that marked this block number as being used for a write.
    block_number_lock->getUnlockOps(ops);

    /** If a quorum is required, create the node in which the quorum is tracked.
      * (If such a node already exists, another quorum insert has managed to start concurrently
      * and its quorum has not been reached yet; a new quorum insert cannot start at this time.)
      */
    if (quorum)
    {
        ReplicatedMergeTreeQuorumEntry quorum_entry;
        quorum_entry.part_name = part_name;
        quorum_entry.required_number_of_replicas = quorum;
        quorum_entry.replicas.insert(storage.replica_name);

        /** Initially this node records that the current replica has received the part.
          * When other replicas receive the part (in the usual way, by processing the replication log),
          * they add themselves to the contents of this node.
          * Once it lists `quorum` replicas, the node is deleted,
          * which indicates that the quorum has been reached.
          */

        ops.emplace_back(
            zkutil::makeCreateRequest(
                quorum_info.status_path,
                quorum_entry.toString(),
                zkutil::CreateMode::Persistent));

        /// Make sure that during the insertion the replica was not reinitialized or disabled (e.g. on server shutdown).
        ops.emplace_back(
            zkutil::makeCheckRequest(
                storage.replica_path + "/is_active",
                quorum_info.is_active_node_version));

        /// Unfortunately, checking the above is not enough, because the `is_active` node can be deleted and reappear with the same version.
        /// In that case, however, the version of the `host` node will have changed as well, so we check it too.
        /// Conveniently, these two nodes change in the same transaction (see MergeTreeRestartingThread).
        ops.emplace_back(
            zkutil::makeCheckRequest(
                storage.replica_path + "/host",
                quorum_info.host_node_version));
    }

    MergeTreeData::Transaction transaction(storage); /// If we cannot add the part to ZooKeeper, we will remove it back from the working set.
    storage.renameTempPartAndAdd(part, nullptr, &transaction);

    Coordination::Responses responses;
    int32_t multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
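    /// The whole `ops` batch is applied atomically: either every request succeeds, or none is applied
    /// and the returned code describes the failure. The branches below map that code onto commit,
    /// "unknown status" or rollback of the local transaction.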

    if (multi_code == Coordination::ZOK)
    {
        transaction.commit();
        storage.merge_selecting_task->schedule();

        /// The lock nodes have already been deleted, so do not delete them in the destructor.
        block_number_lock->assumeUnlocked();
    }
    else if (multi_code == Coordination::ZCONNECTIONLOSS
        || multi_code == Coordination::ZOPERATIONTIMEOUT)
    {
        /** If the connection is lost and we do not know whether the changes were applied, we cannot delete the local part:
          * if the changes were in fact applied, the inserted block has appeared in `/blocks/` and cannot be inserted again.
          */
        transaction.commit();
        storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
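        /// Presumably the scheduled background check later verifies whether the part actually made it
        /// into ZooKeeper and reconciles the local state if it did not (assumption based on the name
        /// and the age limit passed to enqueuePartForCheck).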

        /// We do not know whether or not data has been inserted.
        throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)),
            ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
    }
    else if (Coordination::isUserError(multi_code))
    {
        String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();

        if (multi_code == Coordination::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
        {
            /// A block with the same ID has just appeared in the table (or on another replica); roll back the insertion.
            LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");

            part->is_duplicate = true;
            transaction.rollback();
            last_block_is_duplicate = true;
            ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
        }
        else if (multi_code == Coordination::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
        {
            transaction.rollback();

            throw Exception("Another quorum insert has already been started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
        }
        else
        {
            /// NOTE: We could be here if the quorum node existed but was quickly removed.
            transaction.rollback();
            throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
                + zkutil::ZooKeeper::error2string(multi_code) + ", path " + failed_op_path,
                ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
        }
    }
    else if (Coordination::isHardwareError(multi_code))
    {
        transaction.rollback();
        throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
            + zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
    }
    else
    {
        transaction.rollback();
        throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
            + zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
    }

    if (quorum)
    {
        /// We are waiting for the quorum to be satisfied.
        LOG_TRACE(log, "Waiting for quorum");

        String quorum_status_path = storage.zookeeper_path + "/quorum/status";

        try
        {
            while (true)
            {
                zkutil::EventPtr event = std::make_shared<Poco::Event>();

                std::string value;
                /// `get` instead of `exists` so that the watch does not leak if the node is no longer there.
                if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event))
                    break;

                ReplicatedMergeTreeQuorumEntry quorum_entry(value);

                /// The node may have had time to disappear and then reappear for the next insert.
                if (quorum_entry.part_name != part_name)
                    break;

                if (!event->tryWait(quorum_timeout_ms))
                    throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
            }

            /// What if the current replica has ceased to be active in the meantime and the quorum has been marked as failed and deleted?
            String value;
            if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
                || value != quorum_info.is_active_node_value)
                throw Exception("Replica became inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
        }
        catch (...)
        {
            /// We do not know whether or not the data has been inserted: other replicas may or may not
            /// have had time to download the part and mark the quorum as satisfied.
            throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
                ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
        }

        LOG_TRACE(log, "Quorum satisfied");
    }
}

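/// Called before the first write(); rejects the INSERT up front if the storage currently should not
/// accept inserts (the exact condition is defined by throwInsertIfNeeded in the storage layer).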
void ReplicatedMergeTreeBlockOutputStream::writePrefix()
{
    storage.throwInsertIfNeeded();
}


}