#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <IO/Operators.h>


namespace ProfileEvents
{
    extern const Event DuplicatedInsertedBlocks;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_FEW_LIVE_REPLICAS;
    extern const int UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int UNEXPECTED_ZOOKEEPER_ERROR;
    extern const int NO_ZOOKEEPER;
    extern const int READONLY;
    extern const int UNKNOWN_STATUS_OF_INSERT;
    extern const int INSERT_WAS_DEDUPLICATED;
    extern const int KEEPER_EXCEPTION;
    extern const int TIMEOUT_EXCEEDED;
    extern const int NO_ACTIVE_REPLICAS;
}


ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
    StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_, bool deduplicate_)
    : storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block_), deduplicate(deduplicate_),
    log(&Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
{
    /// The quorum value `1` has the same meaning as if it is disabled.
    if (quorum == 1)
        quorum = 0;
}


Block ReplicatedMergeTreeBlockOutputStream::getHeader() const
{
    return storage.getSampleBlock();
}

/// Verify that the ZooKeeper session is still alive.
static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
{
    if (!zookeeper)
        throw Exception("No ZooKeeper session.", ErrorCodes::NO_ZOOKEEPER);

    if (zookeeper->expired())
        throw Exception("ZooKeeper session has expired.", ErrorCodes::NO_ZOOKEEPER);
}


void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
{
    quorum_info.status_path = storage.zookeeper_path + "/quorum/status";

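    /// Issue these three reads asynchronously so they proceed in parallel
    /// with the synchronous read of `leader_election` below.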
    std::future<Coordination::GetResponse> quorum_status_future = zookeeper->asyncTryGet(quorum_info.status_path);
    std::future<Coordination::GetResponse> is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
    std::future<Coordination::GetResponse> host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");

    /// List of live replicas. All of them register an ephemeral node for leader_election.

    Coordination::Stat leader_election_stat;
    zookeeper->get(storage.zookeeper_path + "/leader_election", &leader_election_stat);

    if (leader_election_stat.numChildren < static_cast<int32_t>(quorum))
        throw Exception("Number of alive replicas ("
            + toString(leader_election_stat.numChildren) + ") is less than requested quorum (" + toString(quorum) + ").",
            ErrorCodes::TOO_FEW_LIVE_REPLICAS);

    /** Is there a quorum for the last part for which a quorum is needed?
      * Writes of all parts with quorum enabled are linearly ordered.
      * This means that at any given time there can be only one part
      * that needs a quorum but has not yet reached it.
      * Information about this part is located in the `/quorum/status` node.
      * If the quorum is reached, the node is deleted.
      */

    auto quorum_status = quorum_status_future.get();
    if (quorum_status.error != Coordination::ZNONODE)
        throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);

    /// Both checks are also made implicitly later (otherwise there would be a race condition).

    auto is_active = is_active_future.get();
    auto host = host_future.get();

    if (is_active.error == Coordination::ZNONODE || host.error == Coordination::ZNONODE)
        throw Exception("Replica is not active right now", ErrorCodes::READONLY);

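    /// Remember the values and versions of the `is_active` and `host` nodes:
    /// they are re-checked atomically inside the commit transaction.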
    quorum_info.is_active_node_value = is_active.data;
    quorum_info.is_active_node_version = is_active.stat.version;
    quorum_info.host_node_version = host.stat.version;
}


void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
{
    last_block_is_duplicate = false;

    /// TODO Is it possible to not lock the table structure here?
    storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    /** For a quorum write, check that the required number of replicas is alive right now,
      * and that the quorum has been reached for all previous parts that required one.
      * Also check that the replica was not reinitialized or disabled during the insertion
      * (by the value of the `is_active` node).
      * TODO Too complex logic, you can do better.
      */
    if (quorum)
        checkQuorumPrecondition(zookeeper);

    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
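    /// Each resulting chunk belongs to a single partition, so each one becomes a separate part.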

    for (auto & current_block : part_blocks)
    {
        Stopwatch watch;

        /// Write part to the filesystem under temporary name. Calculate a checksum.

        MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);

        String block_id;

        if (deduplicate)
        {
            SipHash hash;
            part->checksums.computeTotalChecksumDataOnly(hash);
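            /// Reinterpret the 128-bit hash as two 64-bit words so it can be formatted as text.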
            union
            {
                char bytes[16];
                UInt64 words[2];
            } hash_value;
            hash.get128(hash_value.bytes);

            /// The deduplication ID consists of the partition identifier and the hash of the data.
            /// That is, the same data is not inserted into the same partition twice.
            block_id = part->info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);

            LOG_DEBUG(log, "Wrote block with ID '" << block_id << "', " << block.rows() << " rows");
        }
        else
        {
            LOG_DEBUG(log, "Wrote block with " << block.rows() << " rows");
        }

        try
        {
            commitPart(zookeeper, part, block_id);

            /// Set a special error code if the block is a duplicate.
            int error = (deduplicate && last_block_is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus(error));
        }
        catch (...)
        {
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
            throw;
        }
    }
}


void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
    last_block_is_duplicate = false;

    /// NOTE: No delay in this case. That's Ok.

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    if (quorum)
        checkQuorumPrecondition(zookeeper);

    Stopwatch watch;

    try
    {
        commitPart(zookeeper, part, "");
        PartLog::addNewPart(storage.global_context, part, watch.elapsed());
    }
    catch (...)
    {
        PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
        throw;
    }
}


void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
{
    storage.check(part->columns);
    assertSessionIsNotExpired(zookeeper);

    /// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
    /// We remove the lock just after renaming the part. In case of exception, the block number will be marked as abandoned.
    /// Also, perform a deduplication check. If a duplicate is detected, no nodes are created.

    /// Allocate new block number and check for duplicates.
    bool deduplicate_block = !block_id.empty();
    String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
    auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);

    if (!block_number_lock)
    {
        LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it.");
        part->is_duplicate = true;
        last_block_is_duplicate = true;
        ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
        return;
    }

    Int64 block_number = block_number_lock->getNumber();

    /// Set part attributes according to part_number. Prepare an entry for log.

    part->info.min_block = block_number;
    part->info.max_block = block_number;
    part->info.level = 0;

    String part_name = part->getNewName(part->info);
    part->name = part_name;

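    /// The GET_PART entry in the replication log tells the other replicas to fetch this part.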
    StorageReplicatedMergeTree::LogEntry log_entry;
    log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
    log_entry.create_time = time(nullptr);
    log_entry.source_replica = storage.replica_name;
    log_entry.new_part_name = part_name;
    log_entry.quorum = quorum;
    log_entry.block_id = block_id;

    /// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.

    /// Information about the part.
    Coordination::Requests ops;

    storage.getCommitPartOps(ops, part, block_id_path);

    /// Replication log.
    ops.emplace_back(zkutil::makeCreateRequest(
        storage.zookeeper_path + "/log/log-",
        log_entry.toString(),
        zkutil::CreateMode::PersistentSequential));

    /// Delete the information that the block number is in use for writing.
    block_number_lock->getUnlockOps(ops);

    /** If a quorum is needed, create a node in which the quorum is monitored.
      * (If such a node already exists, someone has managed to start another quorum insert at the same time,
      * and its quorum has not yet been reached. A new quorum insert cannot be started until then.)
      */
    if (quorum)
    {
        ReplicatedMergeTreeQuorumEntry quorum_entry;
        quorum_entry.part_name = part_name;
        quorum_entry.required_number_of_replicas = quorum;
        quorum_entry.replicas.insert(storage.replica_name);

        /** At this point, this node will contain the information that the current replica received the part.
          * When other replicas receive this part (in the usual way, processing the replication log),
          * they will add themselves to the contents of this node.
          * When it contains information about `quorum` number of replicas, this node is deleted,
          * which indicates that the quorum has been reached.
          */

        ops.emplace_back(
            zkutil::makeCreateRequest(
                quorum_info.status_path,
                quorum_entry.toString(),
                zkutil::CreateMode::Persistent));

        /// Make sure that during the insertion the replica was not reinitialized or disabled (e.g. when the server is shutting down).
        ops.emplace_back(
            zkutil::makeCheckRequest(
                storage.replica_path + "/is_active",
                quorum_info.is_active_node_version));

        /// Unfortunately, just checking the above is not enough, because the `is_active` node can be deleted and reappear with the same version.
        /// But then the `host` value will change. We will check this.
        /// It's great that these two nodes change in the same transaction (see MergeTreeRestartingThread).
        ops.emplace_back(
            zkutil::makeCheckRequest(
                storage.replica_path + "/host",
                quorum_info.host_node_version));
    }

    MergeTreeData::Transaction transaction(storage); /// If the part cannot be added to ZooKeeper, we will remove it back from the working set.
    storage.renameTempPartAndAdd(part, nullptr, &transaction);

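    /// Execute all the accumulated operations as a single atomic multi-request:
    /// either every node is created/checked, or none of them are.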
    Coordination::Responses responses;
    int32_t multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT

    if (multi_code == Coordination::ZOK)
    {
        transaction.commit();
        storage.merge_selecting_task->schedule();

        /// Lock nodes have already been deleted, do not delete them in destructor.
        block_number_lock->assumeUnlocked();
    }
    else if (multi_code == Coordination::ZCONNECTIONLOSS
        || multi_code == Coordination::ZOPERATIONTIMEOUT)
    {
        /** If the connection is lost and we do not know whether the changes were applied, we cannot delete the local part:
          * if the changes were applied, the inserted block has appeared in `/blocks/`, and it cannot be inserted again.
          */
        transaction.commit();
        storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
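        /// The part check thread will eventually find out whether the part was actually committed to ZooKeeper.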

        /// We do not know whether or not data has been inserted.
        throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)),
            ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
    }
    else if (Coordination::isUserError(multi_code))
    {
        String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();

        if (multi_code == Coordination::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
        {
            /// A block with the same ID has just appeared in the table (or on another replica); roll back the insertion.
            LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");

            part->is_duplicate = true;
            transaction.rollback();
            last_block_is_duplicate = true;
            ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
        }
        else if (multi_code == Coordination::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
        {
            transaction.rollback();

            throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
        }
        else
        {
            /// NOTE: We could be here if the node with the quorum existed, but was quickly removed.
            transaction.rollback();
            throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
                + zkutil::ZooKeeper::error2string(multi_code) + ", path " + failed_op_path,
                ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
        }
    }
    else if (Coordination::isHardwareError(multi_code))
    {
        transaction.rollback();
        throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
            + zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
    }
    else
    {
        transaction.rollback();
        throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
            + zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
    }

    if (quorum)
    {
        /// We are waiting for quorum to be satisfied.
        LOG_TRACE(log, "Waiting for quorum");

        String quorum_status_path = storage.zookeeper_path + "/quorum/status";

        try
        {
            while (true)
            {
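                /// Wake up whenever the quorum status node changes; leave the loop
                /// once it disappears or starts referring to a different part.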
                zkutil::EventPtr event = std::make_shared<Poco::Event>();

                std::string value;
                /// `get` instead of `exists` so that the `watch` does not leak if the node is no longer there.
                if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event))
                    break;

                ReplicatedMergeTreeQuorumEntry quorum_entry(value);

                /// The node could have disappeared and then reappeared for the next insert.
                if (quorum_entry.part_name != part_name)
                    break;

                if (!event->tryWait(quorum_timeout_ms))
                    throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
            }

            /// What if the current replica has ceased to be active in the meantime, and the quorum was marked as failed and deleted?
            String value;
            if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
                || value != quorum_info.is_active_node_value)
                throw Exception("Replica became inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
        }
        catch (...)
        {
            /// We do not know whether or not data has been inserted
            /// - whether other replicas managed to download the part and mark the quorum as done.
            throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
                ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
        }

        LOG_TRACE(log, "Quorum satisfied");
    }
}

void ReplicatedMergeTreeBlockOutputStream::writePrefix()
{
    storage.throwInsertIfNeeded();
}


}