#include <Disks/DiskSpaceMonitor.h>
#include <Common/FieldVisitors.h>
#include <Common/Macros.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/escapeForFileName.h>
#include <Common/formatReadable.h>
#include <Common/thread_local_rng.h>
#include <Common/typeid_cast.h>

#include <Storages/AlterCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#include <Storages/VirtualColumnUtils.h>

#include <Databases/IDatabase.h>

#include <Parsers/formatAST.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTSetQuery.h>

#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>

#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/PartLog.h>

#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/copyData.h>

#include <Poco/DirectoryIterator.h>

#include <ext/range.h>
#include <ext/scope_guard.h>

#include <ctime>
#include <thread>
#include <future>

#include <boost/algorithm/string/join.hpp>

namespace ProfileEvents
{
    extern const Event ReplicatedPartMerges;
    extern const Event ReplicatedPartMutations;
    extern const Event ReplicatedPartFailedFetches;
    extern const Event ReplicatedPartFetchesOfMerged;
    extern const Event ObsoleteReplicatedParts;
    extern const Event ReplicatedPartFetches;
    extern const Event DataAfterMergeDiffersFromReplica;
    extern const Event DataAfterMutationDiffersFromReplica;
}

namespace CurrentMetrics
{
    extern const Metric LeaderReplica;
}


namespace DB
{

namespace ErrorCodes
{
    extern const int NO_ZOOKEEPER;
    extern const int INCORRECT_DATA;
    extern const int INCOMPATIBLE_COLUMNS;
    extern const int REPLICA_IS_ALREADY_EXIST;
    extern const int NO_SUCH_REPLICA;
    extern const int NO_REPLICA_HAS_PART;
    extern const int LOGICAL_ERROR;
    extern const int TOO_MANY_UNEXPECTED_DATA_PARTS;
    extern const int ABORTED;
    extern const int REPLICA_IS_NOT_IN_QUORUM;
    extern const int TABLE_IS_READ_ONLY;
    extern const int NOT_FOUND_NODE;
    extern const int NO_ACTIVE_REPLICAS;
    extern const int LEADERSHIP_CHANGED;
    extern const int TABLE_WAS_NOT_DROPPED;
    extern const int PARTITION_ALREADY_EXISTS;
    extern const int TOO_MANY_RETRIES_TO_FETCH_PARTS;
    extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
    extern const int PARTITION_DOESNT_EXIST;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
    extern const int UNEXPECTED_FILE_IN_DATA_PART;
    extern const int NO_FILE_IN_DATA_PART;
    extern const int UNFINISHED;
    extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
    extern const int TOO_MANY_FETCHES;
    extern const int BAD_DATA_PART_NAME;
    extern const int PART_IS_TEMPORARILY_LOCKED;
    extern const int INCORRECT_FILE_NAME;
    extern const int CANNOT_ASSIGN_OPTIMIZE;
    extern const int KEEPER_EXCEPTION;
    extern const int ALL_REPLICAS_LOST;
    extern const int REPLICA_STATUS_CHANGED;
}

namespace ActionLocks
{
    extern const StorageActionBlockType PartsMerge;
    extern const StorageActionBlockType PartsFetch;
    extern const StorageActionBlockType PartsSend;
    extern const StorageActionBlockType ReplicationQueue;
    extern const StorageActionBlockType PartsTTLMerge;
    extern const StorageActionBlockType PartsMove;
}


static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000;
static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;

/** There are three places where each part should exist:
  * 1. In RAM: data_parts, all_data_parts.
  * 2. In the filesystem (FS): the directory with the data of the table.
  * 3. In ZooKeeper (ZK).
  *
  * When adding a part, it must be added to all three places at once.
  * This is done as follows:
  * - [FS] first write the part to a temporary directory on the filesystem;
  * - [FS] rename the temporary part to its final name on the filesystem;
  * - [RAM] immediately afterwards add it to `data_parts` and remove from `data_parts` any parts covered by this one;
  * - [RAM] also set up a `Transaction` object which, in case of an exception (in the next step),
  *   rolls back the changes in `data_parts` made in the previous step;
  * - [ZK] then send a transaction (multi) to add the part to ZooKeeper (and some more actions);
  * - [FS, ZK] by the way, removal of the covered (old) parts from the filesystem, from ZooKeeper and from `all_data_parts`
  *   is delayed by a few minutes.
  *
  * There is no atomicity here.
  * It could be achieved using undo/redo logs and a flag in `DataPart` indicating that it is completely ready.
  * But that would be inconvenient: we would have to write undo/redo logs for each `Part` in ZK, which would increase the already large number of interactions.
  *
  * Instead, we are forced to work in a situation where at any time
  * (from another thread, or after a server restart) there may be an unfinished transaction.
  * (Note: for this, the part must be in RAM.)
  * Of these cases, the most frequent one is when the part is already in data_parts but not yet in ZooKeeper.
  * This case must be distinguished from the one where such a situation is caused by some kind of damage to the state.
  *
  * We do this with a time threshold.
  * If the part is young enough, its absence in ZooKeeper is treated optimistically - as if it simply did not have time to be added there yet,
  * i.e. as if the transaction has not yet been executed but will be soon.
  * And if the part is old, its absence in ZooKeeper is treated as an unfinished transaction that needs to be rolled back.
  *
  * PS. Perhaps it would be better to add a flag to the DataPart indicating that the part has been inserted into ZK.
  * But here it is too easy to get confused about the consistency of this flag.
  */
extern const int MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER = 5 * 60;


170void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
171{
172 std::lock_guard lock(current_zookeeper_mutex);
173 current_zookeeper = zookeeper;
174}
175
176zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const
177{
178 std::lock_guard lock(current_zookeeper_mutex);
179 return current_zookeeper;
180}
181
182zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const
183{
184 auto res = tryGetZooKeeper();
185 if (!res)
186 throw Exception("Cannot get ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
187 return res;
188}
189
190
191StorageReplicatedMergeTree::StorageReplicatedMergeTree(
192 const String & zookeeper_path_,
193 const String & replica_name_,
194 bool attach,
195 const String & database_name_,
196 const String & table_name_,
197 const String & relative_data_path_,
198 const StorageInMemoryMetadata & metadata,
199 Context & context_,
200 const String & date_column_name,
201 const MergingParams & merging_params_,
202 std::unique_ptr<MergeTreeSettings> settings_,
203 bool has_force_restore_data_flag)
204 : MergeTreeData(database_name_, table_name_, relative_data_path_, metadata,
205 context_, date_column_name, merging_params_, std::move(settings_), true, attach,
206 [this] (const std::string & name) { enqueuePartForCheck(name); }),
207
208 zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name_, table_name_)),
209 replica_name(global_context.getMacros()->expand(replica_name_, database_name_, table_name_)),
210 reader(*this), writer(*this), merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()),
211 queue(*this), fetcher(*this), cleanup_thread(*this), alter_thread(*this),
212 part_check_thread(*this), restarting_thread(*this)
213{
214 if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
215 zookeeper_path.resize(zookeeper_path.size() - 1);
216 /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
217 if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
218 zookeeper_path = "/" + zookeeper_path;
219 replica_path = zookeeper_path + "/replicas/" + replica_name;
220
221 queue_updating_task = global_context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });
222
223 mutations_updating_task = global_context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::mutationsUpdatingTask)", [this]{ mutationsUpdatingTask(); });
224
225 merge_selecting_task = global_context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); });
226 /// Will be activated if we win leader election.
227 merge_selecting_task->deactivate();
228
229 mutations_finalizing_task = global_context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::mutationsFinalizingTask)", [this] { mutationsFinalizingTask(); });
230
231 if (global_context.hasZooKeeper())
232 current_zookeeper = global_context.getZooKeeper();
233
234 bool skip_sanity_checks = false;
235
236 if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data"))
237 {
238 skip_sanity_checks = true;
239 current_zookeeper->remove(replica_path + "/flags/force_restore_data");
240
241 LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag "
242 << replica_path << "/flags/force_restore_data).");
243 }
244 else if (has_force_restore_data_flag)
245 {
246 skip_sanity_checks = true;
247
248 LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
249 }
250
251 loadDataParts(skip_sanity_checks);
252
253 if (!current_zookeeper)
254 {
255 if (!attach)
256 throw Exception("Can't create replicated table without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
257
258 /// Do not activate the replica. It will be readonly.
259 LOG_ERROR(log, "No ZooKeeper: table will be in readonly mode.");
260 is_readonly = true;
261 return;
262 }
263
264 if (attach && !current_zookeeper->exists(zookeeper_path + "/metadata"))
265 {
266 LOG_WARNING(log, "No metadata in ZooKeeper: table will be in readonly mode.");
267 is_readonly = true;
268 return;
269 }
270
271 if (!attach)
272 {
273 if (!getDataParts().empty())
            throw Exception("Data directory for the table already contains data parts - probably it was an unclean DROP TABLE or a manual intervention. You must either clear the directory by hand or use ATTACH TABLE instead of CREATE TABLE if you need to use those parts.", ErrorCodes::INCORRECT_DATA);
275
276 createTableIfNotExists();
277
278 checkTableStructure(false, false);
279 createReplica();
280 }
281 else
282 {
283 checkTableStructure(skip_sanity_checks, true);
284 checkParts(skip_sanity_checks);
285
        /// Temporary directories contain unfinalized results of merges or fetches (left after a forced restart);
        /// they cannot be reused, so delete each of them immediately.
288 clearOldTemporaryDirectories(0);
289 }
290
291 createNewZooKeeperNodes();
292
293 other_replicas_fixed_granularity = checkFixedGranualrityInZookeeper();
294}
295
296
297bool StorageReplicatedMergeTree::checkFixedGranualrityInZookeeper()
298{
299 auto zookeeper = getZooKeeper();
300 String metadata_str = zookeeper->get(zookeeper_path + "/metadata");
301 auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
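    /// index_granularity_bytes == 0 in the shared table metadata means the table was created with fixed (row-count based)
    /// index granularity, i.e. other replicas do not use adaptive granularity.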
302 return metadata_from_zk.index_granularity_bytes == 0;
303}
304
305
306void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
307 const Strings & replicas, const String & mutation_id) const
308{
309 if (replicas.empty())
310 return;
311
    zkutil::EventPtr wait_event = std::make_shared<Poco::Event>();

    std::set<String> inactive_replicas;
    for (const String & replica : replicas)
    {
        LOG_DEBUG(log, "Waiting for " << replica << " to apply mutation " << mutation_id);
320
321 while (!partial_shutdown_called)
322 {
            /// The mutation could have been killed, or the whole replica could have been deleted.
            /// The wait event will unblock in that case.
325 Coordination::Stat exists_stat;
326 if (!getZooKeeper()->exists(zookeeper_path + "/mutations/" + mutation_id, &exists_stat, wait_event))
327 {
328 LOG_WARNING(log, "Mutation " << mutation_id << " was killed or manually removed. Nothing to wait.");
329 return;
330 }
331
332 auto zookeeper = getZooKeeper();
333 /// Replica could be inactive.
334 if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
335 {
336 LOG_WARNING(log, "Replica " << replica << " is not active during mutation. "
337 "Mutation will be done asynchronously when replica becomes active.");
338
339 inactive_replicas.emplace(replica);
340 break;
341 }
342
343 String mutation_pointer = zookeeper_path + "/replicas/" + replica + "/mutation_pointer";
344 std::string mutation_pointer_value;
345 Coordination::Stat get_stat;
346 /// Replica could be removed
347 if (!zookeeper->tryGet(mutation_pointer, mutation_pointer_value, &get_stat, wait_event))
348 {
349 LOG_WARNING(log, replica << " was removed");
350 break;
351 }
            else if (mutation_pointer_value >= mutation_id)     /// Maybe we have already processed a more recent mutation;
                break;                                          /// mutation ids are zero-padded numbers like 0000000000 and 0000000001, so string comparison works.
354
355 /// We wait without timeout.
356 wait_event->wait();
357 }
358
359 if (partial_shutdown_called)
360 throw Exception("Mutation is not finished because table shutdown was called. It will be done after table restart.",
361 ErrorCodes::UNFINISHED);
362 }
363
    if (!inactive_replicas.empty())
    {
        std::stringstream exception_message;
        exception_message << "Mutation is not finished because some replicas are inactive right now: "
            << boost::algorithm::join(inactive_replicas, ", ")
            << ". Mutation will be done asynchronously";

        throw Exception(exception_message.str(), ErrorCodes::UNFINISHED);
    }
376}
377
378void StorageReplicatedMergeTree::createNewZooKeeperNodes()
379{
380 auto zookeeper = getZooKeeper();
381
382 /// Working with quorum.
383 zookeeper->createIfNotExists(zookeeper_path + "/quorum", String());
384 zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", String());
385 zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", String());
386
387 /// Tracking lag of replicas.
388 zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", String());
389 zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", String());
390
391 /// Mutations
392 zookeeper->createIfNotExists(zookeeper_path + "/mutations", String());
393 zookeeper->createIfNotExists(replica_path + "/mutation_pointer", String());
394
395 /// ALTERs of the metadata node.
396 zookeeper->createIfNotExists(replica_path + "/metadata", String());
397}
398
399
400void StorageReplicatedMergeTree::createTableIfNotExists()
401{
402 auto zookeeper = getZooKeeper();
403
404 if (zookeeper->exists(zookeeper_path))
405 return;
406
407 LOG_DEBUG(log, "Creating table " << zookeeper_path);
408
409 zookeeper->createAncestors(zookeeper_path);
410
411 /// We write metadata of table so that the replicas can check table parameters with them.
412 String metadata = ReplicatedMergeTreeTableMetadata(*this).toString();
413
414 Coordination::Requests ops;
415 ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "",
416 zkutil::CreateMode::Persistent));
417 ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", metadata,
418 zkutil::CreateMode::Persistent));
419 ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/columns", getColumns().toString(),
420 zkutil::CreateMode::Persistent));
421 ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log", "",
422 zkutil::CreateMode::Persistent));
423 ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/blocks", "",
424 zkutil::CreateMode::Persistent));
425 ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/block_numbers", "",
426 zkutil::CreateMode::Persistent));
427 ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/nonincrement_block_numbers", "",
428 zkutil::CreateMode::Persistent)); /// /nonincrement_block_numbers dir is unused, but is created nonetheless for backwards compatibility.
429 ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/leader_election", "",
430 zkutil::CreateMode::Persistent));
431 ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/temp", "",
432 zkutil::CreateMode::Persistent));
433 ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "",
434 zkutil::CreateMode::Persistent));
435
436 Coordination::Responses responses;
437 auto code = zookeeper->tryMulti(ops, responses);
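    /// ZNODEEXISTS is tolerated: another replica may have created the table concurrently, and we will simply reuse it.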
438 if (code && code != Coordination::ZNODEEXISTS)
439 throw Coordination::Exception(code);
440}
441
442
/** Verify that the list of columns and the table settings match those specified in ZK (/metadata).
  * If not, throw an exception.
  */
446void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bool allow_alter)
447{
448 auto zookeeper = getZooKeeper();
449
450 ReplicatedMergeTreeTableMetadata old_metadata(*this);
451
452 Coordination::Stat metadata_stat;
453 String metadata_str = zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
454 auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
455 auto metadata_diff = old_metadata.checkAndFindDiff(metadata_from_zk, allow_alter);
456 metadata_version = metadata_stat.version;
457
458 Coordination::Stat columns_stat;
459 auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(zookeeper_path + "/columns", &columns_stat));
460 columns_version = columns_stat.version;
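    /// The znode versions (metadata_version, columns_version) are remembered so that later ALTER handling can detect concurrent changes in ZooKeeper.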
461
462 const ColumnsDescription & old_columns = getColumns();
463 if (columns_from_zk != old_columns || !metadata_diff.empty())
464 {
465 if (allow_alter &&
466 (skip_sanity_checks ||
467 old_columns.getOrdinary().sizeOfDifference(columns_from_zk.getOrdinary()) +
468 old_columns.getMaterialized().sizeOfDifference(columns_from_zk.getMaterialized()) <= 2))
469 {
470 LOG_WARNING(log, "Table structure in ZooKeeper is a little different from local table structure. Assuming ALTER.");
471
472 /// We delay setting table structure till startup() because otherwise new table metadata file can
473 /// be overwritten in DatabaseOrdinary::createTable.
474 set_table_structure_at_startup = [columns_from_zk, metadata_diff, this]()
475 {
476 /// Without any locks, because table has not been created yet.
477 setTableStructure(std::move(columns_from_zk), metadata_diff);
478 };
479 }
480 else
481 {
482 throw Exception("Table structure in ZooKeeper is too different from local table structure",
483 ErrorCodes::INCOMPATIBLE_COLUMNS);
484 }
485 }
486}
487
488
489void StorageReplicatedMergeTree::setTableStructure(ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff)
490{
491 StorageInMemoryMetadata metadata = getInMemoryMetadata();
492 if (new_columns != metadata.columns)
493 metadata.columns = new_columns;
494
495 if (!metadata_diff.empty())
496 {
497 if (metadata_diff.sorting_key_changed)
498 {
499 ParserNotEmptyExpressionList parser(false);
500 auto new_sorting_key_expr_list = parseQuery(parser, metadata_diff.new_sorting_key, 0);
501
502 if (new_sorting_key_expr_list->children.size() == 1)
503 metadata.order_by_ast = new_sorting_key_expr_list->children[0];
504 else
505 {
506 auto tuple = makeASTFunction("tuple");
507 tuple->arguments->children = new_sorting_key_expr_list->children;
508 metadata.order_by_ast = tuple;
509 }
510
511 if (!primary_key_ast)
512 {
513 /// Primary and sorting key become independent after this ALTER so we have to
514 /// save the old ORDER BY expression as the new primary key.
515 metadata.primary_key_ast = order_by_ast->clone();
516 }
517 }
518
519 if (metadata_diff.skip_indices_changed)
520 metadata.indices = IndicesDescription::parse(metadata_diff.new_skip_indices);
521
522 if (metadata_diff.constraints_changed)
523 metadata.constraints = ConstraintsDescription::parse(metadata_diff.new_constraints);
524
525 if (metadata_diff.ttl_table_changed)
526 {
527 ParserExpression parser;
528 metadata.ttl_for_table_ast = parseQuery(parser, metadata_diff.new_ttl_table, 0);
529 }
530 }
531
532 global_context.getDatabase(database_name)->alterTable(global_context, table_name, metadata);
533
    /// Even if the primary/sorting keys didn't change, we must reinitialize them
    /// because primary key column types might have changed.
536 setProperties(metadata);
537 setTTLExpressions(new_columns.getColumnTTLs(), metadata.ttl_for_table_ast);
538}
539
540
/** If necessary, restore a part: the replica itself adds a record to its queue to fetch it.
  * What time should we use for this queue entry? The time is taken into account when calculating the replica's lag.
  * For this purpose it makes sense to use the creation time of the missing part
  * (i.e. when calculating the lag, we account for how old the part that we need to recover is).
  */
546static time_t tryGetPartCreateTime(zkutil::ZooKeeperPtr & zookeeper, const String & replica_path, const String & part_name)
547{
548 time_t res = 0;
549
550 /// We get creation time of part, if it still exists (was not merged, for example).
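    /// Note: ZooKeeper's ctime is in milliseconds, so it is converted to seconds below.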
551 Coordination::Stat stat;
552 String unused;
553 if (zookeeper->tryGet(replica_path + "/parts/" + part_name, unused, &stat))
554 res = stat.ctime / 1000;
555
556 return res;
557}
558
559
560void StorageReplicatedMergeTree::createReplica()
561{
562 auto zookeeper = getZooKeeper();
563
564 LOG_DEBUG(log, "Creating replica " << replica_path);
565
566 int32_t code;
567
568 do
569 {
570 Coordination::Stat replicas_stat;
571 String last_added_replica = zookeeper->get(zookeeper_path + "/replicas", &replicas_stat);
572
573 /// If it is not the first replica, we will mark it as "lost", to immediately repair (clone) from existing replica.
574 String is_lost_value = last_added_replica.empty() ? "0" : "1";
575
576 Coordination::Requests ops;
577 Coordination::Responses resps;
578 ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent));
579 ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "", zkutil::CreateMode::Persistent));
580 ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent));
581 ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/queue", "", zkutil::CreateMode::Persistent));
582 ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/parts", "", zkutil::CreateMode::Persistent));
583 ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/flags", "", zkutil::CreateMode::Persistent));
584 ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", is_lost_value, zkutil::CreateMode::Persistent));
585 ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent));
586 /// Check version of /replicas to see if there are any replicas created at the same moment of time.
587 ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
588
589 code = zookeeper->tryMulti(ops, resps);
590 if (code == Coordination::Error::ZNODEEXISTS)
591 throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
592 else if (code == Coordination::Error::ZBADVERSION)
593 LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time");
594 else
595 zkutil::KeeperMultiException::check(code, ops, resps);
596 } while (code == Coordination::Error::ZBADVERSION);
597}
598
599
600void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
601{
602 auto zookeeper = getZooKeeper();
603
604 Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts");
605
606 /// Parts in ZK.
607 NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
608
609 /// There are no PreCommitted parts at startup.
610 auto parts = getDataParts({MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
611
    /** Local parts that are not in ZK.
      * In very rare cases they may cover missing parts,
      * and someone may think that pushing them to ZooKeeper is a good idea.
      * But in fact we cannot precisely determine that ALL missing parts are
      * covered by such an unexpected part. So the missing parts will be downloaded.
      */
    DataParts unexpected_parts;

    /// Collect unexpected parts
    for (const auto & part : parts)
        if (!expected_parts.count(part->name))
            unexpected_parts.insert(part); /// These parts will be placed in detached/ with the ignored_ prefix
624
625 /// Which parts should be taken from other replicas.
626 Strings parts_to_fetch;
627
628 for (const String & missing_name : expected_parts)
629 if (!getActiveContainingPart(missing_name))
630 parts_to_fetch.push_back(missing_name);
631
    /** For the sanity check, among the parts that are in the FS but not in ZK, we consider only the non-recent ones.
      * Unexpected new parts usually appear only because they did not have time to be registered in ZK before an abrupt restart of the server.
      * They also come from deduplicated parts that did not have time to be removed.
      */
636 size_t unexpected_parts_nonnew = 0;
637 UInt64 unexpected_parts_nonnew_rows = 0;
638 UInt64 unexpected_parts_rows = 0;
639 for (const auto & part : unexpected_parts)
640 {
641 if (part->info.level > 0)
642 {
643 ++unexpected_parts_nonnew;
644 unexpected_parts_nonnew_rows += part->rows_count;
645 }
646
647 unexpected_parts_rows += part->rows_count;
648 }
649
650 /// Additional helpful statistics
651 auto get_blocks_count_in_data_part = [&] (const String & part_name) -> UInt64
652 {
653 MergeTreePartInfo part_info;
654 if (MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
655 return part_info.getBlocksCount();
656
657 LOG_ERROR(log, "Unexpected part name: " << part_name);
658 return 0;
659 };
660
661 UInt64 parts_to_fetch_blocks = 0;
662 for (const String & name : parts_to_fetch)
663 parts_to_fetch_blocks += get_blocks_count_in_data_part(name);
664
    std::stringstream sanity_report;
    sanity_report << "There are "
        << unexpected_parts.size() << " unexpected parts with " << unexpected_parts_rows << " rows ("
        << unexpected_parts_nonnew << " of them are not just-written, with " << unexpected_parts_nonnew_rows << " rows), "
        << parts_to_fetch.size() << " missing parts (with " << parts_to_fetch_blocks << " blocks).";
670
    /** We can automatically synchronize data,
      * if the ratio of the total number of errors to the total number of parts (minimum - on the local filesystem or in ZK)
      * is no more than some threshold (for example 50%).
      *
      * A large ratio of mismatches between the data on the filesystem and the expected data
      * may indicate a configuration error (the server accidentally connected as a replica of the wrong shard).
      * In this case, the protection mechanism does not allow the server to start.
      */
679
680 UInt64 total_rows_on_filesystem = 0;
681 for (const auto & part : parts)
682 total_rows_on_filesystem += part->rows_count;
683
684 const auto storage_settings_ptr = getSettings();
685 bool insane = unexpected_parts_rows > total_rows_on_filesystem * storage_settings_ptr->replicated_max_ratio_of_wrong_parts;
686
687 if (insane && !skip_sanity_checks)
688 {
689 std::stringstream why;
690 why << "The local set of parts of table " << database_name << "." << table_name << " doesn't look like the set of parts "
691 << "in ZooKeeper: "
692 << formatReadableQuantity(unexpected_parts_rows) << " rows of " << formatReadableQuantity(total_rows_on_filesystem)
693 << " total rows in filesystem are suspicious.";
694
695 throw Exception(why.str() + " " + sanity_report.str(), ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
696 }
697
698 if (unexpected_parts_nonnew_rows > 0)
699 LOG_WARNING(log, sanity_report.str());
700
701 /// Add to the queue jobs to pick up the missing parts from other replicas and remove from ZK the information that we have them.
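    /// Existence checks for all missing parts are issued asynchronously first and consumed below, avoiding a ZooKeeper round trip per part.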
702 std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
703 exists_futures.reserve(parts_to_fetch.size());
704 for (const String & part_name : parts_to_fetch)
705 {
706 String part_path = replica_path + "/parts/" + part_name;
707 exists_futures.emplace_back(zookeeper->asyncExists(part_path));
708 }
709
710 std::vector<std::future<Coordination::MultiResponse>> enqueue_futures;
711 enqueue_futures.reserve(parts_to_fetch.size());
712 for (size_t i = 0; i < parts_to_fetch.size(); ++i)
713 {
714 const String & part_name = parts_to_fetch[i];
715 LOG_ERROR(log, "Removing locally missing part from ZooKeeper and queueing a fetch: " << part_name);
716
717 Coordination::Requests ops;
718
719 time_t part_create_time = 0;
720 Coordination::ExistsResponse exists_resp = exists_futures[i].get();
721 if (!exists_resp.error)
722 {
723 part_create_time = exists_resp.stat.ctime / 1000;
724 removePartFromZooKeeper(part_name, ops, exists_resp.stat.numChildren > 0);
725 }
726
727 LogEntry log_entry;
728 log_entry.type = LogEntry::GET_PART;
729 log_entry.source_replica = "";
730 log_entry.new_part_name = part_name;
731 log_entry.create_time = part_create_time;
732
733 /// We assume that this occurs before the queue is loaded (queue.initialize).
734 ops.emplace_back(zkutil::makeCreateRequest(
735 replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
736
737 enqueue_futures.emplace_back(zookeeper->asyncMulti(ops));
738 }
739
740 for (auto & future : enqueue_futures)
741 future.get();
742
743 /// Remove extra local parts.
744 for (const DataPartPtr & part : unexpected_parts)
745 {
746 LOG_ERROR(log, "Renaming unexpected part " << part->name << " to ignored_" + part->name);
747 forgetPartAndMoveToDetached(part, "ignored", true);
748 }
749}
750
751
752void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper,
753 const DataPartPtr & part, Coordination::Requests & ops, String part_name, NameSet * absent_replicas_paths)
754{
755 if (part_name.empty())
756 part_name = part->name;
757
758 check(part->columns);
759 int expected_columns_version = columns_version;
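    /// The version is used below in a check request on the /columns node, so the commit fails if the table columns change concurrently.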
760
761 auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
762 part->columns, part->checksums);
763
764 Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
765 std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
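    /// Shuffle the replicas so that we don't always check the same replica first, spreading the load between them.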
766 bool has_been_already_added = false;
767
768 for (const String & replica : replicas)
769 {
770 String current_part_path = zookeeper_path + "/replicas/" + replica + "/parts/" + part_name;
771
772 String part_zk_str;
773 if (!zookeeper->tryGet(current_part_path, part_zk_str))
774 {
775 if (absent_replicas_paths)
776 absent_replicas_paths->emplace(current_part_path);
777
778 continue;
779 }
780
781 ReplicatedMergeTreePartHeader replica_part_header;
782 if (!part_zk_str.empty())
783 replica_part_header = ReplicatedMergeTreePartHeader::fromString(part_zk_str);
784 else
785 {
786 Coordination::Stat columns_stat_before, columns_stat_after;
787 String columns_str;
788 String checksums_str;
            /// Let's check that the version of the node with the columns did not change while we were reading the checksums.
            /// This ensures that the columns and the checksums refer to the same data.
791 if (!zookeeper->tryGet(current_part_path + "/columns", columns_str, &columns_stat_before) ||
792 !zookeeper->tryGet(current_part_path + "/checksums", checksums_str) ||
793 !zookeeper->exists(current_part_path + "/columns", &columns_stat_after) ||
794 columns_stat_before.version != columns_stat_after.version)
795 {
796 LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica
797 << " because part changed while we were reading its checksums");
798 continue;
799 }
800
801 replica_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(
802 columns_str, checksums_str);
803 }
804
805 if (replica_part_header.getColumnsHash() != local_part_header.getColumnsHash())
806 {
807 LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica
808 << " because columns are different");
809 continue;
810 }
811
812 replica_part_header.getChecksums().checkEqual(local_part_header.getChecksums(), true);
813
814 if (replica == replica_name)
815 has_been_already_added = true;
816
        /// If we verify checksums in a "sequential manner" (i.e. recheck the absence of checksums on the other replicas at commit time),
        /// then it is enough to verify checksums on at least one replica, since the checksums on the other replicas must be the same.
819 if (absent_replicas_paths)
820 {
821 absent_replicas_paths->clear();
822 break;
823 }
824 }
825
826 if (!has_been_already_added)
827 {
828 const auto storage_settings_ptr = getSettings();
829 String part_path = replica_path + "/parts/" + part_name;
830
831 ops.emplace_back(zkutil::makeCheckRequest(
832 zookeeper_path + "/columns", expected_columns_version));
833
834 if (storage_settings_ptr->use_minimalistic_part_header_in_zookeeper)
835 {
836 ops.emplace_back(zkutil::makeCreateRequest(
837 part_path, local_part_header.toString(), zkutil::CreateMode::Persistent));
838 }
839 else
840 {
841 ops.emplace_back(zkutil::makeCreateRequest(
842 part_path, "", zkutil::CreateMode::Persistent));
843 ops.emplace_back(zkutil::makeCreateRequest(
844 part_path + "/columns", part->columns.toString(), zkutil::CreateMode::Persistent));
845 ops.emplace_back(zkutil::makeCreateRequest(
846 part_path + "/checksums", getChecksumsForZooKeeper(part->checksums), zkutil::CreateMode::Persistent));
847 }
848 }
849 else
850 {
851 LOG_WARNING(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part_name << " already exists."
852 << " Will not commit any nodes.");
853 }
854}
855
856MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAndCommit(Transaction & transaction,
857 const DataPartPtr & part)
858{
859 auto zookeeper = getZooKeeper();
860
861 while (true)
862 {
863 Coordination::Requests ops;
864 NameSet absent_part_paths_on_replicas;
865
866 /// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`.
867 checkPartChecksumsAndAddCommitOps(zookeeper, part, ops, part->name, &absent_part_paths_on_replicas);
868
869 /// Do not commit if the part is obsolete, we have just briefly checked its checksums
870 if (transaction.isEmpty())
871 return {};
872
873 /// Will check that the part did not suddenly appear on skipped replicas
874 if (!absent_part_paths_on_replicas.empty())
875 {
876 Coordination::Requests new_ops;
877 for (const String & part_path : absent_part_paths_on_replicas)
878 {
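                /// A create followed by an immediate remove acts as an atomic "the node must not exist" check:
                /// if the part has appeared on that replica in the meantime, the create fails with ZNODEEXISTS
                /// and the whole multi request is rejected, so we go back and recheck the checksums.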
879 new_ops.emplace_back(zkutil::makeCreateRequest(part_path, "", zkutil::CreateMode::Persistent));
880 new_ops.emplace_back(zkutil::makeRemoveRequest(part_path, -1));
881 }
882
            /// Put the check ops at the beginning of the list: the previously collected ops are appended after them.
            new_ops.insert(new_ops.end(), ops.begin(), ops.end());
            ops = std::move(new_ops);
886 }
887
888 try
889 {
890 zookeeper->multi(ops);
891 return transaction.commit();
892 }
893 catch (const zkutil::KeeperMultiException & e)
894 {
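            /// The first 2 * N ops are the create/remove pairs added above for the absent replicas;
            /// a ZNODEEXISTS failure among them means the part suddenly appeared on one of those replicas.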
895 size_t num_check_ops = 2 * absent_part_paths_on_replicas.size();
896 size_t failed_op_index = e.failed_op_index;
897
898 if (failed_op_index < num_check_ops && e.code == Coordination::ZNODEEXISTS)
899 {
900 LOG_INFO(log, "The part " << e.getPathForFirstFailedOp() << " on a replica suddenly appeared, will recheck checksums");
901 }
902 else
903 throw;
904 }
905 }
906}
907
908String StorageReplicatedMergeTree::getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const
909{
910 return MinimalisticDataPartChecksums::getSerializedString(checksums,
911 getSettings()->use_minimalistic_checksums_in_zookeeper);
912}
913
914
915bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
916{
917 if (entry.type == LogEntry::DROP_RANGE)
918 {
919 executeDropRange(entry);
920 return true;
921 }
922
923 if (entry.type == LogEntry::CLEAR_COLUMN || entry.type == LogEntry::CLEAR_INDEX)
924 {
925 executeClearColumnOrIndexInPartition(entry);
926 return true;
927 }
928
929 if (entry.type == LogEntry::REPLACE_RANGE)
930 {
931 executeReplaceRange(entry);
932 return true;
933 }
934
935 if (entry.type == LogEntry::GET_PART ||
936 entry.type == LogEntry::MERGE_PARTS ||
937 entry.type == LogEntry::MUTATE_PART)
938 {
939 /// If we already have this part or a part covering it, we do not need to do anything.
        /// The part may still be in the PreCommitted -> Committed transition, so we first search
        /// among PreCommitted parts to definitely find the desired part if it exists.
942 DataPartPtr existing_part = getPartIfExists(entry.new_part_name, {MergeTreeDataPartState::PreCommitted});
943 if (!existing_part)
944 existing_part = getActiveContainingPart(entry.new_part_name);
945
        /// Even if the part is present locally, in exceptional cases it may not be in ZooKeeper. Let's check that it is there.
947 if (existing_part && getZooKeeper()->exists(replica_path + "/parts/" + existing_part->name))
948 {
949 if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
950 {
951 LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because part " + existing_part->name + " already exists.");
952 }
953 return true;
954 }
955 }
956
957 if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
958 LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
959
    /// Perhaps we don't need this part, because during a write with quorum, the quorum failed (see below about `/quorum/failed_parts`).
961 if (entry.quorum && getZooKeeper()->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name))
962 {
963 LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because quorum for that part was failed.");
964 return true; /// NOTE Deletion from `virtual_parts` is not done, but it is only necessary for merge.
965 }
966
967 bool do_fetch = false;
968 if (entry.type == LogEntry::GET_PART)
969 {
970 do_fetch = true;
971 }
972 else if (entry.type == LogEntry::MERGE_PARTS)
973 {
974 do_fetch = !tryExecuteMerge(entry);
975 }
976 else if (entry.type == LogEntry::MUTATE_PART)
977 {
978 do_fetch = !tryExecutePartMutation(entry);
979 }
980 else
981 {
982 throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)), ErrorCodes::LOGICAL_ERROR);
983 }
984
985 if (do_fetch)
986 return executeFetch(entry);
987
988 return true;
989}
990
991bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
992{
993 // Log source part names just in case
994 {
995 std::stringstream log_message;
996 log_message << "Executing log entry to merge parts ";
997 for (auto i : ext::range(0, entry.source_parts.size()))
998 log_message << (i != 0 ? ", " : "") << entry.source_parts[i];
999 log_message << " to " << entry.new_part_name;
1000
1001 LOG_TRACE(log, log_message.rdbuf());
1002 }
1003
1004 DataPartsVector parts;
1005 bool have_all_parts = true;
1006 for (const String & name : entry.source_parts)
1007 {
1008 DataPartPtr part = getActiveContainingPart(name);
1009 if (!part)
1010 {
1011 have_all_parts = false;
1012 break;
1013 }
1014 if (part->name != name)
1015 {
1016 LOG_WARNING(log, "Part " << name << " is covered by " << part->name
1017 << " but should be merged into " << entry.new_part_name << ". This shouldn't happen often.");
1018 have_all_parts = false;
1019 break;
1020 }
1021 parts.push_back(part);
1022 }
1023
1024 const auto storage_settings_ptr = getSettings();
1025 if (!have_all_parts)
1026 {
        /// If we do not have all the necessary parts, try to fetch the already merged part from another replica.
        LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
1029 return false;
1030 }
1031 else if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
1032 {
        /// If the entry is old enough, the result is big enough, and the merged part exists on some replica,
        /// then prefer fetching the merged part from that replica.
1035
1036 size_t sum_parts_bytes_on_disk = 0;
1037 for (const auto & part : parts)
1038 sum_parts_bytes_on_disk += part->bytes_on_disk;
1039
1040 if (sum_parts_bytes_on_disk >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)
1041 {
1042 String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
1043 if (!replica.empty())
1044 {
1045 LOG_DEBUG(log, "Prefer to fetch " << entry.new_part_name << " from replica " << replica);
1046 return false;
1047 }
1048 }
1049 }
1050
    /// Start the main work.
1052 size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
1053
1054 /// Can throw an exception while reserving space.
1055 MergeTreeDataPart::TTLInfos ttl_infos;
1056 for (auto & part_ptr : parts)
1057 {
1058 ttl_infos.update(part_ptr->ttl_infos);
1059 }
1060 ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
1061 ttl_infos, time(nullptr));
1062
1063 auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
1064
1065 FutureMergedMutatedPart future_merged_part(parts);
1066 if (future_merged_part.name != entry.new_part_name)
1067 {
1068 throw Exception("Future merged part name " + backQuote(future_merged_part.name) + " differs from part name in log entry: "
1069 + backQuote(entry.new_part_name), ErrorCodes::BAD_DATA_PART_NAME);
1070 }
1071 future_merged_part.updatePath(*this, reserved_space);
1072
1073 MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_merged_part);
1074
1075 Transaction transaction(*this);
1076 MutableDataPartPtr part;
1077
1078 Stopwatch stopwatch;
1079
1080 auto write_part_log = [&] (const ExecutionStatus & execution_status)
1081 {
1082 writePartLog(
1083 PartLogElement::MERGE_PARTS, execution_status, stopwatch.elapsed(),
1084 entry.new_part_name, part, parts, merge_entry.get());
1085 };
1086
1087 try
1088 {
1089 part = merger_mutator.mergePartsToTemporaryPart(
1090 future_merged_part, *merge_entry, table_lock, entry.create_time, reserved_space, entry.deduplicate, entry.force_ttl);
1091
1092 merger_mutator.renameMergedTemporaryPart(part, parts, &transaction);
1093 removeEmptyColumnsFromPart(part);
1094
1095 try
1096 {
1097 checkPartChecksumsAndCommit(transaction, part);
1098 }
1099 catch (const Exception & e)
1100 {
1101 if (MergeTreeDataPartChecksums::isBadChecksumsErrorCode(e.code()))
1102 {
1103 transaction.rollback();
1104
1105 ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica);
1106
                LOG_ERROR(log, getCurrentExceptionMessage(false) << ". "
                    "Data after merge is not byte-identical to the data on other replicas. "
1109 "There could be several reasons: "
1110 "1. Using newer version of compression library after server update. "
1111 "2. Using another compression method. "
1112 "3. Non-deterministic compression algorithm (highly unlikely). "
1113 "4. Non-deterministic merge algorithm due to logical error in code. "
1114 "5. Data corruption in memory due to bug in code. "
1115 "6. Data corruption in memory due to hardware issue. "
1116 "7. Manual modification of source data after server startup. "
1117 "8. Manual modification of checksums stored in ZooKeeper. "
1118 "We will download merged part from replica to force byte-identical result.");
1119
1120 write_part_log(ExecutionStatus::fromCurrentException());
1121
1122 tryRemovePartImmediately(std::move(part));
1123 /// No need to delete the part from ZK because we can be sure that the commit transaction
1124 /// didn't go through.
1125
1126 return false;
1127 }
1128
1129 throw;
1130 }
1131
1132 /** Removing old parts from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
1133 */
1134
1135 /** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
1136 * This is not a problem, because in this case the merge will remain in the queue, and we will try again.
1137 */
1138 merge_selecting_task->schedule();
1139 ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
1140
1141 write_part_log({});
1142
1143 return true;
1144 }
1145 catch (...)
1146 {
1147 write_part_log(ExecutionStatus::fromCurrentException());
1148 throw;
1149 }
1150}
1151
1152
1153bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry)
1154{
1155 const String & source_part_name = entry.source_parts.at(0);
1156 const auto storage_settings_ptr = getSettings();
1157 LOG_TRACE(log, "Executing log entry to mutate part " << source_part_name << " to " << entry.new_part_name);
1158
1159 DataPartPtr source_part = getActiveContainingPart(source_part_name);
1160 if (!source_part)
1161 {
1162 LOG_DEBUG(log, "Source part " + source_part_name + " for " << entry.new_part_name << " is not ready; will try to fetch it instead");
1163 return false;
1164 }
1165
1166 if (source_part->name != source_part_name)
1167 {
1168 throw Exception("Part " + source_part_name + " is covered by " + source_part->name
1169 + " but should be mutated to " + entry.new_part_name + ". This is a bug.",
1170 ErrorCodes::LOGICAL_ERROR);
1171 }
1172
1173 /// TODO - some better heuristic?
1174 size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
1175
1176 if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
1177 && estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)
1178 {
        /// If the entry is old enough, the result is big enough, and some replica has the desired part,
        /// then prefer fetching it from that replica.
1181 String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
1182 if (!replica.empty())
1183 {
1184 LOG_DEBUG(log, "Prefer to fetch " << entry.new_part_name << " from replica " << replica);
1185 return false;
1186 }
1187 }
1188
1189
1190 MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName(
1191 entry.new_part_name, format_version);
1192 MutationCommands commands = queue.getMutationCommands(source_part, new_part_info.mutation);
1193
    /// Once we mutate a part, we must reserve space on the same disk, because mutations can create hardlinks to files of the source part.
    /// Can throw an exception.
1196 ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->disk);
1197
1198 auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
1199
1200 MutableDataPartPtr new_part;
1201 Transaction transaction(*this);
1202
1203 FutureMergedMutatedPart future_mutated_part;
1204 future_mutated_part.parts.push_back(source_part);
1205 future_mutated_part.part_info = new_part_info;
1206 future_mutated_part.name = entry.new_part_name;
1207 future_mutated_part.updatePath(*this, reserved_space);
1208
1209 MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(
1210 database_name, table_name, future_mutated_part);
1211
1212 Stopwatch stopwatch;
1213
1214 auto write_part_log = [&] (const ExecutionStatus & execution_status)
1215 {
1216 writePartLog(
1217 PartLogElement::MUTATE_PART, execution_status, stopwatch.elapsed(),
1218 entry.new_part_name, new_part, future_mutated_part.parts, merge_entry.get());
1219 };
1220
1221 try
1222 {
1223 new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, *merge_entry, global_context, reserved_space, table_lock);
1224 renameTempPartAndReplace(new_part, nullptr, &transaction);
1225
1226 try
1227 {
1228 checkPartChecksumsAndCommit(transaction, new_part);
1229 }
1230 catch (const Exception & e)
1231 {
1232 if (MergeTreeDataPartChecksums::isBadChecksumsErrorCode(e.code()))
1233 {
1234 transaction.rollback();
1235
1236 ProfileEvents::increment(ProfileEvents::DataAfterMutationDiffersFromReplica);
1237
                LOG_ERROR(log, getCurrentExceptionMessage(false) << ". "
                    "Data after mutation is not byte-identical to the data on other replicas. "
                    "We will download the mutated part from a replica to force a byte-identical result.");
1241
1242 write_part_log(ExecutionStatus::fromCurrentException());
1243
1244 tryRemovePartImmediately(std::move(new_part));
1245 /// No need to delete the part from ZK because we can be sure that the commit transaction
1246 /// didn't go through.
1247
1248 return false;
1249 }
1250
1251 throw;
1252 }
1253
1254 /** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
1255 * This is not a problem, because in this case the entry will remain in the queue, and we will try again.
1256 */
1257 merge_selecting_task->schedule();
1258 ProfileEvents::increment(ProfileEvents::ReplicatedPartMutations);
1259 write_part_log({});
1260
1261 return true;
1262 }
1263 catch (...)
1264 {
1265 write_part_log(ExecutionStatus::fromCurrentException());
1266 throw;
1267 }
1268}
1269
1270
1271bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
1272{
1273 String replica = findReplicaHavingCoveringPart(entry, true);
1274 const auto storage_settings_ptr = getSettings();
1275
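    /// The counter is a function-local static, so it is shared by all replicated tables of this server process.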
1276 static std::atomic_uint total_fetches {0};
1277 if (storage_settings_ptr->replicated_max_parallel_fetches && total_fetches >= storage_settings_ptr->replicated_max_parallel_fetches)
1278 {
1279 throw Exception("Too many total fetches from replicas, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches.toString(),
1280 ErrorCodes::TOO_MANY_FETCHES);
1281 }
1282
1283 ++total_fetches;
1284 SCOPE_EXIT({--total_fetches;});
1285
1286 if (storage_settings_ptr->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings_ptr->replicated_max_parallel_fetches_for_table)
1287 {
1288 throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches_for_table.toString(),
1289 ErrorCodes::TOO_MANY_FETCHES);
1290 }
1291
1292 ++current_table_fetches;
1293 SCOPE_EXIT({--current_table_fetches;});
1294
1295 try
1296 {
1297 if (replica.empty())
1298 {
            /** If the part is to be written with a quorum and the quorum has not been reached yet,
              * then (since it is impossible to download the part right now)
              * the quorum entry should be considered failed.
              * TODO Complex code, extract separately.
              */
1304 if (entry.quorum)
1305 {
1306 if (entry.type != LogEntry::GET_PART)
1307 throw Exception("Logical error: log entry with quorum but type is not GET_PART", ErrorCodes::LOGICAL_ERROR);
1308
1309 LOG_DEBUG(log, "No active replica has part " << entry.new_part_name << " which needs to be written with quorum."
1310 " Will try to mark that quorum as failed.");
1311
                /** Atomically:
                  * - if no replica has become active in the meantime;
                  * - and if there is a `quorum` node for this part;
                  * - then delete the `quorum` node;
                  * - add the part to the `quorum/failed_parts` list;
                  * - and if the part has not already been removed from the deduplication list `blocks/block_num`, delete it.
                  *
                  * If something changes, we do nothing - we'll get here again next time.
                  */
1321
                /** We collect the versions of the `host` nodes of the replicas.
                  * When a replica becomes active, it changes the value of `host` in the same transaction as the creation of `is_active`.
                  * Checking these versions ensures that no replica has become active while we execute our multi request.
                  */
1326
1327 auto zookeeper = getZooKeeper();
1328
1329 Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
1330
1331 Coordination::Requests ops;
1332
1333 for (size_t i = 0, size = replicas.size(); i < size; ++i)
1334 {
1335 Coordination::Stat stat;
1336 String path = zookeeper_path + "/replicas/" + replicas[i] + "/host";
1337 zookeeper->get(path, &stat);
1338 ops.emplace_back(zkutil::makeCheckRequest(path, stat.version));
1339 }
1340
1341 /// We verify that while we were collecting versions, the replica with the necessary part did not come alive.
1342 replica = findReplicaHavingPart(entry.new_part_name, true);
1343
                /// Also, a completely new replica could have been created during this time.
                /// But if the part did not appear on the old replicas, it cannot be on the new one either.
1346
1347 if (replica.empty())
1348 {
1349 Coordination::Stat quorum_stat;
1350 String quorum_path = zookeeper_path + "/quorum/status";
1351 String quorum_str = zookeeper->get(quorum_path, &quorum_stat);
1352 ReplicatedMergeTreeQuorumEntry quorum_entry;
1353 quorum_entry.fromString(quorum_str);
1354
1355 if (quorum_entry.part_name == entry.new_part_name)
1356 {
1357 ops.emplace_back(zkutil::makeRemoveRequest(quorum_path, quorum_stat.version));
1358
1359 auto part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
1360
1361 if (part_info.min_block != part_info.max_block)
1362 throw Exception("Logical error: log entry with quorum for part covering more than one block number",
1363 ErrorCodes::LOGICAL_ERROR);
1364
1365 ops.emplace_back(zkutil::makeCreateRequest(
1366 zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name,
1367 "",
1368 zkutil::CreateMode::Persistent));
1369
1370 /// Deleting from `blocks`.
1371 if (!entry.block_id.empty() && zookeeper->exists(zookeeper_path + "/blocks/" + entry.block_id))
1372 ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/blocks/" + entry.block_id, -1));
1373
1374 Coordination::Responses responses;
1375 auto code = zookeeper->tryMulti(ops, responses);
1376
1377 if (code == Coordination::ZOK)
1378 {
1379 LOG_DEBUG(log, "Marked quorum for part " << entry.new_part_name << " as failed.");
1380 queue.removeFromVirtualParts(part_info);
1381 return true;
1382 }
1383 else if (code == Coordination::ZBADVERSION || code == Coordination::ZNONODE || code == Coordination::ZNODEEXISTS)
1384 {
1385 LOG_DEBUG(log, "State was changed or isn't expected when trying to mark quorum for part "
1386 << entry.new_part_name << " as failed. Code: " << zkutil::ZooKeeper::error2string(code));
1387 }
1388 else
1389 throw Coordination::Exception(code);
1390 }
1391 else
1392 {
                    LOG_WARNING(log, "No active replica has part " << entry.new_part_name
                        << ", but that part needs quorum and /quorum/status contains an entry about another part " << quorum_entry.part_name
                        << ". It means that the part was successfully written to " << entry.quorum
                        << " replicas, but then all of them went offline."
                        << " Or it is a bug.");
1398 }
1399 }
1400 }
1401
1402 if (replica.empty())
1403 {
1404 ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
1405 throw Exception("No active replica has part " + entry.new_part_name + " or covering part", ErrorCodes::NO_REPLICA_HAS_PART);
1406 }
1407 }
1408
1409 try
1410 {
1411 if (!fetchPart(entry.actual_new_part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
1412 return false;
1413 }
1414 catch (Exception & e)
1415 {
1416 /// No stacktrace, just log message
1417 if (e.code() == ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS)
1418 e.addMessage("Too busy replica. Will try later.");
1419 throw;
1420 }
1421
1422 if (entry.type == LogEntry::MERGE_PARTS)
1423 ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
1424 }
1425 catch (...)
1426 {
1427        /** If we cannot download a part required for some merge, it is better not to try to get the other parts for this merge,
1428          * but to try to get the already merged part. To do this, move the actions to get the remaining parts
1429          * for this merge to the end of the queue.
1430 */
1431 try
1432 {
1433 auto parts_for_merge = queue.moveSiblingPartsForMergeToEndOfQueue(entry.new_part_name);
1434
1435 if (!parts_for_merge.empty() && replica.empty())
1436 {
1437 LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead.");
1438 return false;
1439 }
1440
1441            /** If no active replica has the part and there is no merge in the queue involving it,
1442              * check whether any (active or inactive) replica has such a part or a part covering it.
1443 */
1444 if (replica.empty())
1445 enqueuePartForCheck(entry.new_part_name);
1446 }
1447 catch (...)
1448 {
1449 tryLogCurrentException(log, __PRETTY_FUNCTION__);
1450 }
1451
1452 throw;
1453 }
1454
1455 return true;
1456}
1457
1458
1459void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
1460{
1461 auto drop_range_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
1462 queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry);
1463
1464 LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");
1465
1466 /// Delete the parts contained in the range to be deleted.
1467    /// It's important that no old parts remain (after the merge), because otherwise,
1468    /// after adding a new replica, this new replica would download them but would not delete them.
1469    /// And if they are not deleted, the parts will come back to life after the server is restarted.
1470 /// Therefore, we use all data parts.
1471
1472 DataPartsVector parts_to_remove;
1473 {
1474 auto data_parts_lock = lockParts();
1475 parts_to_remove = removePartsInRangeFromWorkingSet(drop_range_info, true, true, data_parts_lock);
1476 }
1477
1478 if (entry.detach)
1479 {
1480        /// If DETACH, clone parts to the detached/ directory
1481 for (const auto & part : parts_to_remove)
1482 {
1483 LOG_INFO(log, "Detaching " << part->relative_path);
1484 part->makeCloneInDetached("");
1485 }
1486 }
1487
1488 /// Forcibly remove parts from ZooKeeper
1489 tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
1490
1491 LOG_INFO(log, (entry.detach ? "Detached " : "Removed ") << parts_to_remove.size() << " parts inside " << entry.new_part_name << ".");
1492
1493 /// We want to remove dropped parts from disk as soon as possible
1494    /// To be removed, a part should have zero refcount; therefore, we wake up the cleanup thread at exit
1495 parts_to_remove.clear();
1496 cleanup_thread.wakeup();
1497}
1498
1499
1500void StorageReplicatedMergeTree::executeClearColumnOrIndexInPartition(const LogEntry & entry)
1501{
1502 LOG_INFO(log, "Clear column " << entry.column_name << " in parts inside " << entry.new_part_name << " range");
1503
1504 auto entry_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
1505
1506 /// We don't change table structure, only data in some parts
1507 /// To disable reading from these parts, we will sequentially acquire write lock for each part inside alterDataPart()
1508    /// If we locked the whole table here, a deadlock could occur. For example, if we used a Buffer table (CLICKHOUSE-3238)
1509 auto lock_read_structure = lockStructureForShare(false, RWLockImpl::NO_QUERY);
1510
1511 auto zookeeper = getZooKeeper();
1512
1513 AlterCommand alter_command;
1514 if (entry.type == LogEntry::CLEAR_COLUMN)
1515 {
1516 alter_command.type = AlterCommand::DROP_COLUMN;
1517 alter_command.column_name = entry.column_name;
1518 }
1519 else if (entry.type == LogEntry::CLEAR_INDEX)
1520 {
1521 alter_command.type = AlterCommand::DROP_INDEX;
1522 alter_command.index_name = entry.index_name;
1523 }
1524
1525 StorageInMemoryMetadata metadata = getInMemoryMetadata();
1526 alter_command.apply(metadata);
1527
1528 size_t modified_parts = 0;
1529 auto parts = getDataParts();
1530 auto columns_for_parts = metadata.columns.getAllPhysical();
1531
1532    /// Check again that there are no merges in the range
1533    /// TODO: Currently, there are no guarantees that a merge covering entry_part_info will not happen during the execution.
1534 /// To solve this problem we could add read/write flags for each part in future_parts
1535 /// and make more sophisticated checks for merges in shouldExecuteLogEntry().
1536 /// But this feature will be useless when the mutation feature is implemented.
1537 queue.checkThereAreNoConflictsInRange(entry_part_info, entry);
1538
1539 for (const auto & part : parts)
1540 {
1541 if (!entry_part_info.contains(part->info))
1542 continue;
1543
1544 if (entry.type == LogEntry::CLEAR_COLUMN)
1545 LOG_DEBUG(log, "Clearing column " << alter_command.column_name << " in part " << part->name);
1546 else if (entry.type == LogEntry::CLEAR_INDEX)
1547 LOG_DEBUG(log, "Clearing index " << alter_command.index_name << " in part " << part->name);
1548
1549 MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
1550 alterDataPart(columns_for_parts, metadata.indices.indices, false, transaction);
1551 if (!transaction->isValid())
1552 continue;
1553
1554 updatePartHeaderInZooKeeperAndCommit(zookeeper, *transaction);
1555
1556 ++modified_parts;
1557 }
1558
1559 if (entry.type == LogEntry::CLEAR_COLUMN)
1560 LOG_DEBUG(log, "Cleared column " << entry.column_name << " in " << modified_parts << " parts");
1561 else if (entry.type == LogEntry::CLEAR_INDEX)
1562 LOG_DEBUG(log, "Cleared index " << entry.index_name << " in " << modified_parts << " parts");
1563
1564    /// Recalculate column sizes (not only for the modified column)
1565 recalculateColumnSizes();
1566}
1567
1568
1569bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
1570{
1571 Stopwatch watch;
1572 auto & entry_replace = *entry.replace_range_entry;
1573
1574 MergeTreePartInfo drop_range = MergeTreePartInfo::fromPartName(entry_replace.drop_range_part_name, format_version);
1575    /// A range with only one block has a special meaning: ATTACH PARTITION
1576 bool replace = drop_range.getBlocksCount() > 1;
1577
1578 queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry);
1579
1580 struct PartDescription
1581 {
1582 PartDescription(size_t index_, const String & src_part_name_, const String & new_part_name_, const String & checksum_hex_,
1583 MergeTreeDataFormatVersion format_version)
1584 : index(index_),
1585 src_part_name(src_part_name_), src_part_info(MergeTreePartInfo::fromPartName(src_part_name_, format_version)),
1586 new_part_name(new_part_name_), new_part_info(MergeTreePartInfo::fromPartName(new_part_name_, format_version)),
1587 checksum_hex(checksum_hex_) {}
1588
1589 size_t index; // in log entry arrays
1590 String src_part_name;
1591 MergeTreePartInfo src_part_info;
1592 String new_part_name;
1593 MergeTreePartInfo new_part_info;
1594 String checksum_hex;
1595
1596 /// Part which will be committed
1597 MutableDataPartPtr res_part;
1598
1599 /// We could find a covering part
1600 MergeTreePartInfo found_new_part_info;
1601 String found_new_part_name;
1602
1603        /// Holds a pointer to the part in the source table if we are going to clone it from a local table
1604 DataPartPtr src_table_part;
1605
1606 /// A replica that will be used to fetch part
1607 String replica;
1608 };
1609
1610 using PartDescriptionPtr = std::shared_ptr<PartDescription>;
1611 using PartDescriptions = std::vector<PartDescriptionPtr>;
1612
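    /// Descriptions of all parts listed in the log entry and of the subset that still has to be added locally.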
1613 PartDescriptions all_parts;
1614 PartDescriptions parts_to_add;
1615 DataPartsVector parts_to_remove;
1616
1617 auto table_lock_holder_dst_table = lockStructureForShare(false, RWLockImpl::NO_QUERY);
1618
1619 for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i)
1620 {
1621 all_parts.emplace_back(std::make_shared<PartDescription>(i,
1622 entry_replace.src_part_names.at(i),
1623 entry_replace.new_part_names.at(i),
1624 entry_replace.part_names_checksums.at(i),
1625 format_version));
1626 }
1627
1628    /// Which parts should we add? Or have we already added all the required parts (we are a replica-initializer)?
1629 {
1630 auto data_parts_lock = lockParts();
1631
1632 for (const PartDescriptionPtr & part_desc : all_parts)
1633 {
1634 if (!getActiveContainingPart(part_desc->new_part_info, MergeTreeDataPartState::Committed, data_parts_lock))
1635 parts_to_add.emplace_back(part_desc);
1636 }
1637
1638 if (parts_to_add.empty() && replace)
1639 parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
1640 }
1641
1642 if (parts_to_add.empty())
1643 {
1644        LOG_INFO(log, "All parts from REPLACE PARTITION command have already been attached");
1645 tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
1646 return true;
1647 }
1648
1649 if (parts_to_add.size() < all_parts.size())
1650 {
1651 LOG_WARNING(log, "Some (but not all) parts from REPLACE PARTITION command already exist. REPLACE PARTITION will not be atomic.");
1652 }
1653
1654 StoragePtr source_table;
1655 TableStructureReadLockHolder table_lock_holder_src_table;
1656 String source_table_name = entry_replace.from_database + "." + entry_replace.from_table;
1657
1658 auto clone_data_parts_from_source_table = [&] () -> size_t
1659 {
1660 source_table = global_context.tryGetTable(entry_replace.from_database, entry_replace.from_table);
1661 if (!source_table)
1662 {
1663 LOG_DEBUG(log, "Can't use " << source_table_name << " as source table for REPLACE PARTITION command. It does not exist.");
1664 return 0;
1665 }
1666
1667 MergeTreeData * src_data = nullptr;
1668 try
1669 {
1670 src_data = &checkStructureAndGetMergeTreeData(source_table);
1671 }
1672 catch (Exception &)
1673 {
1674 LOG_INFO(log, "Can't use " << source_table_name << " as source table for REPLACE PARTITION command. Will fetch all parts."
1675 << " Reason: " << getCurrentExceptionMessage(false));
1676 return 0;
1677 }
1678
1679 table_lock_holder_src_table = source_table->lockStructureForShare(false, RWLockImpl::NO_QUERY);
1680
1681 DataPartStates valid_states{MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed,
1682 MergeTreeDataPartState::Outdated};
1683
1684 size_t num_clonable_parts = 0;
1685 for (PartDescriptionPtr & part_desc : parts_to_add)
1686 {
1687 auto src_part = src_data->getPartIfExists(part_desc->src_part_info, valid_states);
1688 if (!src_part)
1689 {
1690 LOG_DEBUG(log, "There is no part " << part_desc->src_part_name << " in " << source_table_name);
1691 continue;
1692 }
1693
1694 String checksum_hex;
1695 {
1696 std::shared_lock<std::shared_mutex> part_lock(src_part->columns_lock);
1697 checksum_hex = src_part->checksums.getTotalChecksumHex();
1698 }
1699
1700 if (checksum_hex != part_desc->checksum_hex)
1701 {
1702 LOG_DEBUG(log, "Part " << part_desc->src_part_name << " of " << source_table_name << " has inappropriate checksum");
1703 /// TODO: check version
1704 continue;
1705 }
1706
1707 part_desc->found_new_part_name = part_desc->new_part_name;
1708 part_desc->found_new_part_info = part_desc->new_part_info;
1709 part_desc->src_table_part = src_part;
1710
1711 ++num_clonable_parts;
1712 }
1713
1714 return num_clonable_parts;
1715 };
1716
1717 size_t num_clonable_parts = clone_data_parts_from_source_table();
1718 LOG_DEBUG(log, "Found " << num_clonable_parts << " parts that could be cloned (of " << parts_to_add.size() << " required parts)");
1719
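    /// Collect candidate parts (clonable from the source table or fetchable from replicas); covered parts are filtered out below.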
1720 ActiveDataPartSet adding_parts_active_set(format_version);
1721 std::unordered_map<String, PartDescriptionPtr> part_name_to_desc;
1722
1723 for (PartDescriptionPtr & part_desc : parts_to_add)
1724 {
1725 if (part_desc->src_table_part)
1726 {
1727            /// It is a clonable part
1728 adding_parts_active_set.add(part_desc->new_part_name);
1729 part_name_to_desc.emplace(part_desc->new_part_name, part_desc);
1730 continue;
1731 }
1732
1733        /// First, try to find the exact part to produce a more accurate part set
1734 String replica = findReplicaHavingPart(part_desc->new_part_name, true);
1735 String found_part_name;
1736 /// TODO: check version
1737
1738 if (replica.empty())
1739 {
1740 LOG_DEBUG(log, "Part " << part_desc->new_part_name << " is not found on remote replicas");
1741
1742 /// Fallback to covering part
1743 replica = findReplicaHavingCoveringPart(part_desc->new_part_name, true, found_part_name);
1744
1745 if (replica.empty())
1746 {
1747                /// It is not a failure, since adjacent parts could cover the current part
1748 LOG_DEBUG(log, "Parts covering " << part_desc->new_part_name << " are not found on remote replicas");
1749 continue;
1750 }
1751 }
1752 else
1753 {
1754 found_part_name = part_desc->new_part_name;
1755 }
1756
1757 part_desc->found_new_part_name = found_part_name;
1758 part_desc->found_new_part_info = MergeTreePartInfo::fromPartName(found_part_name, format_version);
1759 part_desc->replica = replica;
1760
1761 adding_parts_active_set.add(part_desc->found_new_part_name);
1762 part_name_to_desc.emplace(part_desc->found_new_part_name, part_desc);
1763 }
1764
1765    /// Check that we can cover the whole range
1766 for (PartDescriptionPtr & part_desc : parts_to_add)
1767 {
1768 if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty())
1769 {
1770            throw Exception("Part " + part_desc->new_part_name +
1771                            " (or a part covering it) was found neither in the source table nor on remote replicas", ErrorCodes::NO_REPLICA_HAS_PART);
1772 }
1773 }
1774
1775 /// Filter covered parts
1776 PartDescriptions final_parts;
1777 {
1778 Strings final_part_names = adding_parts_active_set.getParts();
1779
1780 for (const String & final_part_name : final_part_names)
1781 {
1782 auto part_desc = part_name_to_desc[final_part_name];
1783 if (!part_desc)
1784 throw Exception("There is no final part " + final_part_name + ". This is a bug", ErrorCodes::LOGICAL_ERROR);
1785
1786 final_parts.emplace_back(part_desc);
1787
1788 if (final_parts.size() > 1)
1789 {
1790 auto & prev = *final_parts[final_parts.size() - 2];
1791 auto & curr = *final_parts[final_parts.size() - 1];
1792
1793 if (!prev.found_new_part_info.isDisjoint(curr.found_new_part_info))
1794 {
1795 throw Exception("Intersected final parts detected: " + prev.found_new_part_name
1796 + " and " + curr.found_new_part_name + ". It should be investigated.", ErrorCodes::INCORRECT_DATA);
1797 }
1798 }
1799 }
1800 }
1801
1802 static const String TMP_PREFIX = "tmp_replace_from_";
1803
1804 auto obtain_part = [&] (PartDescriptionPtr & part_desc)
1805 {
1806 if (part_desc->src_table_part)
1807 {
1808 std::shared_lock<std::shared_mutex> part_lock(part_desc->src_table_part->columns_lock);
1809
1810 if (part_desc->checksum_hex != part_desc->src_table_part->checksums.getTotalChecksumHex())
1811                throw Exception("Checksums of " + part_desc->src_table_part->name + " have suddenly changed", ErrorCodes::UNFINISHED);
1812
1813 part_desc->res_part = cloneAndLoadDataPartOnSameDisk(
1814 part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info);
1815 }
1816 else if (!part_desc->replica.empty())
1817 {
1818 String source_replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
1819 ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
1820 auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
1821 auto [user, password] = global_context.getInterserverCredentials();
1822 String interserver_scheme = global_context.getInterserverScheme();
1823
1824 if (interserver_scheme != address.scheme)
1825                throw Exception("Interserver schemes are different: '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);
1826
1827 part_desc->res_part = fetcher.fetchPart(part_desc->found_new_part_name, source_replica_path,
1828 address.host, address.replication_port, timeouts, user, password, interserver_scheme, false, TMP_PREFIX + "fetch_");
1829
1830 /// TODO: check columns_version of fetched part
1831
1832 ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
1833 }
1834 else
1835            throw Exception("There is no recipe to produce part " + part_desc->new_part_name + ". This is a bug", ErrorCodes::LOGICAL_ERROR);
1836 };
1837
1838 /// Download or clone parts
1839 /// TODO: make it in parallel
1840 for (PartDescriptionPtr & part_desc : final_parts)
1841 obtain_part(part_desc);
1842
1843 MutableDataPartsVector res_parts;
1844 for (PartDescriptionPtr & part_desc : final_parts)
1845 res_parts.emplace_back(part_desc->res_part);
1846
1847 try
1848 {
1849 /// Commit parts
1850 auto zookeeper = getZooKeeper();
1851 Transaction transaction(*this);
1852
1853 Coordination::Requests ops;
1854 for (PartDescriptionPtr & part_desc : final_parts)
1855 {
1856 renameTempPartAndReplace(part_desc->res_part, nullptr, &transaction);
1857 getCommitPartOps(ops, part_desc->res_part);
1858
1859 if (ops.size() > zkutil::MULTI_BATCH_SIZE)
1860 {
1861 zookeeper->multi(ops);
1862 ops.clear();
1863 }
1864 }
1865
1866 if (!ops.empty())
1867 zookeeper->multi(ops);
1868
1869 {
1870 auto data_parts_lock = lockParts();
1871
1872 transaction.commit(&data_parts_lock);
1873 if (replace)
1874 parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
1875 }
1876
1877 PartLog::addNewParts(global_context, res_parts, watch.elapsed());
1878 }
1879 catch (...)
1880 {
1881 PartLog::addNewParts(global_context, res_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
1882 throw;
1883 }
1884
1885 tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
1886 res_parts.clear();
1887 parts_to_remove.clear();
1888 cleanup_thread.wakeup();
1889
1890 return true;
1891}
1892
1893
1894void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper)
1895{
1896 LOG_INFO(log, "Will mimic " << source_replica);
1897
1898 String source_path = zookeeper_path + "/replicas/" + source_replica;
1899
1900    /** TODO: this will be deleted! (It is only to support old versions of the ClickHouse server).
1901      * In the current code, the replica is created in a single transaction.
1902 * If the reference/master replica is not yet fully created, let's wait.
1903 */
1904 while (!zookeeper->exists(source_path + "/columns"))
1905 {
1906 LOG_INFO(log, "Waiting for replica " << source_path << " to be fully created");
1907
1908 zkutil::EventPtr event = std::make_shared<Poco::Event>();
1909 if (zookeeper->exists(source_path + "/columns", nullptr, event))
1910 {
1911 LOG_WARNING(log, "Oops, a watch has leaked");
1912 break;
1913 }
1914
1915 event->wait();
1916 }
1917
1918 /// The order of the following three actions is important. Entries in the log can be duplicated, but they can not be lost.
1919
1920 String raw_log_pointer = zookeeper->get(source_path + "/log_pointer");
1921
1922 Coordination::Requests ops;
1923 ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1));
1924
1925    /// To support old versions of ClickHouse.
1926 if (source_is_lost_stat.version == -1)
1927 {
1928        /// We check that it was not suddenly upgraded to a new version.
1929        /// Otherwise it could be upgraded and instantly become lost, but we would not notice that.
1930 ops.push_back(zkutil::makeCreateRequest(source_path + "/is_lost", "0", zkutil::CreateMode::Persistent));
1931 ops.push_back(zkutil::makeRemoveRequest(source_path + "/is_lost", -1));
1932 }
1933 else /// The replica we clone should not suddenly become lost.
1934 ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", source_is_lost_stat.version));
1935
1936 Coordination::Responses resp;
1937
1938 auto error = zookeeper->tryMulti(ops, resp);
1939 if (error == Coordination::Error::ZBADVERSION)
1940        throw Exception("Cannot clone replica, because replica " + source_replica + " became lost", ErrorCodes::REPLICA_STATUS_CHANGED);
1941 else if (error == Coordination::Error::ZNODEEXISTS)
1942        throw Exception("Cannot clone replica, because replica " + source_replica + " was updated to a new ClickHouse version", ErrorCodes::REPLICA_STATUS_CHANGED);
1943 else
1944 zkutil::KeeperMultiException::check(error, ops, resp);
1945
1946 /// Let's remember the queue of the reference/master replica.
1947 Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
1948 std::sort(source_queue_names.begin(), source_queue_names.end());
1949 Strings source_queue;
1950 for (const String & entry_name : source_queue_names)
1951 {
1952 String entry;
1953 if (!zookeeper->tryGet(source_path + "/queue/" + entry_name, entry))
1954 continue;
1955 source_queue.push_back(entry);
1956 }
1957
1958 /// Add to the queue jobs to receive all the active parts that the reference/master replica has.
1959 Strings source_replica_parts = zookeeper->getChildren(source_path + "/parts");
1960 ActiveDataPartSet active_parts_set(format_version, source_replica_parts);
1961
1962 Strings active_parts = active_parts_set.getParts();
1963
1964 /// Remove local parts if source replica does not have them, because such parts will never be fetched by other replicas.
1965 Strings local_parts_in_zk = zookeeper->getChildren(replica_path + "/parts");
1966 Strings parts_to_remove_from_zk;
1967 for (const auto & part : local_parts_in_zk)
1968 {
1969 if (active_parts_set.getContainingPart(part).empty())
1970 {
1971 queue.remove(zookeeper, part);
1972 parts_to_remove_from_zk.emplace_back(part);
1973 LOG_WARNING(log, "Source replica does not have part " << part << ". Removing it from ZooKeeper.");
1974 }
1975 }
1976 tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_from_zk);
1977
1978 auto local_active_parts = getDataParts();
1979 DataPartsVector parts_to_remove_from_working_set;
1980 for (const auto & part : local_active_parts)
1981 {
1982 if (active_parts_set.getContainingPart(part->name).empty())
1983 {
1984 parts_to_remove_from_working_set.emplace_back(part);
1985 LOG_WARNING(log, "Source replica does not have part " << part->name << ". Removing it from working set.");
1986 }
1987 }
1988 removePartsFromWorkingSet(parts_to_remove_from_working_set, true);
1989
1990 for (const String & name : active_parts)
1991 {
1992 LogEntry log_entry;
1993 log_entry.type = LogEntry::GET_PART;
1994 log_entry.source_replica = "";
1995 log_entry.new_part_name = name;
1996 log_entry.create_time = tryGetPartCreateTime(zookeeper, source_path, name);
1997
1998 zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
1999 }
2000
2001 LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched");
2002
2003 /// Add content of the reference/master replica queue to the queue.
2004 for (const String & entry : source_queue)
2005 {
2006 zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
2007 }
2008
2009 LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
2010}
2011
2012
2013void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper)
2014{
2015 String res;
2016 if (zookeeper->tryGet(replica_path + "/is_lost", res))
2017 {
2018 if (res == "0")
2019 return;
2020 }
2021 else
2022 {
2023        /// The replica was created by an old version of ClickHouse, so we must create "/is_lost".
2024        /// Note that in old versions of ClickHouse, "lost" replicas were not possible.
2025 zookeeper->create(replica_path + "/is_lost", "0", zkutil::CreateMode::Persistent);
2026 return;
2027 }
2028
2029 /// is_lost is "1": it means that we are in repair mode.
2030
2031 String source_replica;
2032 Coordination::Stat source_is_lost_stat;
2033 source_is_lost_stat.version = -1;
2034
2035 for (const String & source_replica_name : zookeeper->getChildren(zookeeper_path + "/replicas"))
2036 {
2037 String source_replica_path = zookeeper_path + "/replicas/" + source_replica_name;
2038
2039 /// Do not clone from myself.
2040 if (source_replica_path != replica_path)
2041 {
2042 /// Do not clone from lost replicas.
2043 String source_replica_is_lost_value;
2044 if (!zookeeper->tryGet(source_replica_path + "/is_lost", source_replica_is_lost_value, &source_is_lost_stat)
2045 || source_replica_is_lost_value == "0")
2046 {
2047 source_replica = source_replica_name;
2048 break;
2049 }
2050 }
2051 }
2052
2053 if (source_replica.empty())
2054 throw Exception("All replicas are lost", ErrorCodes::ALL_REPLICAS_LOST);
2055
2056 /// Clear obsolete queue that we no longer need.
2057 zookeeper->removeChildren(replica_path + "/queue");
2058
2059 /// Will do repair from the selected replica.
2060 cloneReplica(source_replica, source_is_lost_stat, zookeeper);
2061    /// If the repair fails for whatever reason, an exception is thrown, is_lost will remain "1", and the replica will be repaired later.
2062
2063 /// If replica is repaired successfully, we remove is_lost flag.
2064 zookeeper->set(replica_path + "/is_lost", "0");
2065}
2066
2067
2068void StorageReplicatedMergeTree::queueUpdatingTask()
2069{
2070 if (!queue_update_in_progress)
2071 {
2072 last_queue_update_start_time.store(time(nullptr));
2073 queue_update_in_progress = true;
2074 }
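    /// Pull fresh entries from the shared replication log into this replica's queue.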
2075 try
2076 {
2077 queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback());
2078 last_queue_update_finish_time.store(time(nullptr));
2079 queue_update_in_progress = false;
2080 }
2081 catch (const Coordination::Exception & e)
2082 {
2083 tryLogCurrentException(log, __PRETTY_FUNCTION__);
2084
2085 if (e.code == Coordination::ZSESSIONEXPIRED)
2086 {
2087 restarting_thread.wakeup();
2088 return;
2089 }
2090
2091 queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
2092 }
2093 catch (...)
2094 {
2095 tryLogCurrentException(log, __PRETTY_FUNCTION__);
2096 queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
2097 }
2098}
2099
2100
2101void StorageReplicatedMergeTree::mutationsUpdatingTask()
2102{
2103 try
2104 {
2105 queue.updateMutations(getZooKeeper(), mutations_updating_task->getWatchCallback());
2106 }
2107 catch (const Coordination::Exception & e)
2108 {
2109 tryLogCurrentException(log, __PRETTY_FUNCTION__);
2110
2111 if (e.code == Coordination::ZSESSIONEXPIRED)
2112 return;
2113
2114 mutations_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
2115 }
2116 catch (...)
2117 {
2118 tryLogCurrentException(log, __PRETTY_FUNCTION__);
2119 mutations_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
2120 }
2121}
2122
2123
2124BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
2125{
2126    /// If the replication queue is stopped, exit immediately as if we had successfully executed the task
2127 if (queue.actions_blocker.isCancelled())
2128 {
2129 std::this_thread::sleep_for(std::chrono::milliseconds(5));
2130 return BackgroundProcessingPoolTaskResult::SUCCESS;
2131 }
2132
2133 /// This object will mark the element of the queue as running.
2134 ReplicatedMergeTreeQueue::SelectedEntry selected;
2135
2136 try
2137 {
2138 selected = queue.selectEntryToProcess(merger_mutator, *this);
2139 }
2140 catch (...)
2141 {
2142 tryLogCurrentException(log, __PRETTY_FUNCTION__);
2143 }
2144
2145 LogEntryPtr & entry = selected.first;
2146
2147 if (!entry)
2148 return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
2149
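    /// Remember the time of the previous attempt to decide below whether to back off after a failure.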
2150 time_t prev_attempt_time = entry->last_attempt_time;
2151
2152 bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process)
2153 {
2154 try
2155 {
2156 return executeLogEntry(*entry_to_process);
2157 }
2158 catch (const Exception & e)
2159 {
2160 if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
2161 {
2162                /// If no one has the needed part, probably not all replicas are working; we will not write to the log at Error level.
2163 LOG_INFO(log, e.displayText());
2164 }
2165 else if (e.code() == ErrorCodes::ABORTED)
2166 {
2167 /// Interrupted merge or downloading a part is not an error.
2168 LOG_INFO(log, e.message());
2169 }
2170 else if (e.code() == ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
2171 {
2172                /// The part cannot be added right now because it is temporarily locked
2173 LOG_INFO(log, e.displayText());
2174 cleanup_thread.wakeup();
2175 }
2176 else
2177 tryLogCurrentException(log, __PRETTY_FUNCTION__);
2178
2179 /** This exception will be written to the queue element, and it can be looked up using `system.replication_queue` table.
2180 * The thread that performs this action will sleep a few seconds after the exception.
2181 * See `queue.processEntry` function.
2182 */
2183 throw;
2184 }
2185 catch (...)
2186 {
2187 tryLogCurrentException(log, __PRETTY_FUNCTION__);
2188 throw;
2189 }
2190 });
2191
2192 /// We will go to sleep if the processing fails and if we have already processed this record recently.
2193 bool need_sleep = !res && (entry->last_attempt_time - prev_attempt_time < 10);
2194
2195 /// If there was no exception, you do not need to sleep.
2196 return need_sleep ? BackgroundProcessingPoolTaskResult::ERROR : BackgroundProcessingPoolTaskResult::SUCCESS;
2197}
2198
2199
2200bool StorageReplicatedMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const
2201{
2202 return queue.isVirtualPart(part);
2203}
2204
2205BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::movePartsTask()
2206{
2207 try
2208 {
2209 if (!selectPartsAndMove())
2210 return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
2211
2212 return BackgroundProcessingPoolTaskResult::SUCCESS;
2213 }
2214 catch (...)
2215 {
2216 tryLogCurrentException(log);
2217 return BackgroundProcessingPoolTaskResult::ERROR;
2218 }
2219}
2220
2221
2222void StorageReplicatedMergeTree::mergeSelectingTask()
2223{
2224 if (!is_leader)
2225 return;
2226
2227 const auto storage_settings_ptr = getSettings();
2228 const bool deduplicate = false; /// TODO: read deduplicate option from table config
2229 const bool force_ttl = false;
2230
2231 bool success = false;
2232
2233 try
2234 {
2235 /// We must select parts for merge under merge_selecting_mutex because other threads
2236 /// (OPTIMIZE queries) can assign new merges.
2237 std::lock_guard merge_selecting_lock(merge_selecting_mutex);
2238
2239 auto zookeeper = getZooKeeper();
2240
2241 ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper);
2242
2243        /// If many merges are already queued, then we will queue only small enough merges.
2244        /// Otherwise the merge queue could be filled with only large merges,
2245        /// and at the same time, many small parts could be created and would not be merged.
2246
2247 auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
2248 size_t merges_and_mutations_sum = merges_and_mutations_queued.first + merges_and_mutations_queued.second;
2249 if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
2250 {
2251 LOG_TRACE(log, "Number of queued merges (" << merges_and_mutations_queued.first << ") and part mutations ("
2252 << merges_and_mutations_queued.second << ") is greater than max_replicated_merges_in_queue ("
2253 << storage_settings_ptr->max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate.");
2254 }
2255 else
2256 {
2257 UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
2258 storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum);
2259 UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
2260
2261 FutureMergedMutatedPart future_merged_part;
2262 if (max_source_parts_size_for_merge > 0 &&
2263 merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred))
2264 {
2265 success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
2266 future_merged_part.name, deduplicate, force_ttl);
2267 }
2268            /// If there are many mutations in the queue, it may happen that we cannot enqueue enough merges to merge all new parts
2269 else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
2270 && merges_and_mutations_queued.second < storage_settings_ptr->max_replicated_mutations_in_queue)
2271 {
2272 /// Choose a part to mutate.
2273 DataPartsVector data_parts = getDataPartsVector();
2274 for (const auto & part : data_parts)
2275 {
2276 if (part->bytes_on_disk > max_source_part_size_for_mutation)
2277 continue;
2278
2279 std::optional<Int64> desired_mutation_version = merge_pred.getDesiredMutationVersion(part);
2280 if (!desired_mutation_version)
2281 continue;
2282
2283 if (createLogEntryToMutatePart(*part, *desired_mutation_version))
2284 {
2285 success = true;
2286 break;
2287 }
2288 }
2289 }
2290 }
2291 }
2292 catch (...)
2293 {
2294 tryLogCurrentException(log, __PRETTY_FUNCTION__);
2295 }
2296
2297 if (!is_leader)
2298 return;
2299
2300 if (!success)
2301 merge_selecting_task->scheduleAfter(MERGE_SELECTING_SLEEP_MS);
2302 else
2303 merge_selecting_task->schedule();
2304
2305}
2306
2307
2308void StorageReplicatedMergeTree::mutationsFinalizingTask()
2309{
2310 bool needs_reschedule = false;
2311
2312 try
2313 {
2314 needs_reschedule = queue.tryFinalizeMutations(getZooKeeper());
2315 }
2316 catch (...)
2317 {
2318 tryLogCurrentException(log, __PRETTY_FUNCTION__);
2319 needs_reschedule = true;
2320 }
2321
2322 if (needs_reschedule)
2323 mutations_finalizing_task->scheduleAfter(MUTATIONS_FINALIZING_SLEEP_MS);
2324}
2325
2326
2327bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
2328 zkutil::ZooKeeperPtr & zookeeper,
2329 const DataPartsVector & parts,
2330 const String & merged_name,
2331 bool deduplicate,
2332 bool force_ttl,
2333 ReplicatedMergeTreeLogEntryData * out_log_entry)
2334{
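    /// Check in parallel that this replica has all source parts registered in ZooKeeper.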
2335 std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
2336 exists_futures.reserve(parts.size());
2337 for (const auto & part : parts)
2338 exists_futures.emplace_back(zookeeper->asyncExists(replica_path + "/parts/" + part->name));
2339
2340 bool all_in_zk = true;
2341 for (size_t i = 0; i < parts.size(); ++i)
2342 {
2343 /// If there is no information about part in ZK, we will not merge it.
2344 if (exists_futures[i].get().error == Coordination::ZNONODE)
2345 {
2346 all_in_zk = false;
2347
2348 const auto & part = parts[i];
2349 if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
2350 {
2351 LOG_WARNING(log, "Part " << part->name << " (that was selected for merge)"
2352 << " with age " << (time(nullptr) - part->modification_time)
2353 << " seconds exists locally but not in ZooKeeper."
2354 << " Won't do merge with that part and will check it.");
2355 enqueuePartForCheck(part->name);
2356 }
2357 }
2358 }
2359
2360 if (!all_in_zk)
2361 return false;
2362
2363 ReplicatedMergeTreeLogEntryData entry;
2364 entry.type = LogEntry::MERGE_PARTS;
2365 entry.source_replica = replica_name;
2366 entry.new_part_name = merged_name;
2367 entry.deduplicate = deduplicate;
2368 entry.force_ttl = force_ttl;
2369 entry.create_time = time(nullptr);
2370
2371 for (const auto & part : parts)
2372 entry.source_parts.push_back(part->name);
2373
2374 String path_created = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
2375 entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
2376
2377 if (out_log_entry)
2378 *out_log_entry = entry;
2379
2380 return true;
2381}
2382
2383
2384bool StorageReplicatedMergeTree::createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version)
2385{
2386 auto zookeeper = getZooKeeper();
2387
2388 /// If there is no information about part in ZK, we will not mutate it.
2389 if (!zookeeper->exists(replica_path + "/parts/" + part.name))
2390 {
2391 if (part.modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
2392 {
2393 LOG_WARNING(log, "Part " << part.name << " (that was selected for mutation)"
2394 << " with age " << (time(nullptr) - part.modification_time)
2395 << " seconds exists locally but not in ZooKeeper."
2396 << " Won't mutate that part and will check it.");
2397 enqueuePartForCheck(part.name);
2398 }
2399
2400 return false;
2401 }
2402
2403 MergeTreePartInfo new_part_info = part.info;
2404 new_part_info.mutation = mutation_version;
2405
2406 String new_part_name = part.getNewName(new_part_info);
2407
2408 ReplicatedMergeTreeLogEntryData entry;
2409 entry.type = LogEntry::MUTATE_PART;
2410 entry.source_replica = replica_name;
2411 entry.source_parts.push_back(part.name);
2412 entry.new_part_name = new_part_name;
2413 entry.create_time = time(nullptr);
2414
2415 zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
2416 return true;
2417}
2418
2419
2420void StorageReplicatedMergeTree::removePartFromZooKeeper(const String & part_name, Coordination::Requests & ops, bool has_children)
2421{
2422 String part_path = replica_path + "/parts/" + part_name;
2423
2424 if (has_children)
2425 {
2426 ops.emplace_back(zkutil::makeRemoveRequest(part_path + "/checksums", -1));
2427 ops.emplace_back(zkutil::makeRemoveRequest(part_path + "/columns", -1));
2428 }
2429 ops.emplace_back(zkutil::makeRemoveRequest(part_path, -1));
2430}
2431
2432
2433void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
2434{
2435 auto zookeeper = getZooKeeper();
2436
2437 String part_path = replica_path + "/parts/" + part_name;
2438
2439 Coordination::Requests ops;
2440
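    /// If the part is still registered in ZooKeeper, schedule its removal and reuse its creation time for the new GET_PART entry.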
2441 time_t part_create_time = 0;
2442 Coordination::Stat stat;
2443 if (zookeeper->exists(part_path, &stat))
2444 {
2445 part_create_time = stat.ctime / 1000;
2446 removePartFromZooKeeper(part_name, ops, stat.numChildren > 0);
2447 }
2448
2449 LogEntryPtr log_entry = std::make_shared<LogEntry>();
2450 log_entry->type = LogEntry::GET_PART;
2451 log_entry->create_time = part_create_time;
2452 log_entry->source_replica = "";
2453 log_entry->new_part_name = part_name;
2454
2455 ops.emplace_back(zkutil::makeCreateRequest(
2456 replica_path + "/queue/queue-", log_entry->toString(),
2457 zkutil::CreateMode::PersistentSequential));
2458
2459 auto results = zookeeper->multi(ops);
2460
2461 String path_created = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
2462 log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
2463 queue.insert(zookeeper, log_entry);
2464}
2465
2466
2467void StorageReplicatedMergeTree::enterLeaderElection()
2468{
2469 auto callback = [this]()
2470 {
2471 CurrentMetrics::add(CurrentMetrics::LeaderReplica);
2472 LOG_INFO(log, "Became leader");
2473
2474 is_leader = true;
2475 merge_selecting_task->activateAndSchedule();
2476 };
2477
2478 try
2479 {
2480 leader_election = std::make_shared<zkutil::LeaderElection>(
2481 global_context.getSchedulePool(),
2482 zookeeper_path + "/leader_election",
2483 *current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election,
2484 /// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method.
2485 callback,
2486 replica_name);
2487 }
2488 catch (...)
2489 {
2490 leader_election = nullptr;
2491 throw;
2492 }
2493}
2494
2495void StorageReplicatedMergeTree::exitLeaderElection()
2496{
2497 if (!leader_election)
2498 return;
2499
2500 /// Shut down the leader election thread to avoid suddenly becoming the leader again after
2501 /// we have stopped the merge_selecting_thread, but before we have deleted the leader_election object.
2502 leader_election->shutdown();
2503
2504 if (is_leader)
2505 {
2506 CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
2507 LOG_INFO(log, "Stopped being leader");
2508
2509 is_leader = false;
2510 merge_selecting_task->deactivate();
2511 }
2512
2513 /// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one
2514 /// replica assigns merges at any given time.
2515 leader_election = nullptr;
2516}
2517
2518
2519String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
2520{
2521 auto zookeeper = getZooKeeper();
2522 Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
2523
2524 /// Select replicas in uniformly random order.
2525 std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
2526
2527 for (const String & replica : replicas)
2528 {
2529        /// We are not interested in ourselves.
2530 if (replica == replica_name)
2531 continue;
2532
2533 if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name) &&
2534 (!active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")))
2535 return replica;
2536
2537        /// Obviously, the replica could become inactive or even vanish right after returning from this method.
2538 }
2539
2540 return {};
2541}
2542
2543
2544String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entry, bool active)
2545{
2546 auto zookeeper = getZooKeeper();
2547 Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
2548
2549 /// Select replicas in uniformly random order.
2550 std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
2551
2552 for (const String & replica : replicas)
2553 {
2554 if (replica == replica_name)
2555 continue;
2556
2557 if (active && !zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
2558 continue;
2559
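        /// Find the largest part on this replica that is equal to or covers the required part.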
2560 String largest_part_found;
2561 Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
2562 for (const String & part_on_replica : parts)
2563 {
2564 if (part_on_replica == entry.new_part_name
2565 || MergeTreePartInfo::contains(part_on_replica, entry.new_part_name, format_version))
2566 {
2567 if (largest_part_found.empty()
2568 || MergeTreePartInfo::contains(part_on_replica, largest_part_found, format_version))
2569 {
2570 largest_part_found = part_on_replica;
2571 }
2572 }
2573 }
2574
2575 if (!largest_part_found.empty())
2576 {
2577 bool the_same_part = largest_part_found == entry.new_part_name;
2578
2579            /// Make a check in case the selected part differs from the source part
2580 if (!the_same_part)
2581 {
2582 String reject_reason;
2583 if (!queue.addFuturePartIfNotCoveredByThem(largest_part_found, entry, reject_reason))
2584 {
2585 LOG_INFO(log, "Will not fetch part " << largest_part_found << " covering " << entry.new_part_name << ". " << reject_reason);
2586 return {};
2587 }
2588 }
2589 else
2590 {
2591 entry.actual_new_part_name = entry.new_part_name;
2592 }
2593
2594 return replica;
2595 }
2596 }
2597
2598 return {};
2599}
2600
2601
2602String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(
2603 const String & part_name, bool active, String & found_part_name)
2604{
2605 auto zookeeper = getZooKeeper();
2606 Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
2607
2608 /// Select replicas in uniformly random order.
2609 std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
2610
2611 String largest_part_found;
2612 String largest_replica_found;
2613
2614 for (const String & replica : replicas)
2615 {
2616 if (replica == replica_name)
2617 continue;
2618
2619 if (active && !zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
2620 continue;
2621
2622 Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
2623 for (const String & part_on_replica : parts)
2624 {
2625 if (part_on_replica == part_name
2626 || MergeTreePartInfo::contains(part_on_replica, part_name, format_version))
2627 {
2628 if (largest_part_found.empty()
2629 || MergeTreePartInfo::contains(part_on_replica, largest_part_found, format_version))
2630 {
2631 largest_part_found = part_on_replica;
2632 largest_replica_found = replica;
2633 }
2634 }
2635 }
2636 }
2637
2638 found_part_name = largest_part_found;
2639 return largest_replica_found;
2640}
2641
2642
2644/** If a quorum is tracked for a part, update information about it in ZK.
2645 */
2646void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
2647{
2648 auto zookeeper = getZooKeeper();
2649
2650    /// Information about the replicas to which the part has been added, if the quorum has not yet been reached.
2651 const String quorum_status_path = zookeeper_path + "/quorum/status";
2652 /// The name of the previous part for which the quorum was reached.
2653 const String quorum_last_part_path = zookeeper_path + "/quorum/last_part";
2654
2655 String value;
2656 Coordination::Stat stat;
2657
2658 /// If there is no node, then all quorum INSERTs have already reached the quorum, and nothing is needed.
2659 while (zookeeper->tryGet(quorum_status_path, value, &stat))
2660 {
2661 ReplicatedMergeTreeQuorumEntry quorum_entry;
2662 quorum_entry.fromString(value);
2663
2664 if (quorum_entry.part_name != part_name)
2665 {
2666 /// The quorum has already been achieved. Moreover, another INSERT with a quorum has already started.
2667 break;
2668 }
2669
2670 quorum_entry.replicas.insert(replica_name);
2671
2672 if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas)
2673 {
2674 /// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum.
2675
2676 Coordination::Requests ops;
2677 Coordination::Responses responses;
2678
2679 Coordination::Stat added_parts_stat;
2680 String old_added_parts = zookeeper->get(quorum_last_part_path, &added_parts_stat);
2681
2682 ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(format_version);
2683
2684 if (!old_added_parts.empty())
2685 parts_with_quorum.fromString(old_added_parts);
2686
2687 auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
2688 parts_with_quorum.added_parts[part_info.partition_id] = part_name;
2689
2690 String new_added_parts = parts_with_quorum.toString();
2691
2692 ops.emplace_back(zkutil::makeRemoveRequest(quorum_status_path, stat.version));
2693 ops.emplace_back(zkutil::makeSetRequest(quorum_last_part_path, new_added_parts, added_parts_stat.version));
2694 auto code = zookeeper->tryMulti(ops, responses);
2695
2696 if (code == Coordination::ZOK)
2697 {
2698 break;
2699 }
2700 else if (code == Coordination::ZNONODE)
2701 {
2702 /// The quorum has already been achieved.
2703 break;
2704 }
2705 else if (code == Coordination::ZBADVERSION)
2706 {
2707 /// Node was updated meanwhile. We must re-read it and repeat all the actions.
2708 continue;
2709 }
2710 else
2711 throw Coordination::Exception(code, quorum_status_path);
2712 }
2713 else
2714 {
2715 /// We update the node, registering there one more replica.
2716 auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version);
2717
2718 if (code == Coordination::ZOK)
2719 {
2720 break;
2721 }
2722 else if (code == Coordination::ZNONODE)
2723 {
2724 /// The quorum has already been achieved.
2725 break;
2726 }
2727 else if (code == Coordination::ZBADVERSION)
2728 {
2729 /// Node was updated meanwhile. We must re-read it and repeat all the actions.
2730 continue;
2731 }
2732 else
2733 throw Coordination::Exception(code, quorum_status_path);
2734 }
2735 }
2736}
2737
2738
2739bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & source_replica_path, bool to_detached, size_t quorum)
2740{
2741 const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
2742
2743 if (auto part = getPartIfExists(part_info, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting}))
2744 {
2745 LOG_DEBUG(log, "Part " << part->name << " should be deleted after previous attempt before fetch");
2746 /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
2747 cleanup_thread.wakeup();
2748 return false;
2749 }
2750
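    /// Register the part as currently being fetched so that concurrent fetch attempts are skipped.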
2751 {
2752 std::lock_guard lock(currently_fetching_parts_mutex);
2753 if (!currently_fetching_parts.insert(part_name).second)
2754 {
2755            LOG_DEBUG(log, "Part " << part_name << " is already being fetched right now");
2756 return false;
2757 }
2758 }
2759
2760 SCOPE_EXIT
2761 ({
2762 std::lock_guard lock(currently_fetching_parts_mutex);
2763 currently_fetching_parts.erase(part_name);
2764 });
2765
2766 LOG_DEBUG(log, "Fetching part " << part_name << " from " << source_replica_path);
2767
2768 TableStructureReadLockHolder table_lock_holder;
2769 if (!to_detached)
2770 table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
2771
2772 /// Logging
2773 Stopwatch stopwatch;
2774 MutableDataPartPtr part;
2775 DataPartsVector replaced_parts;
2776
2777 auto write_part_log = [&] (const ExecutionStatus & execution_status)
2778 {
2779 writePartLog(
2780 PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),
2781 part_name, part, replaced_parts, nullptr);
2782 };
2783
2784 DataPartPtr part_to_clone;
2785 {
2786 /// If the desired part is a result of a part mutation, try to find the source part and compare
2787 /// its checksums to the checksums of the desired part. If they match, we can just clone the local part.
2788
2789 /// If we have the source part, its part_info will contain covered_part_info.
2790 auto covered_part_info = part_info;
2791 covered_part_info.mutation = 0;
2792 auto source_part = getActiveContainingPart(covered_part_info);
2793
2794 if (source_part)
2795 {
2796 MinimalisticDataPartChecksums source_part_checksums;
2797 source_part_checksums.computeTotalChecksums(source_part->checksums);
2798
2799 MinimalisticDataPartChecksums desired_checksums;
2800 auto zookeeper = getZooKeeper();
2801 String part_path = source_replica_path + "/parts/" + part_name;
2802 String part_znode = zookeeper->get(part_path);
2803 if (!part_znode.empty())
2804 desired_checksums = ReplicatedMergeTreePartHeader::fromString(part_znode).getChecksums();
2805 else
2806 {
2807 String desired_checksums_str = zookeeper->get(part_path + "/checksums");
2808 desired_checksums = MinimalisticDataPartChecksums::deserializeFrom(desired_checksums_str);
2809 }
2810
2811 if (source_part_checksums == desired_checksums)
2812 {
2813 LOG_TRACE(log, "Found local part " << source_part->name << " with the same checksums as " << part_name);
2814 part_to_clone = source_part;
2815 }
2816 }
2817
2818 }
2819
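    /// Either clone the local source part or fetch the part from the chosen replica.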
2820 std::function<MutableDataPartPtr()> get_part;
2821 if (part_to_clone)
2822 {
2823 get_part = [&, part_to_clone]()
2824 {
2825 return cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info);
2826 };
2827 }
2828 else
2829 {
2830 ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
2831 auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
2832 auto user_password = global_context.getInterserverCredentials();
2833 String interserver_scheme = global_context.getInterserverScheme();
2834
2835 get_part = [&, address, timeouts, user_password, interserver_scheme]()
2836 {
2837 if (interserver_scheme != address.scheme)
2838 throw Exception("Interserver schemes are different: '" + interserver_scheme
2839 + "' != '" + address.scheme + "', can't fetch part from " + address.host,
2840 ErrorCodes::LOGICAL_ERROR);
2841
2842 return fetcher.fetchPart(
2843 part_name, source_replica_path,
2844 address.host, address.replication_port,
2845 timeouts, user_password.first, user_password.second, interserver_scheme, to_detached);
2846 };
2847 }
2848
2849 try
2850 {
2851 part = get_part();
2852
2853 if (!to_detached)
2854 {
2855 Transaction transaction(*this);
2856 renameTempPartAndReplace(part, nullptr, &transaction);
2857
2858 /** NOTE
2859              * Here, an error occurs if an ALTER changed a column type or deleted a column,
2860              * and the part on the remote server has not yet been modified accordingly.
2861              * After a while, one of the subsequent attempts to `fetchPart` will succeed.
2862 */
2863 replaced_parts = checkPartChecksumsAndCommit(transaction, part);
2864
2865            /** If a quorum is tracked for this part, we must update it.
2866              * If we do not manage to (for example, because the session is lost), it will be done after the server restart - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
2867 */
2868 if (quorum)
2869 updateQuorum(part_name);
2870
2871 merge_selecting_task->schedule();
2872
2873 for (const auto & replaced_part : replaced_parts)
2874 {
2875 LOG_DEBUG(log, "Part " << replaced_part->name << " is rendered obsolete by fetching part " << part_name);
2876 ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
2877 }
2878
2879 write_part_log({});
2880 }
2881 else
2882 {
2883 part->renameTo("detached/" + part_name);
2884 }
2885 }
2886 catch (...)
2887 {
2888 if (!to_detached)
2889 write_part_log(ExecutionStatus::fromCurrentException());
2890
2891 throw;
2892 }
2893
2894 ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
2895
2896 if (part_to_clone)
2897 LOG_DEBUG(log, "Cloned part " << part_name << " from " << part_to_clone->name << (to_detached ? " (to 'detached' directory)" : ""));
2898 else
2899 LOG_DEBUG(log, "Fetched part " << part_name << " from " << source_replica_path << (to_detached ? " (to 'detached' directory)" : ""));
2900
2901 return true;
2902}
2903
2904
2905void StorageReplicatedMergeTree::startup()
2906{
2907 if (is_readonly)
2908 return;
2909
2910 if (set_table_structure_at_startup)
2911 set_table_structure_at_startup();
2912
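    /// Initialize the replication queue with the ZooKeeper paths and the locally present parts.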
2913 queue.initialize(
2914 zookeeper_path, replica_path,
2915 database_name + "." + table_name + " (ReplicatedMergeTreeQueue)",
2916 getDataParts());
2917
2918 StoragePtr ptr = shared_from_this();
2919 InterserverIOEndpointPtr data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this, ptr);
2920 data_parts_exchange_endpoint_holder = std::make_shared<InterserverIOEndpointHolder>(
2921 data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler());
2922
2923 queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); });
2924 if (areBackgroundMovesNeeded())
2925 move_parts_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });
2926
2927 /// In this thread replica will be activated.
2928 restarting_thread.start();
2929
2930    /// Wait until restarting_thread initializes LeaderElection (and so on) or makes its first attempt to do so
2931 startup_event.wait();
2932}
2933
2934
2935void StorageReplicatedMergeTree::shutdown()
2936{
2937 /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
2938 fetcher.blocker.cancelForever();
2939 merger_mutator.merges_blocker.cancelForever();
2940 parts_mover.moves_blocker.cancelForever();
2941
2942 restarting_thread.shutdown();
2943
2944 if (queue_task_handle)
2945 global_context.getBackgroundPool().removeTask(queue_task_handle);
2946 queue_task_handle.reset();
2947
2948 if (move_parts_task_handle)
2949 global_context.getBackgroundMovePool().removeTask(move_parts_task_handle);
2950 move_parts_task_handle.reset();
2951
2952 if (data_parts_exchange_endpoint_holder)
2953 {
2954 data_parts_exchange_endpoint_holder->getBlocker().cancelForever();
2955 data_parts_exchange_endpoint_holder = nullptr;
2956 }
2957}
2958
2959
2960StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
2961{
2962 try
2963 {
2964 shutdown();
2965 }
2966 catch (...)
2967 {
2968 tryLogCurrentException(__PRETTY_FUNCTION__);
2969 }
2970}
2971
2972
2973ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMergeTree::getMaxAddedBlocks() const
2974{
2975 ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock max_added_blocks;
2976
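    /// Start from the maximum block number per partition among the local parts.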
2977 for (const auto & data_part : getDataParts())
2978 {
2979 max_added_blocks[data_part->info.partition_id]
2980 = std::max(max_added_blocks[data_part->info.partition_id], data_part->info.max_block);
2981 }
2982
2983 auto zookeeper = getZooKeeper();
2984
2985 const String quorum_status_path = zookeeper_path + "/quorum/status";
2986
2987 String value;
2988 Coordination::Stat stat;
2989
2990 if (zookeeper->tryGet(quorum_status_path, value, &stat))
2991 {
2992 ReplicatedMergeTreeQuorumEntry quorum_entry;
2993 quorum_entry.fromString(value);
2994
2995 auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name, format_version);
2996
2997 max_added_blocks[part_info.partition_id] = part_info.max_block - 1;
2998 }
2999
3000 String added_parts_str;
3001 if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", added_parts_str))
3002 {
3003 if (!added_parts_str.empty())
3004 {
3005 ReplicatedMergeTreeQuorumAddedParts part_with_quorum(format_version);
3006 part_with_quorum.fromString(added_parts_str);
3007
3008 auto added_parts = part_with_quorum.added_parts;
3009
3010 for (const auto & added_part : added_parts)
3011 if (!getActiveContainingPart(added_part.second))
3012 throw Exception(
3013 "Replica doesn't have part " + added_part.second
3014 + " which was successfully written to quorum of other replicas."
3015 " Send query to another replica or disable 'select_sequential_consistency' setting.",
3016 ErrorCodes::REPLICA_IS_NOT_IN_QUORUM);
3017
3018 for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks())
3019 max_added_blocks[max_block.first] = max_block.second;
3020 }
3021 }
3022 return max_added_blocks;
3023}
3024
3025Pipes StorageReplicatedMergeTree::readWithProcessors(
3026 const Names & column_names,
3027 const SelectQueryInfo & query_info,
3028 const Context & context,
3029 QueryProcessingStage::Enum /*processed_stage*/,
3030 const size_t max_block_size,
3031 const unsigned num_streams)
3032{
3033 const Settings & settings_ = context.getSettingsRef();
3034
    /** The `select_sequential_consistency` setting has two meanings:
     * 1. Throw an exception if the replica does not have all the parts that were written to a quorum of the remaining replicas.
     * 2. Do not read parts that have not yet been written to a quorum of replicas.
     * For this we have to query ZooKeeper synchronously.
     */
3040 if (settings_.select_sequential_consistency)
3041 {
3042 auto max_added_blocks = getMaxAddedBlocks();
3043 return reader.read(column_names, query_info, context, max_block_size, num_streams, &max_added_blocks);
3044 }
3045
3046 return reader.read(column_names, query_info, context, max_block_size, num_streams);
3047}
3048
3049
3050std::optional<UInt64> StorageReplicatedMergeTree::totalRows() const
3051{
3052 size_t res = 0;
3053 auto max_added_blocks = getMaxAddedBlocks();
3054 auto lock = lockParts();
3055 for (auto & part : getDataPartsStateRange(DataPartState::Committed))
3056 {
3057 if (part->isEmpty())
3058 continue;
3059
3060 auto blocks_iterator = max_added_blocks.find(part->info.partition_id);
3061 if (blocks_iterator == max_added_blocks.end() || part->info.max_block > blocks_iterator->second)
3062 continue;
3063
3064 res += part->rows_count;
3065 }
3066 return res;
3067}
3068
3069
3070void StorageReplicatedMergeTree::assertNotReadonly() const
3071{
3072 if (is_readonly)
3073 throw Exception("Table is in readonly mode", ErrorCodes::TABLE_IS_READ_ONLY);
3074}
3075
3076
3077BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const Context & context)
3078{
3079 const auto storage_settings_ptr = getSettings();
3080 assertNotReadonly();
3081
3082 const Settings & query_settings = context.getSettingsRef();
3083 bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
3084
3085 return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
3086 query_settings.insert_quorum, query_settings.insert_quorum_timeout.totalMilliseconds(), query_settings.max_partitions_per_insert_block, deduplicate);
3087}
3088
3089
3090bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context)
3091{
3092 assertNotReadonly();
3093
3094 if (!is_leader)
3095 {
3096 sendRequestToLeaderReplica(query, query_context);
3097 return true;
3098 }
3099
3100 std::vector<ReplicatedMergeTreeLogEntryData> merge_entries;
3101 {
3102 /// We must select parts for merge under merge_selecting_mutex because other threads
3103 /// (merge_selecting_thread or OPTIMIZE queries) could assign new merges.
3104 std::lock_guard merge_selecting_lock(merge_selecting_mutex);
3105
3106 auto zookeeper = getZooKeeper();
3107 ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper);
3108
3109 auto handle_noop = [&] (const String & message)
3110 {
3111 if (query_context.getSettingsRef().optimize_throw_if_noop)
3112 throw Exception(message, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
3113 return false;
3114 };
3115
3116 bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL()));
3117 const auto storage_settings_ptr = getSettings();
3118
3119 if (!partition && final)
3120 {
3121 DataPartsVector data_parts = getDataPartsVector();
3122 std::unordered_set<String> partition_ids;
3123
3124 for (const DataPartPtr & part : data_parts)
3125 partition_ids.emplace(part->info.partition_id);
3126
3127 UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace();
3128
3129 for (const String & partition_id : partition_ids)
3130 {
3131 FutureMergedMutatedPart future_merged_part;
3132 bool selected = merger_mutator.selectAllPartsToMergeWithinPartition(
3133 future_merged_part, disk_space, can_merge, partition_id, true, nullptr);
3134 ReplicatedMergeTreeLogEntryData merge_entry;
3135 if (selected && !createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
3136 future_merged_part.name, deduplicate, force_ttl, &merge_entry))
3137 return handle_noop("Can't create merge queue node in ZooKeeper");
3138 if (merge_entry.type != ReplicatedMergeTreeLogEntryData::Type::EMPTY)
3139 merge_entries.push_back(std::move(merge_entry));
3140 }
3141 }
3142 else
3143 {
3144 FutureMergedMutatedPart future_merged_part;
3145 String disable_reason;
3146 bool selected = false;
3147 if (!partition)
3148 {
3149 selected = merger_mutator.selectPartsToMerge(
3150 future_merged_part, true, storage_settings_ptr->max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
3151 }
3152 else
3153 {
3154
3155 UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace();
3156 String partition_id = getPartitionIDFromQuery(partition, query_context);
3157 selected = merger_mutator.selectAllPartsToMergeWithinPartition(
3158 future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason);
3159 }
3160
3161 if (!selected)
3162 {
3163 std::stringstream message;
3164 message << "Cannot select parts for optimization";
3165 if (!disable_reason.empty())
3166 message << ": " << disable_reason;
3167 LOG_INFO(log, message.rdbuf());
3168 return handle_noop(message.str());
3169 }
3170
3171 ReplicatedMergeTreeLogEntryData merge_entry;
3172 if (!createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
3173 future_merged_part.name, deduplicate, force_ttl, &merge_entry))
3174 return handle_noop("Can't create merge queue node in ZooKeeper");
3175 if (merge_entry.type != ReplicatedMergeTreeLogEntryData::Type::EMPTY)
3176 merge_entries.push_back(std::move(merge_entry));
3177 }
3178 }
3179
3180 if (query_context.getSettingsRef().replication_alter_partitions_sync != 0)
3181 {
        /// NOTE Table lock must not be held while waiting. Some combination of R-W-R locks from different threads can lead to a deadlock.
3183 for (auto & merge_entry : merge_entries)
3184 waitForAllReplicasToProcessLogEntry(merge_entry);
3185 }
3186
3187 return true;
3188}
3189
3190
3191void StorageReplicatedMergeTree::alter(
3192 const AlterCommands & params, const Context & query_context, TableStructureWriteLockHolder & table_lock_holder)
3193{
3194 assertNotReadonly();
3195
3196 LOG_DEBUG(log, "Doing ALTER");
3197
3198 const String current_database_name = getDatabaseName();
3199 const String current_table_name = getTableName();
3200
    /// We cannot check these alter commands with the isModifyingData() method,
    /// because ReplicatedMergeTree stores both columns and metadata for
    /// each replica. So we have to wait for the AlterThread even for a
    /// lightweight metadata alter.
3205 if (params.isSettingsAlter())
3206 {
        /// We don't replicate a settings ALTER. It's a local operation.
        /// Also we don't upgrade the alter lock to a table structure lock.
3209 LOG_DEBUG(log, "ALTER storage_settings_ptr only");
3210 StorageInMemoryMetadata metadata = getInMemoryMetadata();
3211 params.apply(metadata);
3212
3213 changeSettings(metadata.settings_ast, table_lock_holder);
3214
3215 global_context.getDatabase(current_database_name)->alterTable(query_context, current_table_name, metadata);
3216 return;
3217 }
3218
3219 /// Alter is done by modifying the metadata nodes in ZK that are shared between all replicas
3220 /// (/columns, /metadata). We set contents of the shared nodes to the new values and wait while
3221 /// replicas asynchronously apply changes (see ReplicatedMergeTreeAlterThread.cpp) and modify
3222 /// their respective replica metadata nodes (/replicas/<replica>/columns, /replicas/<replica>/metadata).
3223
3224 struct ChangedNode
3225 {
3226 ChangedNode(const String & table_path_, String name_, String new_value_)
3227 : table_path(table_path_), name(std::move(name_)), shared_path(table_path + "/" + name)
3228 , new_value(std::move(new_value_))
3229 {}
3230
3231 const String & table_path;
3232 String name;
3233
3234 String shared_path;
3235
3236 String getReplicaPath(const String & replica) const
3237 {
3238 return table_path + "/replicas/" + replica + "/" + name;
3239 }
3240
3241 String new_value;
3242 int32_t new_version = -1; /// Initialization is to suppress (useless) false positive warning found by cppcheck.
3243 };
3244
3245 auto ast_to_str = [](ASTPtr query) -> String
3246 {
3247 if (!query)
3248 return "";
3249 return queryToString(query);
3250 };
3251
3252 /// /columns and /metadata nodes
3253 std::vector<ChangedNode> changed_nodes;
3254
3255 {
3256 /// Just to read current structure. Alter will be done in separate thread.
3257 auto table_lock = lockStructureForShare(false, query_context.getCurrentQueryId());
3258
3259 if (is_readonly)
3260 throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
3261
3262 StorageInMemoryMetadata metadata = getInMemoryMetadata();
3263 params.apply(metadata);
3264
3265 String new_columns_str = metadata.columns.toString();
3266 if (new_columns_str != getColumns().toString())
3267 changed_nodes.emplace_back(zookeeper_path, "columns", new_columns_str);
3268
3269 ReplicatedMergeTreeTableMetadata new_metadata(*this);
3270 if (ast_to_str(metadata.order_by_ast) != ast_to_str(order_by_ast))
3271 new_metadata.sorting_key = serializeAST(*extractKeyExpressionList(metadata.order_by_ast));
3272
3273 if (ast_to_str(metadata.ttl_for_table_ast) != ast_to_str(ttl_table_ast))
3274 new_metadata.ttl_table = serializeAST(*metadata.ttl_for_table_ast);
3275
3276 String new_indices_str = metadata.indices.toString();
3277 if (new_indices_str != getIndices().toString())
3278 new_metadata.skip_indices = new_indices_str;
3279
3280 String new_constraints_str = metadata.constraints.toString();
3281 if (new_constraints_str != getConstraints().toString())
3282 new_metadata.constraints = new_constraints_str;
3283
3284 String new_metadata_str = new_metadata.toString();
3285 if (new_metadata_str != ReplicatedMergeTreeTableMetadata(*this).toString())
3286 changed_nodes.emplace_back(zookeeper_path, "metadata", new_metadata_str);
3287
3288 /// Perform settings update locally
3289
3290 auto old_metadata = getInMemoryMetadata();
3291 old_metadata.settings_ast = metadata.settings_ast;
3292 changeSettings(metadata.settings_ast, table_lock_holder);
3293 global_context.getDatabase(current_database_name)->alterTable(query_context, current_table_name, old_metadata);
3294
3295 /// Modify shared metadata nodes in ZooKeeper.
3296 Coordination::Requests ops;
3297 for (const auto & node : changed_nodes)
3298 ops.emplace_back(zkutil::makeSetRequest(node.shared_path, node.new_value, -1));
3299
3300 Coordination::Responses results = getZooKeeper()->multi(ops);
3301
3302 for (size_t i = 0; i < changed_nodes.size(); ++i)
3303 changed_nodes[i].new_version = dynamic_cast<const Coordination::SetResponse &>(*results[i]).stat.version;
3304 }
3305
3306 LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
3307
3308 table_lock_holder.release();
3309
    /// Wait until all replicas apply the ALTER.
3311
3312 for (const auto & node : changed_nodes)
3313 {
3314 Coordination::Stat stat;
        /// Subscribe to changes of the shared ZK metadata nodes, to stop waiting if someone does another ALTER.
3316 if (!getZooKeeper()->exists(node.shared_path, &stat, alter_query_event))
3317 throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
3318
3319 if (stat.version != node.new_version)
3320 {
3321 LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; " +
3322 "overlapping ALTER-s are fine but use caution with nontransitive changes");
3323 return;
3324 }
3325 }
3326
3327 Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
3328
3329 std::set<String> inactive_replicas;
3330 std::set<String> timed_out_replicas;
3331
3332 time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout;
3333
    /// This code is quite similar to waitMutationToFinishOnReplicas,
    /// but contains more complicated details (version manipulations, multiple nodes, etc.).
    /// It will be removed soon in favor of the alter-modify implementation on top of mutations.
3337 /// TODO (alesap)
3338 for (const String & replica : replicas)
3339 {
3340 LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");
3341
3342 while (!partial_shutdown_called)
3343 {
3344 auto zookeeper = getZooKeeper();
3345
3346 /// Replica could be inactive.
3347 if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
3348 {
3349 LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query."
3350 " ALTER will be done asynchronously when replica becomes active.");
3351
3352 inactive_replicas.emplace(replica);
3353 break;
3354 }
3355
3356 struct ReplicaNode
3357 {
3358 explicit ReplicaNode(String path_) : path(std::move(path_)) {}
3359
3360 String path;
3361 String value;
3362 int32_t version = -1;
3363 };
3364
3365 std::vector<ReplicaNode> replica_nodes;
3366 for (const auto & node : changed_nodes)
3367 replica_nodes.emplace_back(node.getReplicaPath(replica));
3368
3369 bool replica_was_removed = false;
3370 for (auto & node : replica_nodes)
3371 {
3372 Coordination::Stat stat;
3373
                /// The replica could have been removed.
3375 if (!zookeeper->tryGet(node.path, node.value, &stat))
3376 {
3377 LOG_WARNING(log, replica << " was removed");
3378 replica_was_removed = true;
3379 break;
3380 }
3381
3382 node.version = stat.version;
3383 }
3384
3385 if (replica_was_removed)
3386 break;
3387
3388 bool alter_was_applied = true;
3389 for (size_t i = 0; i < replica_nodes.size(); ++i)
3390 {
3391 if (replica_nodes[i].value != changed_nodes[i].new_value)
3392 {
3393 alter_was_applied = false;
3394 break;
3395 }
3396 }
3397
3398 /// The ALTER has been successfully applied.
3399 if (alter_was_applied)
3400 break;
3401
3402 for (const auto & node : changed_nodes)
3403 {
3404 Coordination::Stat stat;
3405 if (!zookeeper->exists(node.shared_path, &stat))
3406 throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
3407
3408 if (stat.version != node.new_version)
3409 {
3410 LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; "
3411 "overlapping ALTER-s are fine but use caution with nontransitive changes");
3412 return;
3413 }
3414 }
3415
3416 bool replica_nodes_changed_concurrently = false;
3417 for (const auto & replica_node : replica_nodes)
3418 {
3419 Coordination::Stat stat;
3420 if (!zookeeper->exists(replica_node.path, &stat, alter_query_event))
3421 {
3422 LOG_WARNING(log, replica << " was removed");
3423 replica_was_removed = true;
3424 break;
3425 }
3426
3427 if (stat.version != replica_node.version)
3428 {
3429 replica_nodes_changed_concurrently = true;
3430 break;
3431 }
3432 }
3433
3434 if (replica_was_removed)
3435 break;
3436
3437 if (replica_nodes_changed_concurrently)
3438 continue;
3439
            /// alter_query_event is subscribed via ZooKeeper watch callbacks to the /replicas/{replica}/metadata
            /// and /replicas/{replica}/columns nodes of the current replica, plus the shared /columns and /metadata
            /// nodes that are common for all replicas. If any of these nodes changes (delete, set or create),
            /// the event is notified and the wait is interrupted.
            ///
            /// ReplicatedMergeTreeAlterThread is responsible for the local /replicas/{replica}/metadata and
            /// /replicas/{replica}/columns changes. The shared /columns and /metadata nodes can be changed by a *newer*
            /// concurrent alter from another replica. It updates the shared nodes first, after which we have no
            /// way to tell whether our *current* alter has finished. So we cannot do anything better than just
            /// return from the *current* alter with a success result.
3450 if (!replication_alter_columns_timeout)
3451 {
3452 alter_query_event->wait();
3453 /// Everything is fine.
3454 }
3455 else if (alter_query_event->tryWait(replication_alter_columns_timeout * 1000))
3456 {
3457 /// Everything is fine.
3458 }
3459 else
3460 {
3461 LOG_WARNING(log, "Timeout when waiting for replica " << replica << " to apply ALTER."
3462 " ALTER will be done asynchronously.");
3463
3464 timed_out_replicas.emplace(replica);
3465 break;
3466 }
3467 }
3468
3469 if (partial_shutdown_called)
3470 throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.",
3471 ErrorCodes::UNFINISHED);
3472
3473 if (!inactive_replicas.empty() || !timed_out_replicas.empty())
3474 {
3475 std::stringstream exception_message;
3476 exception_message << "Alter is not finished because";
3477
3478 if (!inactive_replicas.empty())
3479 {
3480 exception_message << " some replicas are inactive right now";
3481
3482 for (auto it = inactive_replicas.begin(); it != inactive_replicas.end(); ++it)
3483 exception_message << (it == inactive_replicas.begin() ? ": " : ", ") << *it;
3484 }
3485
3486 if (!timed_out_replicas.empty() && !inactive_replicas.empty())
3487 exception_message << " and";
3488
3489 if (!timed_out_replicas.empty())
3490 {
3491 exception_message << " timeout when waiting for some replicas";
3492
3493 for (auto it = timed_out_replicas.begin(); it != timed_out_replicas.end(); ++it)
3494 exception_message << (it == timed_out_replicas.begin() ? ": " : ", ") << *it;
3495
3496 exception_message << " (replication_alter_columns_timeout = " << replication_alter_columns_timeout << ")";
3497 }
3498
3499 exception_message << ". Alter will be done asynchronously.";
3500
3501 throw Exception(exception_message.str(), ErrorCodes::UNFINISHED);
3502 }
3503 }
3504
3505 LOG_DEBUG(log, "ALTER finished");
3506}
3507
3508void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context)
3509{
3510 for (const PartitionCommand & command : commands)
3511 {
3512 switch (command.type)
3513 {
3514 case PartitionCommand::DROP_PARTITION:
3515 checkPartitionCanBeDropped(command.partition);
3516 dropPartition(query, command.partition, command.detach, query_context);
3517 break;
3518
3519 case PartitionCommand::DROP_DETACHED_PARTITION:
3520 dropDetached(command.partition, command.part, query_context);
3521 break;
3522
3523 case PartitionCommand::ATTACH_PARTITION:
3524 attachPartition(command.partition, command.part, query_context);
3525 break;
3526 case PartitionCommand::MOVE_PARTITION:
3527 {
3528 switch (command.move_destination_type)
3529 {
3530 case PartitionCommand::MoveDestinationType::DISK:
3531 movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context);
3532 break;
3533 case PartitionCommand::MoveDestinationType::VOLUME:
3534 movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context);
3535 break;
3536 }
3537 }
3538 break;
3539
3540 case PartitionCommand::REPLACE_PARTITION:
3541 {
3542 checkPartitionCanBeDropped(command.partition);
3543 String from_database = command.from_database.empty() ? query_context.getCurrentDatabase() : command.from_database;
3544 auto from_storage = query_context.getTable(from_database, command.from_table);
3545 replacePartitionFrom(from_storage, command.partition, command.replace, query_context);
3546 }
3547 break;
3548
3549 case PartitionCommand::FETCH_PARTITION:
3550 fetchPartition(command.partition, command.from_zookeeper_path, query_context);
3551 break;
3552
3553 case PartitionCommand::FREEZE_PARTITION:
3554 {
3555 auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
3556 freezePartition(command.partition, command.with_name, query_context, lock);
3557 }
3558 break;
3559
3560 case PartitionCommand::CLEAR_COLUMN:
3561 {
3562 LogEntry entry;
3563 entry.type = LogEntry::CLEAR_COLUMN;
3564 entry.column_name = command.column_name.safeGet<String>();
3565 clearColumnOrIndexInPartition(command.partition, std::move(entry), query_context);
3566 }
3567 break;
3568
3569 case PartitionCommand::CLEAR_INDEX:
3570 {
3571 LogEntry entry;
3572 entry.type = LogEntry::CLEAR_INDEX;
3573 entry.index_name = command.index_name.safeGet<String>();
3574 clearColumnOrIndexInPartition(command.partition, std::move(entry), query_context);
3575 }
3576 break;
3577
3578 case PartitionCommand::FREEZE_ALL_PARTITIONS:
3579 {
3580 auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
3581 freezeAll(command.with_name, query_context, lock);
3582 }
3583 break;
3584 }
3585 }
3586}
3587
3588
/// For the new format version, returns the ordinary part name; otherwise returns a part name covering the first and last dates of the month.
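/// For illustration (assuming the usual part-name layouts): for a drop range covering blocks 0..99
/// in partition 201403, the old format would give something like 20140301_20140331_0_99_<level>,
/// while the new format gives 201403_0_99_<level>.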
3590static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info)
3591{
3592 if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
3593 {
3594 /// The date range is all month long.
3595 const auto & lut = DateLUT::instance();
3596 time_t start_time = lut.YYYYMMDDToDate(parse<UInt32>(part_info.partition_id + "01"));
3597 DayNum left_date = lut.toDayNum(start_time);
3598 DayNum right_date = DayNum(static_cast<size_t>(left_date) + lut.daysInMonth(start_time) - 1);
3599 return part_info.getPartNameV0(left_date, right_date);
3600 }
3601
3602 return part_info.getPartName();
3603}
3604
3605bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info)
3606{
    /// Even if there is no data in the partition, you still need to mark the range for deletion,
    /// because tasks to download parts into this partition could have been scheduled before the DETACH.
3609 Int64 left = 0;
3610
    /** Let's skip one number in `block_numbers` for the partition being deleted, and we will only delete parts up to this number.
     * This prohibits merges of deleted parts with newly inserted ones.
     * Invariant: merges of deleted parts with other parts do not appear in the log.
     * NOTE: If you need to similarly support a `DROP PART` request, you will have to think of some new mechanism for it,
     * to guarantee this invariant.
     */
3617 Int64 right;
3618 Int64 mutation_version;
3619
3620 {
3621 auto zookeeper = getZooKeeper();
3622 auto block_number_lock = allocateBlockNumber(partition_id, zookeeper);
3623 right = block_number_lock->getNumber();
3624 block_number_lock->unlock();
3625 mutation_version = queue.getCurrentMutationVersion(partition_id, right);
3626 }
3627
3628 /// Empty partition.
3629 if (right == 0)
3630 return false;
3631
3632 --right;
3633
3634 /// Artificial high level is chosen, to make this part "covering" all parts inside.
3635 part_info = MergeTreePartInfo(partition_id, left, right, MergeTreePartInfo::MAX_LEVEL, mutation_version);
3636 return true;
3637}
3638
3639
3640void StorageReplicatedMergeTree::clearColumnOrIndexInPartition(
3641 const ASTPtr & partition, LogEntry && entry, const Context & query_context)
3642{
3643 assertNotReadonly();
3644
    /// We don't block merges, so any replica can manage this task (not only the leader).
3646
3647 String partition_id = getPartitionIDFromQuery(partition, query_context);
3648 MergeTreePartInfo drop_range_info;
3649
3650 if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info))
3651 {
3652 LOG_INFO(log, "Will not clear partition " << partition_id << ", it is empty.");
3653 return;
3654 }
3655
    /// We allocated a new block number for this part, so new merges can't merge the parts being cleared with new ones.
3657 entry.new_part_name = getPartNamePossiblyFake(format_version, drop_range_info);
3658 entry.create_time = time(nullptr);
3659
3660 String log_znode_path = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
3661 entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
3662
3663 /// If necessary, wait until the operation is performed on itself or on all replicas.
3664 if (query_context.getSettingsRef().replication_alter_partitions_sync != 0)
3665 {
3666 if (query_context.getSettingsRef().replication_alter_partitions_sync == 1)
3667 waitForReplicaToProcessLogEntry(replica_name, entry);
3668 else
3669 waitForAllReplicasToProcessLogEntry(entry);
3670 }
3671}
3672
3673
3674void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context)
3675{
3676 assertNotReadonly();
3677
3678 zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
3679
3680 if (!is_leader)
3681 {
3682 // TODO: we can manually reconstruct the query from outside the |dropPartition()| and remove the |query| argument from interface.
3683 // It's the only place where we need this argument.
3684 sendRequestToLeaderReplica(query, query_context);
3685 return;
3686 }
3687
3688 String partition_id = getPartitionIDFromQuery(partition, query_context);
3689
3690 LogEntry entry;
3691 if (dropPartsInPartition(*zookeeper, partition_id, entry, detach))
3692 {
3693 /// If necessary, wait until the operation is performed on itself or on all replicas.
3694 if (query_context.getSettingsRef().replication_alter_partitions_sync != 0)
3695 {
3696 if (query_context.getSettingsRef().replication_alter_partitions_sync == 1)
3697 waitForReplicaToProcessLogEntry(replica_name, entry);
3698 else
3699 waitForAllReplicasToProcessLogEntry(entry);
3700 }
3701 }
3702}
3703
3704
3705void StorageReplicatedMergeTree::truncate(const ASTPtr & query, const Context & query_context, TableStructureWriteLockHolder & table_lock)
3706{
3707 table_lock.release(); /// Truncate is done asynchronously.
3708
3709 assertNotReadonly();
3710
3711 zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
3712
3713 if (!is_leader)
3714 {
3715 sendRequestToLeaderReplica(query, query_context);
3716 return;
3717 }
3718
3719 Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers");
3720
3721 for (String & partition_id : partitions)
3722 {
3723 LogEntry entry;
3724
3725 if (dropPartsInPartition(*zookeeper, partition_id, entry, false))
3726 waitForAllReplicasToProcessLogEntry(entry);
3727 }
3728}
3729
3730
3731void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool attach_part, const Context & query_context)
3732{
3733 // TODO: should get some locks to prevent race with 'alter … modify column'
3734
3735 assertNotReadonly();
3736
3737 PartsTemporaryRename renamed_parts(*this, "detached/");
3738 MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
3739
3740 ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, 0, false); /// TODO Allow to use quorum here.
3741 for (size_t i = 0; i < loaded_parts.size(); ++i)
3742 {
3743 String old_name = loaded_parts[i]->name;
3744 output.writeExistingPart(loaded_parts[i]);
3745 renamed_parts.old_and_new_names[i].first.clear();
3746 LOG_DEBUG(log, "Attached part " << old_name << " as " << loaded_parts[i]->name);
3747 }
3748}
3749
3750
3751void StorageReplicatedMergeTree::checkTableCanBeDropped() const
3752{
3753 /// Consider only synchronized data
3754 const_cast<StorageReplicatedMergeTree &>(*this).recalculateColumnSizes();
3755 global_context.checkTableCanBeDropped(database_name, table_name, getTotalActiveSizeInBytes());
3756}
3757
3758
3759void StorageReplicatedMergeTree::checkPartitionCanBeDropped(const ASTPtr & partition)
3760{
3761 const_cast<StorageReplicatedMergeTree &>(*this).recalculateColumnSizes();
3762
3763 const String partition_id = getPartitionIDFromQuery(partition, global_context);
3764 auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
3765
3766 UInt64 partition_size = 0;
3767
3768 for (const auto & part : parts_to_remove)
3769 partition_size += part->bytes_on_disk;
3770
3771 global_context.checkPartitionCanBeDropped(database_name, table_name, partition_size);
3772}
3773
3774
3775void StorageReplicatedMergeTree::drop(TableStructureWriteLockHolder &)
3776{
3777 {
3778 auto zookeeper = tryGetZooKeeper();
3779
3780 if (is_readonly || !zookeeper)
3781 throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
3782
3783 shutdown();
3784
3785 if (zookeeper->expired())
3786 throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
3787
3788 LOG_INFO(log, "Removing replica " << replica_path);
3789 replica_is_active_node = nullptr;
3790 zookeeper->tryRemoveRecursive(replica_path);
3791
3792 /// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
3793 Strings replicas;
3794 if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == Coordination::ZOK && replicas.empty())
3795 {
3796 LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
3797 zookeeper->tryRemoveRecursive(zookeeper_path);
3798 }
3799 }
3800
3801 dropAllData();
3802}
3803
3804
3805void StorageReplicatedMergeTree::rename(
3806 const String & new_path_to_table_data, const String & new_database_name,
3807 const String & new_table_name, TableStructureWriteLockHolder & lock)
3808{
3809 MergeTreeData::rename(new_path_to_table_data, new_database_name, new_table_name, lock);
3810
3811 /// Update table name in zookeeper
3812 auto zookeeper = getZooKeeper();
3813 zookeeper->set(replica_path + "/host", getReplicatedMergeTreeAddress().toString());
3814
3815 /// TODO: You can update names of loggers.
3816}
3817
3818
3819bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
3820{
3821 {
3822 std::lock_guard lock(existing_nodes_cache_mutex);
3823 if (existing_nodes_cache.count(path))
3824 return true;
3825 }
3826
3827 bool res = getZooKeeper()->exists(path);
3828
3829 if (res)
3830 {
3831 std::lock_guard lock(existing_nodes_cache_mutex);
3832 existing_nodes_cache.insert(path);
3833 }
3834
3835 return res;
3836}
3837
3838
3839std::optional<EphemeralLockInZooKeeper>
3840StorageReplicatedMergeTree::allocateBlockNumber(
3841 const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_block_id_path)
3842{
    /// Let's check for duplicates in advance, to avoid superfluous allocation of block numbers.
3844 Coordination::Requests deduplication_check_ops;
3845 if (!zookeeper_block_id_path.empty())
3846 {
3847 deduplication_check_ops.emplace_back(zkutil::makeCreateRequest(zookeeper_block_id_path, "", zkutil::CreateMode::Persistent));
3848 deduplication_check_ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_block_id_path, -1));
3849 }
3850
3851 String block_numbers_path = zookeeper_path + "/block_numbers";
3852 String partition_path = block_numbers_path + "/" + partition_id;
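    /// The resulting ZooKeeper layout is roughly:
    ///   <zookeeper_path>/block_numbers/<partition_id>/block-NNNNNNNNNN   (sequential lock nodes created below)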
3853
3854 if (!existsNodeCached(partition_path))
3855 {
3856 Coordination::Requests ops;
3857 ops.push_back(zkutil::makeCreateRequest(partition_path, "", zkutil::CreateMode::Persistent));
3858 /// We increment data version of the block_numbers node so that it becomes possible
3859 /// to check in a ZK transaction that the set of partitions didn't change
3860 /// (unfortunately there is no CheckChildren op).
3861 ops.push_back(zkutil::makeSetRequest(block_numbers_path, "", -1));
3862
3863 Coordination::Responses responses;
3864 int code = zookeeper->tryMulti(ops, responses);
3865 if (code && code != Coordination::ZNODEEXISTS)
3866 zkutil::KeeperMultiException::check(code, ops, responses);
3867 }
3868
3869 EphemeralLockInZooKeeper lock;
3870 /// 2 RTT
3871 try
3872 {
3873 lock = EphemeralLockInZooKeeper(
3874 partition_path + "/block-", zookeeper_path + "/temp", *zookeeper, &deduplication_check_ops);
3875 }
3876 catch (const zkutil::KeeperMultiException & e)
3877 {
3878 if (e.code == Coordination::ZNODEEXISTS && e.getPathForFirstFailedOp() == zookeeper_block_id_path)
3879 return {};
3880
3881 throw Exception("Cannot allocate block number in ZooKeeper: " + e.displayText(), ErrorCodes::KEEPER_EXCEPTION);
3882 }
3883 catch (const Coordination::Exception & e)
3884 {
3885 throw Exception("Cannot allocate block number in ZooKeeper: " + e.displayText(), ErrorCodes::KEEPER_EXCEPTION);
3886 }
3887
3888 return {std::move(lock)};
3889}
3890
3891
3892void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry)
3893{
3894 LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);
3895
3896 Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
3897 for (const String & replica : replicas)
3898 waitForReplicaToProcessLogEntry(replica, entry);
3899
3900 LOG_DEBUG(log, "Finished waiting for all replicas to process " << entry.znode_name);
3901}
3902
3903
3904void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
3905{
3906 String entry_str = entry.toString();
3907 String log_node_name;
3908
3909 /** Two types of entries can be passed to this function
3910 * 1. (more often) From `log` directory - a common log, from where replicas copy entries to their queue.
3911 * 2. From the `queue` directory of one of the replicas.
3912 *
3913 * The problem is that the numbers (`sequential` node) of the queue elements in `log` and in `queue` do not match.
3914 * (And the numbers of the same log element for different replicas do not match in the `queue`.)
3915 *
3916 * Therefore, you should consider these cases separately.
3917 */
3918
    /** First, you need to wait until the replica takes the entry from the `log` into its queue,
     * if it has not done so already (see the `pullLogsToQueue` function).
     *
     * To do this, check its `log_pointer` node - the maximum number of the element taken from `log`, plus one.
     */
3924
3925 if (startsWith(entry.znode_name, "log-"))
3926 {
3927 /** In this case, just take the number from the node name `log-xxxxxxxxxx`.
3928 */
3929
3930 UInt64 log_index = parse<UInt64>(entry.znode_name.substr(entry.znode_name.size() - 10));
3931 log_node_name = entry.znode_name;
3932
3933 LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");
3934
3935 /// Let's wait until entry gets into the replica queue.
3936 while (true)
3937 {
3938 zkutil::EventPtr event = std::make_shared<Poco::Event>();
3939
3940 String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
3941 if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
3942 break;
3943
3944 event->wait();
3945 }
3946 }
3947 else if (startsWith(entry.znode_name, "queue-"))
3948 {
        /** In this case, the number of the `log` node is unknown. You need to look through everything from `log_pointer` to the end,
         * looking for a node with the same content. If we do not find it, then the replica has already taken this entry into its queue.
         */
3952
3953 String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
3954
3955 Strings log_entries = getZooKeeper()->getChildren(zookeeper_path + "/log");
3956 UInt64 log_index = 0;
3957 bool found = false;
3958
3959 for (const String & log_entry_name : log_entries)
3960 {
3961 log_index = parse<UInt64>(log_entry_name.substr(log_entry_name.size() - 10));
3962
3963 if (!log_pointer.empty() && log_index < parse<UInt64>(log_pointer))
3964 continue;
3965
3966 String log_entry_str;
3967 bool exists = getZooKeeper()->tryGet(zookeeper_path + "/log/" + log_entry_name, log_entry_str);
3968 if (exists && entry_str == log_entry_str)
3969 {
3970 found = true;
3971 log_node_name = log_entry_name;
3972 break;
3973 }
3974 }
3975
3976 if (found)
3977 {
3978 LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");
3979
3980 /// Let's wait until the entry gets into the replica queue.
3981 while (true)
3982 {
3983 zkutil::EventPtr event = std::make_shared<Poco::Event>();
3984
3985 String log_pointer_new = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
3986 if (!log_pointer_new.empty() && parse<UInt64>(log_pointer_new) > log_index)
3987 break;
3988
3989 event->wait();
3990 }
3991 }
3992 }
3993 else
3994 throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR);
3995
3996 if (!log_node_name.empty())
3997 LOG_DEBUG(log, "Looking for node corresponding to " << log_node_name << " in " << replica << " queue");
3998 else
3999 LOG_DEBUG(log, "Looking for corresponding node in " << replica << " queue");
4000
    /** Second - find the corresponding entry in the queue of the specified replica.
     * Its number may match neither the `log` node nor our own `queue` node.
     * Therefore, we search by comparing the content.
     */
4005
4006 Strings queue_entries = getZooKeeper()->getChildren(zookeeper_path + "/replicas/" + replica + "/queue");
4007 String queue_entry_to_wait_for;
4008
4009 for (const String & entry_name : queue_entries)
4010 {
4011 String queue_entry_str;
4012 bool exists = getZooKeeper()->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str);
4013 if (exists && queue_entry_str == entry_str)
4014 {
4015 queue_entry_to_wait_for = entry_name;
4016 break;
4017 }
4018 }
4019
    /// It may happen that while we were looking for the record, it was already executed and deleted.
4021 if (queue_entry_to_wait_for.empty())
4022 {
4023 LOG_DEBUG(log, "No corresponding node found. Assuming it has been already processed." " Found " << queue_entries.size() << " nodes.");
4024 return;
4025 }
4026
4027 LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue");
4028
4029 /// Third - wait until the entry disappears from the replica queue.
4030 getZooKeeper()->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
4031}
4032
4033
4034void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
4035{
4036 auto zookeeper = tryGetZooKeeper();
4037 const auto storage_settings_ptr = getSettings();
4038
4039 res.is_leader = is_leader;
4040 res.can_become_leader = storage_settings_ptr->replicated_can_become_leader;
4041 res.is_readonly = is_readonly;
4042 res.is_session_expired = !zookeeper || zookeeper->expired();
4043
4044 res.queue = queue.getStatus();
4045 res.absolute_delay = getAbsoluteDelay(); /// NOTE: may be slightly inconsistent with queue status.
4046
4047 res.parts_to_check = part_check_thread.size();
4048
4049 res.zookeeper_path = zookeeper_path;
4050 res.replica_name = replica_name;
4051 res.replica_path = replica_path;
4052 res.columns_version = columns_version;
4053
4054 if (res.is_session_expired || !with_zk_fields)
4055 {
4056 res.log_max_index = 0;
4057 res.log_pointer = 0;
4058 res.total_replicas = 0;
4059 res.active_replicas = 0;
4060 }
4061 else
4062 {
4063 auto log_entries = zookeeper->getChildren(zookeeper_path + "/log");
4064
4065 if (log_entries.empty())
4066 {
4067 res.log_max_index = 0;
4068 }
4069 else
4070 {
4071 const String & last_log_entry = *std::max_element(log_entries.begin(), log_entries.end());
4072 res.log_max_index = parse<UInt64>(last_log_entry.substr(strlen("log-")));
4073 }
4074
4075 String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
4076 res.log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
4077
4078 auto all_replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
4079 res.total_replicas = all_replicas.size();
4080
4081 res.active_replicas = 0;
4082 for (const String & replica : all_replicas)
4083 if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
4084 ++res.active_replicas;
4085 }
4086}
4087
4088
/// TODO: It is probably better to have a queue in ZK with tasks for the leader (like DDL).
4090void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context)
4091{
4092 auto live_replicas = getZooKeeper()->getChildren(zookeeper_path + "/leader_election");
4093 if (live_replicas.empty())
4094 throw Exception("No active replicas", ErrorCodes::NO_ACTIVE_REPLICAS);
4095
4096 std::sort(live_replicas.begin(), live_replicas.end());
4097 const auto leader = getZooKeeper()->get(zookeeper_path + "/leader_election/" + live_replicas.front());
4098
4099 if (leader == replica_name)
4100 throw Exception("Leader was suddenly changed or logical error.", ErrorCodes::LEADERSHIP_CHANGED);
4101
    /// SECONDARY_QUERY here means that we received the query from DDLWorker;
    /// there is no sense in sending the query to the leader, because it will receive it from its own DDLWorker.
4104 if (query_context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
4105 {
4106 throw Exception("Cannot execute DDL query, because leader was suddenly changed or logical error.", ErrorCodes::LEADERSHIP_CHANGED);
4107 }
4108
4109 ReplicatedMergeTreeAddress leader_address(getZooKeeper()->get(zookeeper_path + "/replicas/" + leader + "/host"));
4110
4111 /// TODO: add setters and getters interface for database and table fields of AST
4112 auto new_query = query->clone();
4113 if (auto * alter = new_query->as<ASTAlterQuery>())
4114 {
4115 alter->database = leader_address.database;
4116 alter->table = leader_address.table;
4117 }
4118 else if (auto * optimize = new_query->as<ASTOptimizeQuery>())
4119 {
4120 optimize->database = leader_address.database;
4121 optimize->table = leader_address.table;
4122 }
    else if (auto * drop = new_query->as<ASTDropQuery>(); drop && drop->kind == ASTDropQuery::Kind::Truncate)
4124 {
4125 drop->database = leader_address.database;
4126 drop->table = leader_address.table;
4127 }
4128 else
4129 throw Exception("Can't proxy this query. Unsupported query type", ErrorCodes::NOT_IMPLEMENTED);
4130
4131 const auto & query_settings = query_context.getSettingsRef();
4132 const auto & query_client_info = query_context.getClientInfo();
4133 String user = query_client_info.current_user;
4134 String password = query_client_info.current_password;
4135
4136 if (auto address = findClusterAddress(leader_address); address)
4137 {
4138 user = address->user;
4139 password = address->password;
4140 }
4141
4142 Connection connection(
4143 leader_address.host,
4144 leader_address.queries_port,
4145 leader_address.database,
4146 user, password, "Follower replica");
4147
4148 std::stringstream new_query_ss;
4149 formatAST(*new_query, new_query_ss, false, true);
4150 RemoteBlockInputStream stream(connection, new_query_ss.str(), {}, global_context, &query_settings);
4151 NullBlockOutputStream output({});
4152
4153 copyData(stream, output);
4154 return;
4155}
4156
4157
4158std::optional<Cluster::Address> StorageReplicatedMergeTree::findClusterAddress(const ReplicatedMergeTreeAddress & leader_address) const
4159{
4160 for (auto & iter : global_context.getClusters().getContainer())
4161 {
4162 const auto & shards = iter.second->getShardsAddresses();
4163
4164 for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
4165 {
4166 for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
4167 {
4168 const Cluster::Address & address = shards[shard_num][replica_num];
                /// The user is explicitly specified, not the default one.
4170 if (address.host_name == leader_address.host && address.port == leader_address.queries_port && address.user_specified)
4171 return address;
4172 }
4173 }
4174 }
4175 return {};
4176}
4177
4178void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica_name_)
4179{
4180 replica_name_ = replica_name;
4181 queue.getEntries(res);
4182}
4183
4184time_t StorageReplicatedMergeTree::getAbsoluteDelay() const
4185{
4186 time_t min_unprocessed_insert_time = 0;
4187 time_t max_processed_insert_time = 0;
4188 queue.getInsertTimes(min_unprocessed_insert_time, max_processed_insert_time);
4189
4190 /// Load start time, then finish time to avoid reporting false delay when start time is updated
4191 /// between loading of two variables.
4192 time_t queue_update_start_time = last_queue_update_start_time.load();
4193 time_t queue_update_finish_time = last_queue_update_finish_time.load();
4194
4195 time_t current_time = time(nullptr);
4196
4197 if (!queue_update_finish_time)
4198 {
4199 /// We have not updated queue even once yet (perhaps replica is readonly).
4200 /// As we have no info about the current state of replication log, return effectively infinite delay.
4201 return current_time;
4202 }
4203 else if (min_unprocessed_insert_time)
4204 {
4205 /// There are some unprocessed insert entries in queue.
4206 return (current_time > min_unprocessed_insert_time) ? (current_time - min_unprocessed_insert_time) : 0;
4207 }
4208 else if (queue_update_start_time > queue_update_finish_time)
4209 {
4210 /// Queue is empty, but there are some in-flight or failed queue update attempts
4211 /// (likely because of problems with connecting to ZooKeeper).
4212 /// Return the time passed since last attempt.
4213 return (current_time > queue_update_start_time) ? (current_time - queue_update_start_time) : 0;
4214 }
4215 else
4216 {
4217 /// Everything is up-to-date.
4218 return 0;
4219 }
4220}
4221
4222void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay)
4223{
4224 assertNotReadonly();
4225
4226 time_t current_time = time(nullptr);
4227
4228 out_absolute_delay = getAbsoluteDelay();
4229 out_relative_delay = 0;
4230 const auto storage_settings_ptr = getSettings();
4231
    /** The relative delay is the maximum difference between our absolute delay and the absolute delay of any other live replica
     * (non-zero only if this replica lags behind some other live replica; zero otherwise).
     * It is calculated only if the absolute delay is large enough.
     */
4236
4237 if (out_absolute_delay < static_cast<time_t>(storage_settings_ptr->min_relative_delay_to_yield_leadership))
4238 return;
4239
4240 auto zookeeper = getZooKeeper();
4241
4242 time_t max_replicas_unprocessed_insert_time = 0;
4243 bool have_replica_with_nothing_unprocessed = false;
4244
4245 Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
4246
4247 for (const auto & replica : replicas)
4248 {
4249 if (replica == replica_name)
4250 continue;
4251
4252 /// Skip dead replicas.
4253 if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
4254 continue;
4255
4256 String value;
4257 if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/min_unprocessed_insert_time", value))
4258 continue;
4259
4260 time_t replica_time = value.empty() ? 0 : parse<time_t>(value);
4261
4262 if (replica_time == 0)
4263 {
            /** Note
              * The conclusion that the replica does not lag may be incorrect,
              * because the information about `min_unprocessed_insert_time` is taken
              * only from that part of the log that has been moved to the queue.
              * If `queueUpdatingTask` has stalled on that replica for some reason,
              * then `min_unprocessed_insert_time` will be incorrect.
              */
4271
4272 have_replica_with_nothing_unprocessed = true;
4273 break;
4274 }
4275
4276 if (replica_time > max_replicas_unprocessed_insert_time)
4277 max_replicas_unprocessed_insert_time = replica_time;
4278 }
4279
4280 if (have_replica_with_nothing_unprocessed)
4281 out_relative_delay = out_absolute_delay;
4282 else
4283 {
4284 max_replicas_unprocessed_insert_time = std::min(current_time, max_replicas_unprocessed_insert_time);
4285 time_t min_replicas_delay = current_time - max_replicas_unprocessed_insert_time;
4286 if (out_absolute_delay > min_replicas_delay)
4287 out_relative_delay = out_absolute_delay - min_replicas_delay;
4288 }
4289}
4290
4291
4292void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const String & from_, const Context & query_context)
4293{
4294 String partition_id = getPartitionIDFromQuery(partition, query_context);
4295
4296 String from = from_;
4297 if (from.back() == '/')
4298 from.resize(from.size() - 1);
4299
4300 LOG_INFO(log, "Will fetch partition " << partition_id << " from shard " << from_);
4301
4302 /** Let's check that there is no such partition in the `detached` directory (where we will write the downloaded parts).
4303 * Unreliable (there is a race condition) - such a partition may appear a little later.
4304 */
4305 Poco::DirectoryIterator dir_end;
4306 for (const std::string & path : getDataPaths())
4307 {
4308 for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
4309 {
4310 MergeTreePartInfo part_info;
4311 if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version)
4312 && part_info.partition_id == partition_id)
4313 throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
4314 }
4315
4316 }
4317
4318 zkutil::Strings replicas;
4319 zkutil::Strings active_replicas;
4320 String best_replica;
4321
4322 {
4323 auto zookeeper = getZooKeeper();
4324
4325 /// List of replicas of source shard.
4326 replicas = zookeeper->getChildren(from + "/replicas");
4327
4328 /// Leave only active replicas.
4329 active_replicas.reserve(replicas.size());
4330
4331 for (const String & replica : replicas)
4332 if (zookeeper->exists(from + "/replicas/" + replica + "/is_active"))
4333 active_replicas.push_back(replica);
4334
4335 if (active_replicas.empty())
4336 throw Exception("No active replicas for shard " + from, ErrorCodes::NO_ACTIVE_REPLICAS);
4337
        /** You must select the best (most relevant) replica.
         * This is the replica with the maximum `log_pointer`, and then with the minimum `queue` size.
         * NOTE This is not exactly the best criterion. It does not make sense to download old partitions,
         * and it would be nice to be able to choose the replica closest by network.
         * NOTE Of course, there are data races here. You can solve them by retrying.
         */
4344 Int64 max_log_pointer = -1;
4345 UInt64 min_queue_size = std::numeric_limits<UInt64>::max();
4346
4347 for (const String & replica : active_replicas)
4348 {
4349 String current_replica_path = from + "/replicas/" + replica;
4350
4351 String log_pointer_str = zookeeper->get(current_replica_path + "/log_pointer");
4352 Int64 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
4353
4354 Coordination::Stat stat;
4355 zookeeper->get(current_replica_path + "/queue", &stat);
4356 size_t queue_size = stat.numChildren;
4357
4358 if (log_pointer > max_log_pointer
4359 || (log_pointer == max_log_pointer && queue_size < min_queue_size))
4360 {
4361 max_log_pointer = log_pointer;
4362 min_queue_size = queue_size;
4363 best_replica = replica;
4364 }
4365 }
4366 }
4367
4368 if (best_replica.empty())
4369 throw Exception("Logical error: cannot choose best replica.", ErrorCodes::LOGICAL_ERROR);
4370
4371 LOG_INFO(log, "Found " << replicas.size() << " replicas, " << active_replicas.size() << " of them are active."
4372 << " Selected " << best_replica << " to fetch from.");
4373
4374 String best_replica_path = from + "/replicas/" + best_replica;
4375
4376 /// Let's find out which parts are on the best replica.
4377
    /** Try to download these parts.
     * Some of them could have been deleted due to merges.
     * In this case, update the information about the available parts and try again.
     */
4382
4383 unsigned try_no = 0;
4384 Strings missing_parts;
4385 do
4386 {
4387 if (try_no)
4388 LOG_INFO(log, "Some of parts (" << missing_parts.size() << ") are missing. Will try to fetch covering parts.");
4389
4390 if (try_no >= query_context.getSettings().max_fetch_partition_retries_count)
4391 throw Exception("Too many retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS);
4392
4393 Strings parts = getZooKeeper()->getChildren(best_replica_path + "/parts");
4394 ActiveDataPartSet active_parts_set(format_version, parts);
4395 Strings parts_to_fetch;
4396
4397 if (missing_parts.empty())
4398 {
4399 parts_to_fetch = active_parts_set.getParts();
4400
4401 /// Leaving only the parts of the desired partition.
4402 Strings parts_to_fetch_partition;
4403 for (const String & part : parts_to_fetch)
4404 {
4405 if (MergeTreePartInfo::fromPartName(part, format_version).partition_id == partition_id)
4406 parts_to_fetch_partition.push_back(part);
4407 }
4408
4409 parts_to_fetch = std::move(parts_to_fetch_partition);
4410
4411 if (parts_to_fetch.empty())
4412 throw Exception("Partition " + partition_id + " on " + best_replica_path + " doesn't exist", ErrorCodes::PARTITION_DOESNT_EXIST);
4413 }
4414 else
4415 {
4416 for (const String & missing_part : missing_parts)
4417 {
4418 String containing_part = active_parts_set.getContainingPart(missing_part);
4419 if (!containing_part.empty())
4420 parts_to_fetch.push_back(containing_part);
4421 else
4422 LOG_WARNING(log, "Part " << missing_part << " on replica " << best_replica_path << " has been vanished.");
4423 }
4424 }
4425
4426 LOG_INFO(log, "Parts to fetch: " << parts_to_fetch.size());
4427
4428 missing_parts.clear();
4429 for (const String & part : parts_to_fetch)
4430 {
4431 try
4432 {
4433 fetchPart(part, best_replica_path, true, 0);
4434 }
4435 catch (const DB::Exception & e)
4436 {
4437 if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER && e.code() != ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
4438 && e.code() != ErrorCodes::CANNOT_READ_ALL_DATA)
4439 throw;
4440
4441 LOG_INFO(log, e.displayText());
4442 missing_parts.push_back(part);
4443 }
4444 }
4445
4446 ++try_no;
4447 } while (!missing_parts.empty());
4448}
4449
4450
4451void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
4452{
4453 /// Overview of the mutation algorithm.
4454 ///
4455 /// When the client executes a mutation, this method is called. It acquires block numbers in all
4456 /// partitions, saves them in the mutation entry and writes the mutation entry to a new ZK node in
    /// the /mutations folder. These block numbers are needed to determine which parts should be mutated and
    /// which shouldn't (parts inserted after the mutation will have a block number higher than the
    /// block number acquired by the mutation in that partition and so will not be mutated).
4460 /// This block number is called "mutation version" in that partition.
4461 ///
4462 /// Mutation versions are acquired atomically in all partitions, so the case when an insert in some
4463 /// partition has the block number higher than the mutation version but the following insert into another
4464 /// partition acquires the block number lower than the mutation version in that partition is impossible.
4465 /// Another important invariant: mutation entries appear in /mutations in the order of their mutation
4466 /// versions (in any partition). This means that mutations form a sequence and we can execute them in
4467 /// the order of their mutation versions and not worry that some mutation with the smaller version
4468 /// will suddenly appear.
4469 ///
4470 /// During mutations individual parts are immutable - when we want to change the contents of a part
4471 /// we prepare the new part and add it to MergeTreeData (the original part gets replaced). The fact that
4472 /// we have mutated the part is recorded in the part->info.mutation field of MergeTreePartInfo.
4473 /// The relation with the original part is preserved because the new part covers the same block range
4474 /// as the original one.
4475 ///
4476 /// We then can for each part determine its "mutation version": the version of the last mutation in
4477 /// the mutation sequence that we regard as already applied to that part. All mutations with the greater
4478 /// version number will still need to be applied to that part.
4479 ///
4480 /// Execution of mutations is done asynchronously. All replicas watch the /mutations directory and
4481 /// load new mutation entries as they appear (see mutationsUpdatingTask()). Next we need to determine
4482 /// how to mutate individual parts consistently with part merges. This is done by the leader replica
4483 /// (see mergeSelectingTask() and class ReplicatedMergeTreeMergePredicate for details). Important
4484 /// invariants here are that a) all source parts for a single merge must have the same mutation version
4485 /// and b) any part can be mutated only once or merged only once (e.g. once we have decided to mutate
4486 /// a part then we need to execute that mutation and can assign merges only to the new part and not to the
4487 /// original part). Multiple consecutive mutations can be executed at once (without writing the
4488 /// intermediate result to a part).
4489 ///
4490 /// Leader replica records its decisions to the replication log (/log directory in ZK) in the form of
4491 /// MUTATE_PART entries and all replicas then execute them in the background pool
4492 /// (see tryExecutePartMutation() function). When a replica encounters a MUTATE_PART command, it is
4493 /// guaranteed that the corresponding mutation entry is already loaded (when we pull entries from
4494 /// replication log into the replica queue, we also load mutation entries). Note that just as with merges
4495 /// the replica can decide not to do the mutation locally and fetch the mutated part from another replica
4496 /// instead.
4497 ///
4498 /// Mutations of individual parts are in fact pretty similar to merges, e.g. their assignment and execution
4499 /// is governed by the same storage_settings. TODO: support a single "merge-mutation" operation when the data
4500 /// read from the the source parts is first mutated on the fly to some uniform mutation version and then
4501 /// merged to a resulting part.
4502 ///
4503 /// After all needed parts are mutated (i.e. all active parts have the mutation version greater than
4504 /// the version of this mutation), the mutation is considered done and can be deleted.
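    ///
    /// For illustration only (the path and numbers below are made up): a mutation created by this method
    /// ends up as a sequential znode such as <zookeeper_path>/mutations/0000000003, whose entry records the
    /// source replica, the mutation commands and the acquired block numbers per partition, e.g.
    /// { "201910" -> 1042, "201911" -> 837 }. A part of partition 201910 with a block number below 1042
    /// still needs this mutation applied; parts inserted later are considered already mutated with respect to it.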

    ReplicatedMergeTreeMutationEntry entry;
    entry.source_replica = replica_name;
    entry.commands = commands;

    String mutations_path = zookeeper_path + "/mutations";

    /// Update the mutations_path node when creating the mutation and check its version to ensure that
    /// nodes for mutations are created in the same order as the corresponding block numbers.
    /// Should work well if the number of concurrent mutation requests is small.
    while (true)
    {
        auto zookeeper = getZooKeeper();

        Coordination::Stat mutations_stat;
        zookeeper->get(mutations_path, &mutations_stat);

        EphemeralLocksInAllPartitions block_number_locks(
            zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);

        for (const auto & lock : block_number_locks.getLocks())
            entry.block_numbers[lock.partition_id] = lock.number;

        entry.create_time = time(nullptr);

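        /// The multi-request below bumps the version of the /mutations node (failing with ZBADVERSION if
        /// another mutation was created concurrently) and creates the entry as a sequential child;
        /// responses[1] therefore holds the path of the newly created mutation znode.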
        Coordination::Requests requests;
        requests.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version));
        requests.emplace_back(zkutil::makeCreateRequest(
            mutations_path + "/", entry.toString(), zkutil::CreateMode::PersistentSequential));

        Coordination::Responses responses;
        int32_t rc = zookeeper->tryMulti(requests, responses);

        if (rc == Coordination::ZOK)
        {
            const String & path_created =
                dynamic_cast<const Coordination::CreateResponse *>(responses[1].get())->path_created;
            entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
            LOG_TRACE(log, "Created mutation with ID " << entry.znode_name);
            break;
        }
        else if (rc == Coordination::ZBADVERSION)
        {
            LOG_TRACE(log, "Version conflict when trying to create a mutation node, retrying...");
            continue;
        }
        else
            throw Coordination::Exception("Unable to create a mutation znode", rc);
    }

    /// If requested by the mutations_sync setting, wait until the mutation is finished on replicas.
    if (query_context.getSettingsRef().mutations_sync != 0)
    {
        Strings replicas;
        if (query_context.getSettingsRef().mutations_sync == 2) /// wait for all replicas
            replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
        else if (query_context.getSettingsRef().mutations_sync == 1) /// wait only for our own replica
            replicas.push_back(replica_path);

        waitMutationToFinishOnReplicas(replicas, entry.znode_name);
    }

}

std::vector<MergeTreeMutationStatus> StorageReplicatedMergeTree::getMutationsStatus() const
{
    return queue.getMutationsStatus();
}

CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutation_id)
{
    assertNotReadonly();

    zkutil::ZooKeeperPtr zookeeper = getZooKeeper();

    LOG_TRACE(log, "Killing mutation " << mutation_id);

    auto mutation_entry = queue.removeMutation(zookeeper, mutation_id);
    if (!mutation_entry)
        return CancellationCode::NotFound;

    /// After this point no new part mutations will start, and part mutations that still exist
    /// in the queue will be skipped.

    /// Cancel already running part mutations.
    for (const auto & pair : mutation_entry->block_numbers)
    {
        const String & partition_id = pair.first;
        Int64 block_number = pair.second;
        global_context.getMergeList().cancelPartMutations(partition_id, block_number);
    }
    return CancellationCode::CancelSent;
}


void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
    /// A critical section is not required (grabOldParts() returns a unique part set on each call).
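    ///
    /// The cleanup proceeds in stages: duplicate parts are removed from the filesystem only; regular parts
    /// are first removed from ZooKeeper, and only the ones whose ZooKeeper removal is known to have succeeded
    /// are then removed from the filesystem; parts whose removal failed with a (possibly transient)
    /// ZooKeeper error are rolled back and retried on a later call.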

    auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
    auto zookeeper = getZooKeeper();

    DataPartsVector parts = grabOldParts();
    if (parts.empty())
        return;

    DataPartsVector parts_to_delete_only_from_filesystem;    // Only duplicates
    DataPartsVector parts_to_delete_completely;              // All parts except duplicates
    DataPartsVector parts_to_retry_deletion;                 // Parts that should be retried due to network problems
    DataPartsVector parts_to_remove_from_filesystem;         // Parts removed from ZK

    for (const auto & part : parts)
    {
        if (!part->is_duplicate)
            parts_to_delete_completely.emplace_back(part);
        else
            parts_to_delete_only_from_filesystem.emplace_back(part);
    }
    parts.clear();

    auto remove_parts_from_filesystem = [log=log] (const DataPartsVector & parts_to_remove)
    {
        for (auto & part : parts_to_remove)
        {
            try
            {
                part->remove();
            }
            catch (...)
            {
                tryLogCurrentException(log, "There is a problem with deleting part " + part->name + " from filesystem");
            }
        }
    };

    /// Delete duplicate parts from filesystem
    if (!parts_to_delete_only_from_filesystem.empty())
    {
        remove_parts_from_filesystem(parts_to_delete_only_from_filesystem);
        removePartsFinally(parts_to_delete_only_from_filesystem);

        LOG_DEBUG(log, "Removed " << parts_to_delete_only_from_filesystem.size() << " old duplicate parts");
    }

    /// Delete normal parts from ZooKeeper
    NameSet part_names_to_retry_deletion;
    try
    {
        Strings part_names_to_delete_completely;
        for (const auto & part : parts_to_delete_completely)
            part_names_to_delete_completely.emplace_back(part->name);

        LOG_DEBUG(log, "Removing " << parts_to_delete_completely.size() << " old parts from ZooKeeper");
        removePartsFromZooKeeper(zookeeper, part_names_to_delete_completely, &part_names_to_retry_deletion);
    }
    catch (...)
    {
        LOG_ERROR(log, "There is a problem with deleting parts from ZooKeeper: " << getCurrentExceptionMessage(true));
    }

    /// Part names that were reliably deleted from ZooKeeper should be deleted from the filesystem
    auto num_reliably_deleted_parts = parts_to_delete_completely.size() - part_names_to_retry_deletion.size();
    LOG_DEBUG(log, "Removed " << num_reliably_deleted_parts << " old parts from ZooKeeper. Removing them from filesystem.");

    /// Split normal parts into two sets
    for (auto & part : parts_to_delete_completely)
    {
        if (part_names_to_retry_deletion.count(part->name) == 0)
            parts_to_remove_from_filesystem.emplace_back(part);
        else
            parts_to_retry_deletion.emplace_back(part);
    }

    /// Will retry deletion
    if (!parts_to_retry_deletion.empty())
    {
        rollbackDeletingParts(parts_to_retry_deletion);
        LOG_DEBUG(log, "Will retry deletion of " << parts_to_retry_deletion.size() << " parts next time");
    }

    /// Remove parts from filesystem and finally from data_parts
    if (!parts_to_remove_from_filesystem.empty())
    {
        remove_parts_from_filesystem(parts_to_remove_from_filesystem);
        removePartsFinally(parts_to_remove_from_filesystem);

        LOG_DEBUG(log, "Removed " << parts_to_remove_from_filesystem.size() << " old parts");
    }
}


bool StorageReplicatedMergeTree::tryRemovePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries)
{
    Strings part_names_to_remove;
    for (const auto & part : parts)
        part_names_to_remove.emplace_back(part->name);

    return tryRemovePartsFromZooKeeperWithRetries(part_names_to_remove, max_retries);
}

bool StorageReplicatedMergeTree::tryRemovePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries)
{
    size_t num_tries = 0;
    bool success = false;

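    /// Retry until all parts are removed or the retry budget is exhausted (max_retries == 0 means retry
    /// indefinitely). Only ZooKeeper "hardware" errors (e.g. a lost session) are retried after a short pause;
    /// any other error is rethrown. Returns true only if every part was removed (or was already absent).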
    while (!success && (max_retries == 0 || num_tries < max_retries))
    {
        try
        {
            ++num_tries;
            success = true;

            auto zookeeper = getZooKeeper();

            std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
            exists_futures.reserve(part_names.size());
            for (const String & part_name : part_names)
            {
                String part_path = replica_path + "/parts/" + part_name;
                exists_futures.emplace_back(zookeeper->asyncExists(part_path));
            }

            std::vector<std::future<Coordination::MultiResponse>> remove_futures;
            remove_futures.reserve(part_names.size());
            for (size_t i = 0; i < part_names.size(); ++i)
            {
                Coordination::ExistsResponse exists_resp = exists_futures[i].get();
                if (!exists_resp.error)
                {
                    Coordination::Requests ops;
                    removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0);
                    remove_futures.emplace_back(zookeeper->tryAsyncMulti(ops));
                }
            }

            for (auto & future : remove_futures)
            {
                auto response = future.get();

                if (response.error == Coordination::ZOK || response.error == Coordination::ZNONODE)
                    continue;

                if (Coordination::isHardwareError(response.error))
                {
                    success = false;
                    continue;
                }

                throw Coordination::Exception(response.error);
            }
        }
        catch (Coordination::Exception & e)
        {
            success = false;

            if (Coordination::isHardwareError(e.code))
                tryLogCurrentException(log, __PRETTY_FUNCTION__);
            else
                throw;
        }

        if (!success && num_tries < max_retries)
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }

    return success;
}

void StorageReplicatedMergeTree::removePartsFromZooKeeper(
    zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names, NameSet * parts_should_be_retried)
{
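    /// Exists checks and remove multi-requests are issued asynchronously and in bulk to avoid one ZooKeeper
    /// round trip per part. Parts that could not be removed because of a connectivity problem are reported
    /// through parts_should_be_retried (if provided) so the caller can retry them later; other removal
    /// errors are only logged.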
    std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
    std::vector<std::future<Coordination::MultiResponse>> remove_futures;
    exists_futures.reserve(part_names.size());
    remove_futures.reserve(part_names.size());
    try
    {
        /// An exception can be thrown from this loop
        /// if the ZooKeeper session is dropped.
        for (const String & part_name : part_names)
        {
            String part_path = replica_path + "/parts/" + part_name;
            exists_futures.emplace_back(zookeeper->asyncExists(part_path));
        }

        for (size_t i = 0; i < part_names.size(); ++i)
        {
            Coordination::ExistsResponse exists_resp = exists_futures[i].get();
            if (!exists_resp.error)
            {
                Coordination::Requests ops;
                removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0);
                remove_futures.emplace_back(zookeeper->tryAsyncMulti(ops));
            }
            else
            {
                LOG_DEBUG(log,
                    "There is no part " << part_names[i] << " in ZooKeeper, it was only in filesystem");
                /// Emplace an invalid future so that the total number of futures equals part_names.size().
                remove_futures.emplace_back();
            }
        }
    }
    catch (const Coordination::Exception & e)
    {
        if (parts_should_be_retried && Coordination::isHardwareError(e.code))
            parts_should_be_retried->insert(part_names.begin(), part_names.end());
        throw;
    }

    for (size_t i = 0; i < remove_futures.size(); ++i)
    {
        auto & future = remove_futures[i];

        if (!future.valid())
            continue;

        auto response = future.get();
        if (response.error == Coordination::ZOK)
            continue;
        else if (response.error == Coordination::ZNONODE)
        {
            LOG_DEBUG(log,
                "There is no part " << part_names[i] << " in ZooKeeper, it was only in filesystem");
            continue;
        }
        else if (Coordination::isHardwareError(response.error))
        {
            if (parts_should_be_retried)
                parts_should_be_retried->insert(part_names[i]);
            continue;
        }
        else
            LOG_WARNING(log, "Cannot remove part " << part_names[i] << " from ZooKeeper: "
                << zkutil::ZooKeeper::error2string(response.error));
    }
}


void StorageReplicatedMergeTree::clearBlocksInPartition(
    zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num)
{
    Strings blocks;
    if (zookeeper.tryGetChildren(zookeeper_path + "/blocks", blocks))
        throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);

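    /// Deduplication block nodes are named "<partition_id>_<...>"; their contents are parsed as a block
    /// number, and a node is removed if that number falls into [min_block_num, max_block_num] or if the
    /// contents cannot be parsed as a single number.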
    String partition_prefix = partition_id + "_";
    zkutil::AsyncResponses<Coordination::GetResponse> get_futures;
    for (const String & block_id : blocks)
    {
        if (startsWith(block_id, partition_prefix))
        {
            String path = zookeeper_path + "/blocks/" + block_id;
            get_futures.emplace_back(path, zookeeper.asyncTryGet(path));
        }
    }

    zkutil::AsyncResponses<Coordination::RemoveResponse> to_delete_futures;
    for (auto & pair : get_futures)
    {
        const String & path = pair.first;
        auto result = pair.second.get();

        if (result.error == Coordination::ZNONODE)
            continue;

        ReadBufferFromString buf(result.data);
        Int64 block_num = 0;
        bool parsed = tryReadIntText(block_num, buf) && buf.eof();
        if (!parsed || (min_block_num <= block_num && block_num <= max_block_num))
            to_delete_futures.emplace_back(path, zookeeper.asyncTryRemove(path));
    }

    for (auto & pair : to_delete_futures)
    {
        const String & path = pair.first;
        int32_t rc = pair.second.get().error;
        if (rc == Coordination::ZNOTEMPTY)
        {
            /// Can happen if there are leftover block nodes with children created by previous server versions.
            zookeeper.removeRecursive(path);
        }
        else if (rc)
            LOG_WARNING(log,
                "Error while deleting ZooKeeper path `" << path << "`: " << zkutil::ZooKeeper::error2string(rc) << ", ignoring.");
    }

    LOG_TRACE(log, "Deleted " << to_delete_futures.size() << " deduplication block IDs in partition ID " << partition_id);
}

void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace,
                                                      const Context & context)
{
    auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
    auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());

    Stopwatch watch;
    MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
    String partition_id = getPartitionIDFromQuery(partition, context);

    DataPartsVector src_all_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
    DataPartsVector src_parts;
    MutableDataPartsVector dst_parts;
    Strings block_id_paths;
    Strings part_checksums;
    std::vector<EphemeralLockInZooKeeper> ephemeral_locks;

    LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts");

    static const String TMP_PREFIX = "tmp_replace_from_";
    auto zookeeper = getZooKeeper();

    /// First, allocate the last block number and compute drop_range.
    /// NOTE: Even if we do ATTACH PARTITION instead of REPLACE PARTITION, drop_range will not be empty, it will contain a block.
    /// Such a case has a special meaning: if drop_range contains only one block, it means that there is nothing to drop.
    MergeTreePartInfo drop_range;
    drop_range.partition_id = partition_id;
    drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
    drop_range.min_block = replace ? 0 : drop_range.max_block;
    drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();

    String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
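    /// drop_range_fake_part_name names a virtual part covering the whole [min_block, max_block] range at the
    /// maximum level, so it covers every existing part of the partition that has to be replaced (or nothing,
    /// when the range consists of a single freshly allocated block).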

    if (drop_range.getBlocksCount() > 1)
    {
        /// We have to prohibit merges in drop_range: a new merge log entry that appeared after this REPLACE FROM entry
        /// could produce a new merged part in place of the just-deleted parts.
        /// It would be better to prohibit them on the leader replica (as DROP PARTITION does),
        /// but that is inconvenient for the user, since they could actually be using the source table on this replica.
        /// Therefore prohibit merges on the initializer server now and on the remaining servers when the log entry is executed.
        /// This does not provide strong guarantees, but it is suitable for the intended use case (assume merges are quite rare).

        {
            std::lock_guard merge_selecting_lock(merge_selecting_mutex);
            queue.disableMergesInBlockRange(drop_range_fake_part_name);
        }
    }

    for (size_t i = 0; i < src_all_parts.size(); ++i)
    {
        /// We also do some deduplication to avoid duplicated parts in case of ATTACH PARTITION.
        /// Assume that merges in the partition are quite rare.
        /// Save deduplication block ids with a special prefix, replace_partition.

        auto & src_part = src_all_parts[i];

        if (!canReplacePartition(src_part))
            throw Exception(
                "Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
                ErrorCodes::LOGICAL_ERROR);

        String hash_hex = src_part->checksums.getTotalChecksumHex();
        String block_id_path = replace ? "" : (zookeeper_path + "/blocks/" + partition_id + "_replace_from_" + hash_hex);

        auto lock = allocateBlockNumber(partition_id, zookeeper, block_id_path);
        if (!lock)
        {
            LOG_INFO(log, "Part " << src_part->name << " (hash " << hash_hex << ") has already been attached");
            continue;
        }

        UInt64 index = lock->getNumber();
        MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
        auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info);

        src_parts.emplace_back(src_part);
        dst_parts.emplace_back(dst_part);
        ephemeral_locks.emplace_back(std::move(*lock));
        block_id_paths.emplace_back(block_id_path);
        part_checksums.emplace_back(hash_hex);
    }

    ReplicatedMergeTreeLogEntryData entry;
    {
        entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
        entry.source_replica = replica_name;
        entry.create_time = time(nullptr);
        entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();

        auto & entry_replace = *entry.replace_range_entry;
        entry_replace.drop_range_part_name = drop_range_fake_part_name;
        entry_replace.from_database = src_data.database_name;
        entry_replace.from_table = src_data.table_name;
        for (const auto & part : src_parts)
            entry_replace.src_part_names.emplace_back(part->name);
        for (const auto & part : dst_parts)
            entry_replace.new_part_names.emplace_back(part->name);
        for (const String & checksum : part_checksums)
            entry_replace.part_names_checksums.emplace_back(checksum);
        entry_replace.columns_version = columns_version;
    }

    /// We are almost ready to commit changes; remove fetches and merges from the drop range.
    queue.removePartProducingOpsInRange(zookeeper, drop_range, entry);

    /// Remove deduplication block_ids of replacing parts
    if (replace)
        clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);

    DataPartsVector parts_to_remove;
    Coordination::Responses op_results;

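    /// Commit sequence: part commit ops and block-number unlock ops are accumulated (and flushed in batches),
    /// the REPLACE_RANGE log entry is appended to the same request, the temporary parts are renamed into place
    /// under a local parts lock, and only then is the final ZooKeeper multi-request executed. On success the
    /// local transaction is committed and, for REPLACE, the covered range is removed from the working set;
    /// on any failure the exception is recorded in the part log and rethrown.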
    try
    {
        Coordination::Requests ops;
        for (size_t i = 0; i < dst_parts.size(); ++i)
        {
            getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
            ephemeral_locks[i].getUnlockOps(ops);

            if (ops.size() > zkutil::MULTI_BATCH_SIZE)
            {
                /// It is unnecessary to add parts to the working set until we commit the log entry.
                zookeeper->multi(ops);
                ops.clear();
            }
        }

        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));

        Transaction transaction(*this);
        {
            auto data_parts_lock = lockParts();

            for (MutableDataPartPtr & part : dst_parts)
                renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock);
        }

        op_results = zookeeper->multi(ops);

        {
            auto data_parts_lock = lockParts();

            transaction.commit(&data_parts_lock);
            if (replace)
                parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
        }

        PartLog::addNewParts(global_context, dst_parts, watch.elapsed());
    }
    catch (...)
    {
        PartLog::addNewParts(global_context, dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
        throw;
    }

    String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
    entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);

    for (auto & lock : ephemeral_locks)
        lock.assumeUnlocked();

    /// Forcibly remove the replaced parts from ZooKeeper.
    tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);

    /// Speed up removal of the replaced parts from the filesystem.
    parts_to_remove.clear();
    cleanup_thread.wakeup();

    /// If necessary, wait until the operation is performed on all replicas.
    if (context.getSettingsRef().replication_alter_partitions_sync > 1)
    {
        lock2.release();
        lock1.release();
        waitForAllReplicasToProcessLogEntry(entry);
    }
}

void StorageReplicatedMergeTree::getCommitPartOps(
    Coordination::Requests & ops,
    MutableDataPartPtr & part,
    const String & block_id_path) const
{
    const String & part_name = part->name;
    const auto storage_settings_ptr = getSettings();

    if (!block_id_path.empty())
    {
        /// Make the final duplicate check and commit the block_id.
        ops.emplace_back(
            zkutil::makeCreateRequest(
                block_id_path,
                part_name,  /// We will be able to know the original part number for duplicate blocks, if we want.
                zkutil::CreateMode::Persistent));
    }

    /// Information about the part, on this replica.

    ops.emplace_back(zkutil::makeCheckRequest(
        zookeeper_path + "/columns",
        columns_version));

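    /// Depending on use_minimalistic_part_header_in_zookeeper, either a single znode with a compact part
    /// header is created, or the legacy layout with separate "columns" and "checksums" child znodes.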
    if (storage_settings_ptr->use_minimalistic_part_header_in_zookeeper)
    {
        ops.emplace_back(zkutil::makeCreateRequest(
            replica_path + "/parts/" + part->name,
            ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->columns, part->checksums).toString(),
            zkutil::CreateMode::Persistent));
    }
    else
    {
        ops.emplace_back(zkutil::makeCreateRequest(
            replica_path + "/parts/" + part->name,
            "",
            zkutil::CreateMode::Persistent));
        ops.emplace_back(zkutil::makeCreateRequest(
            replica_path + "/parts/" + part->name + "/columns",
            part->columns.toString(),
            zkutil::CreateMode::Persistent));
        ops.emplace_back(zkutil::makeCreateRequest(
            replica_path + "/parts/" + part->name + "/checksums",
            getChecksumsForZooKeeper(part->checksums),
            zkutil::CreateMode::Persistent));
    }
}

void StorageReplicatedMergeTree::updatePartHeaderInZooKeeperAndCommit(
    const zkutil::ZooKeeperPtr & zookeeper,
    AlterDataPartTransaction & transaction)
{
    String part_path = replica_path + "/parts/" + transaction.getPartName();
    const auto storage_settings_ptr = getSettings();

    bool need_delete_columns_and_checksums_nodes = false;
    try
    {
        if (storage_settings_ptr->use_minimalistic_part_header_in_zookeeper)
        {
            auto part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
                transaction.getNewColumns(), transaction.getNewChecksums());
            Coordination::Stat stat;
            zookeeper->set(part_path, part_header.toString(), -1, &stat);

            need_delete_columns_and_checksums_nodes = stat.numChildren > 0;
        }
        else
        {
            Coordination::Requests ops;
            ops.emplace_back(zkutil::makeSetRequest(
                part_path, String(), -1));
            ops.emplace_back(zkutil::makeSetRequest(
                part_path + "/columns", transaction.getNewColumns().toString(), -1));
            ops.emplace_back(zkutil::makeSetRequest(
                part_path + "/checksums", getChecksumsForZooKeeper(transaction.getNewChecksums()), -1));
            zookeeper->multi(ops);
        }
    }
    catch (const Coordination::Exception & e)
    {
        /// The part does not exist in ZK. We will add it to the queue for verification - maybe the part is superfluous, and it must be removed locally.
        if (e.code == Coordination::ZNONODE)
            enqueuePartForCheck(transaction.getPartName());

        throw;
    }

    /// Apply file changes.
    transaction.commit();

    /// Legacy <part_path>/columns and <part_path>/checksums znodes are not needed anymore and can be deleted.
    if (need_delete_columns_and_checksums_nodes)
    {
        Coordination::Requests ops;
        ops.emplace_back(zkutil::makeRemoveRequest(part_path + "/columns", -1));
        ops.emplace_back(zkutil::makeRemoveRequest(part_path + "/checksums", -1));
        zookeeper->multi(ops);
    }
}

ReplicatedMergeTreeAddress StorageReplicatedMergeTree::getReplicatedMergeTreeAddress() const
{
    auto host_port = global_context.getInterserverIOAddress();

    ReplicatedMergeTreeAddress res;
    res.host = host_port.first;
    res.replication_port = host_port.second;
    res.queries_port = global_context.getTCPPort();
    res.database = database_name;
    res.table = table_name;
    res.scheme = global_context.getInterserverScheme();
    return res;
}

ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType action_type)
{
    if (action_type == ActionLocks::PartsMerge)
        return merger_mutator.merges_blocker.cancel();

    if (action_type == ActionLocks::PartsTTLMerge)
        return merger_mutator.ttl_merges_blocker.cancel();

    if (action_type == ActionLocks::PartsFetch)
        return fetcher.blocker.cancel();

    if (action_type == ActionLocks::PartsSend)
        return data_parts_exchange_endpoint_holder ? data_parts_exchange_endpoint_holder->getBlocker().cancel() : ActionLock();

    if (action_type == ActionLocks::ReplicationQueue)
        return queue.actions_blocker.cancel();

    if (action_type == ActionLocks::PartsMove)
        return parts_mover.moves_blocker.cancel();

    return {};
}


bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)
{
    Stopwatch watch;

    /// Fetch new log entries first.
    queue.pullLogsToQueue(getZooKeeper());

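    /// Subscribe to queue size changes and wait (polling the event with a small timeout) until the queue
    /// shrinks to the target size, the optional deadline expires, or the table is shut down.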
    Poco::Event target_size_event;
    auto callback = [&target_size_event, queue_size] (size_t new_queue_size)
    {
        if (new_queue_size <= queue_size)
            target_size_event.set();
    };
    const auto handler = queue.addSubscriber(std::move(callback));

    while (!target_size_event.tryWait(50))
    {
        if (max_wait_milliseconds && watch.elapsedMilliseconds() > max_wait_milliseconds)
            return false;

        if (partial_shutdown_called)
            throw Exception("Shutdown is called for table", ErrorCodes::ABORTED);
    }

    return true;
}


bool StorageReplicatedMergeTree::dropPartsInPartition(
    zkutil::ZooKeeper & zookeeper, String & partition_id, StorageReplicatedMergeTree::LogEntry & entry, bool detach)
{
    MergeTreePartInfo drop_range_info;
    if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info))
    {
        LOG_INFO(log, "Will not drop partition " << partition_id << ", it is empty.");
        return false;
    }

    clearBlocksInPartition(zookeeper, partition_id, drop_range_info.min_block, drop_range_info.max_block);

    /** Forbid choosing the parts to be deleted for merging.
      * Invariant: after the `DROP_RANGE` entry appears in the log, merge of deleted parts will not appear in the log.
      */
    String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range_info);
    {
        std::lock_guard merge_selecting_lock(merge_selecting_mutex);
        queue.disableMergesInBlockRange(drop_range_fake_part_name);
    }

    LOG_DEBUG(log, "Disabled merges covered by range " << drop_range_fake_part_name);

    /// Finally, having achieved the necessary invariants, we can put an entry in the log.
    entry.type = LogEntry::DROP_RANGE;
    entry.source_replica = replica_name;
    entry.new_part_name = drop_range_fake_part_name;
    entry.detach = detach;
    entry.create_time = time(nullptr);

    String log_znode_path = zookeeper.create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
    entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);

    return true;
}


CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, const Context & context)
{
    CheckResults results;
    DataPartsVector data_parts;
    if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition)
    {
        String partition_id = getPartitionIDFromQuery(check_query.partition, context);
        data_parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
    }
    else
        data_parts = getDataPartsVector();

    for (auto & part : data_parts)
    {
        try
        {
            results.push_back(part_check_thread.checkPart(part->name));
        }
        catch (const Exception & ex)
        {
            results.emplace_back(part->name, false, "Check of part finished with error: '" + ex.message() + "'");
        }
    }
    return results;
}

bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
{
    const auto storage_settings_ptr = getSettings();
    return storage_settings_ptr->index_granularity_bytes != 0 &&
        (storage_settings_ptr->enable_mixed_granularity_parts ||
            (!has_non_adaptive_index_granularity_parts && !other_replicas_fixed_granularity));
}


}