1#include "ClusterCopier.h"
2
3#include <chrono>
4#include <optional>
5#include <Poco/Util/XMLConfiguration.h>
6#include <Poco/Logger.h>
7#include <Poco/ConsoleChannel.h>
8#include <Poco/FormattingChannel.h>
9#include <Poco/PatternFormatter.h>
10#include <Poco/UUIDGenerator.h>
11#include <Poco/File.h>
12#include <Poco/Process.h>
13#include <Poco/FileChannel.h>
14#include <Poco/SplitterChannel.h>
15#include <Poco/Util/HelpFormatter.h>
16#include <boost/algorithm/string.hpp>
17#include <pcg_random.hpp>
18#include <common/logger_useful.h>
19#include <Common/ThreadPool.h>
20#include <Common/Exception.h>
21#include <Common/ZooKeeper/ZooKeeper.h>
22#include <Common/ZooKeeper/KeeperException.h>
23#include <Common/getFQDNOrHostName.h>
24#include <Common/isLocalAddress.h>
25#include <Common/typeid_cast.h>
26#include <Common/ClickHouseRevision.h>
27#include <Common/formatReadable.h>
28#include <Common/DNSResolver.h>
29#include <Common/CurrentThread.h>
30#include <Common/escapeForFileName.h>
31#include <Common/getNumberOfPhysicalCPUCores.h>
32#include <Common/ThreadStatus.h>
33#include <Client/Connection.h>
34#include <Interpreters/Context.h>
35#include <Interpreters/Cluster.h>
36#include <Interpreters/InterpreterFactory.h>
37#include <Interpreters/InterpreterExistsQuery.h>
38#include <Interpreters/InterpreterShowCreateQuery.h>
39#include <Interpreters/InterpreterDropQuery.h>
40#include <Interpreters/InterpreterCreateQuery.h>
41#include <Columns/ColumnString.h>
42#include <Columns/ColumnsNumber.h>
43#include <DataTypes/DataTypeString.h>
44#include <Parsers/ParserCreateQuery.h>
45#include <Parsers/parseQuery.h>
46#include <Parsers/ParserQuery.h>
47#include <Parsers/ASTCreateQuery.h>
48#include <Parsers/queryToString.h>
49#include <Parsers/ASTDropQuery.h>
50#include <Parsers/ASTLiteral.h>
51#include <Parsers/ASTExpressionList.h>
52#include <Formats/FormatSettings.h>
53#include <DataStreams/RemoteBlockInputStream.h>
54#include <DataStreams/SquashingBlockInputStream.h>
55#include <DataStreams/AsynchronousBlockInputStream.h>
56#include <DataStreams/copyData.h>
57#include <DataStreams/NullBlockOutputStream.h>
58#include <IO/ConnectionTimeouts.h>
59#include <IO/Operators.h>
60#include <IO/ReadBufferFromString.h>
61#include <IO/ReadBufferFromFile.h>
62#include <Functions/registerFunctions.h>
63#include <TableFunctions/registerTableFunctions.h>
64#include <AggregateFunctions/registerAggregateFunctions.h>
65#include <Storages/registerStorages.h>
66#include <Storages/StorageDistributed.h>
67#include <Dictionaries/registerDictionaries.h>
68#include <Disks/registerDisks.h>
69#include <Databases/DatabaseMemory.h>
70#include <Common/StatusFile.h>
71
72
73namespace DB
74{
75
76namespace ErrorCodes
77{
78 extern const int NO_ZOOKEEPER;
79 extern const int BAD_ARGUMENTS;
80 extern const int UNKNOWN_TABLE;
81 extern const int UNFINISHED;
82 extern const int UNKNOWN_ELEMENT_IN_CONFIG;
83}
84
85
86using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
87
88static ConfigurationPtr getConfigurationFromXMLString(const std::string & xml_data)
89{
90 std::stringstream ss(xml_data);
91 Poco::XML::InputSource input_source{ss};
92 return {new Poco::Util::XMLConfiguration{&input_source}};
93}
94
95namespace
96{
97
98
99using DatabaseAndTableName = std::pair<String, String>;
100
101String getQuotedTable(const String & database, const String & table)
102{
103 if (database.empty())
104 {
105 return backQuoteIfNeed(table);
106 }
107
108 return backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
109}
110
111String getQuotedTable(const DatabaseAndTableName & db_and_table)
112{
113 return getQuotedTable(db_and_table.first, db_and_table.second);
114}
115
116
117enum class TaskState
118{
119 Started = 0,
120 Finished,
121 Unknown
122};
123
124/// Used to mark status of shard partition tasks
125struct TaskStateWithOwner
126{
127 TaskStateWithOwner() = default;
128 TaskStateWithOwner(TaskState state_, const String & owner_) : state(state_), owner(owner_) {}
129
130 TaskState state{TaskState::Unknown};
131 String owner;
132
133 static String getData(TaskState state, const String & owner)
134 {
135 return TaskStateWithOwner(state, owner).toString();
136 }
137
138 String toString()
139 {
140 WriteBufferFromOwnString wb;
141 wb << static_cast<UInt32>(state) << "\n" << escape << owner;
142 return wb.str();
143 }
144
145 static TaskStateWithOwner fromString(const String & data)
146 {
147 ReadBufferFromString rb(data);
148 TaskStateWithOwner res;
149 UInt32 state;
150
151 rb >> state >> "\n" >> escape >> res.owner;
152
153 if (state >= static_cast<int>(TaskState::Unknown))
154 throw Exception("Unknown state " + data, ErrorCodes::LOGICAL_ERROR);
155
156 res.state = static_cast<TaskState>(state);
157 return res;
158 }
159};
160
161
162/// Hierarchical description of the tasks
163struct ShardPartition;
164struct TaskShard;
165struct TaskTable;
166struct TaskCluster;
167struct ClusterPartition;
168
169using TasksPartition = std::map<String, ShardPartition, std::greater<>>;
170using ShardInfo = Cluster::ShardInfo;
171using TaskShardPtr = std::shared_ptr<TaskShard>;
172using TasksShard = std::vector<TaskShardPtr>;
173using TasksTable = std::list<TaskTable>;
174using ClusterPartitions = std::map<String, ClusterPartition, std::greater<>>;
175
176
177/// Just destination partition of a shard
178struct ShardPartition
179{
180 ShardPartition(TaskShard & parent, const String & name_quoted_) : task_shard(parent), name(name_quoted_) {}
181
182 String getPartitionPath() const;
183 String getPartitionCleanStartPath() const;
184 String getCommonPartitionIsDirtyPath() const;
185 String getCommonPartitionIsCleanedPath() const;
186 String getPartitionActiveWorkersPath() const;
187 String getActiveWorkerPath() const;
188 String getPartitionShardsPath() const;
189 String getShardStatusPath() const;
190
191 TaskShard & task_shard;
192 String name;
193};
194
195
196struct ShardPriority
197{
198 UInt8 is_remote = 1;
199 size_t hostname_difference = 0;
200 UInt8 random = 0;
201
202 static bool greaterPriority(const ShardPriority & current, const ShardPriority & other)
203 {
204 return std::forward_as_tuple(current.is_remote, current.hostname_difference, current.random)
205 < std::forward_as_tuple(other.is_remote, other.hostname_difference, other.random);
206 }
207};
208
209
210struct TaskShard
211{
212 TaskShard(TaskTable & parent, const ShardInfo & info_) : task_table(parent), info(info_) {}
213
214 TaskTable & task_table;
215
216 ShardInfo info;
217 UInt32 numberInCluster() const { return info.shard_num; }
218 UInt32 indexInCluster() const { return info.shard_num - 1; }
219
220 String getDescription() const;
221 String getHostNameExample() const;
222
223 /// Used to sort clusters by their proximity
224 ShardPriority priority;
225
226 /// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard
227 ColumnWithTypeAndName partition_key_column;
228
229 /// There is a task for each destination partition
230 TasksPartition partition_tasks;
231
232 /// Which partitions have been checked for existence
233 /// If some partition from this lists is exists, it is in partition_tasks
234 std::set<String> checked_partitions;
235
236 /// Last CREATE TABLE query of the table of the shard
237 ASTPtr current_pull_table_create_query;
238
239 /// Internal distributed tables
240 DatabaseAndTableName table_read_shard;
241 DatabaseAndTableName table_split_shard;
242};
243
244
245/// Contains info about all shards that contain a partition
246struct ClusterPartition
247{
248 double elapsed_time_seconds = 0;
249 UInt64 bytes_copied = 0;
250 UInt64 rows_copied = 0;
251 UInt64 blocks_copied = 0;
252
253 UInt64 total_tries = 0;
254};
255
256
257struct TaskTable
258{
259 TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix,
260 const String & table_key);
261
262 TaskCluster & task_cluster;
263
264 String getPartitionPath(const String & partition_name) const;
265 String getPartitionIsDirtyPath(const String & partition_name) const;
266 String getPartitionIsCleanedPath(const String & partition_name) const;
267 String getPartitionTaskStatusPath(const String & partition_name) const;
268
269 String name_in_config;
270
271 /// Used as task ID
272 String table_id;
273
274 /// Source cluster and table
275 String cluster_pull_name;
276 DatabaseAndTableName table_pull;
277
278 /// Destination cluster and table
279 String cluster_push_name;
280 DatabaseAndTableName table_push;
281
282 /// Storage of destination table
283 String engine_push_str;
284 ASTPtr engine_push_ast;
285 ASTPtr engine_push_partition_key_ast;
286
287 /// A Distributed table definition used to split data
288 String sharding_key_str;
289 ASTPtr sharding_key_ast;
290 ASTPtr engine_split_ast;
291
292 /// Additional WHERE expression to filter input data
293 String where_condition_str;
294 ASTPtr where_condition_ast;
295
296 /// Resolved clusters
297 ClusterPtr cluster_pull;
298 ClusterPtr cluster_push;
299
300 /// Filter partitions that should be copied
301 bool has_enabled_partitions = false;
302 Strings enabled_partitions;
303 NameSet enabled_partitions_set;
304
305 /// Prioritized list of shards
306 TasksShard all_shards;
307 TasksShard local_shards;
308
309 ClusterPartitions cluster_partitions;
310 NameSet finished_cluster_partitions;
311
312 /// Parition names to process in user-specified order
313 Strings ordered_partition_names;
314
315 ClusterPartition & getClusterPartition(const String & partition_name)
316 {
317 auto it = cluster_partitions.find(partition_name);
318 if (it == cluster_partitions.end())
319 throw Exception("There are no cluster partition " + partition_name + " in " + table_id, ErrorCodes::LOGICAL_ERROR);
320 return it->second;
321 }
322
323 Stopwatch watch;
324 UInt64 bytes_copied = 0;
325 UInt64 rows_copied = 0;
326
327 template <typename RandomEngine>
328 void initShards(RandomEngine && random_engine);
329};
330
331
332struct TaskCluster
333{
334 TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_)
335 : task_zookeeper_path(task_zookeeper_path_), default_local_database(default_local_database_) {}
336
337 void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
338
339 /// Set (or update) settings and max_workers param
340 void reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
341
342 /// Base node for all tasks. Its structure:
343 /// workers/ - directory with active workers (amount of them is less or equal max_workers)
344 /// description - node with task configuration
345 /// table_table1/ - directories with per-partition copying status
346 String task_zookeeper_path;
347
348 /// Database used to create temporary Distributed tables
349 String default_local_database;
350
351 /// Limits number of simultaneous workers
352 UInt64 max_workers = 0;
353
354 /// Base settings for pull and push
355 Settings settings_common;
356 /// Settings used to fetch data
357 Settings settings_pull;
358 /// Settings used to insert data
359 Settings settings_push;
360
361 String clusters_prefix;
362
363 /// Subtasks
364 TasksTable table_tasks;
365
366 std::random_device random_device;
367 pcg64 random_engine;
368};
369
370
371struct MultiTransactionInfo
372{
373 int32_t code;
374 Coordination::Requests requests;
375 Coordination::Responses responses;
376};
377
378// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])
379std::shared_ptr<ASTStorage> createASTStorageDistributed(
380 const String & cluster_name, const String & database, const String & table, const ASTPtr & sharding_key_ast = nullptr)
381{
382 auto args = std::make_shared<ASTExpressionList>();
383 args->children.emplace_back(std::make_shared<ASTLiteral>(cluster_name));
384 args->children.emplace_back(std::make_shared<ASTIdentifier>(database));
385 args->children.emplace_back(std::make_shared<ASTIdentifier>(table));
386 if (sharding_key_ast)
387 args->children.emplace_back(sharding_key_ast);
388
389 auto engine = std::make_shared<ASTFunction>();
390 engine->name = "Distributed";
391 engine->arguments = args;
392
393 auto storage = std::make_shared<ASTStorage>();
394 storage->set(storage->engine, engine);
395
396 return storage;
397}
398
399
400BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream)
401{
402 return std::make_shared<SquashingBlockInputStream>(
403 stream,
404 std::numeric_limits<size_t>::max(),
405 std::numeric_limits<size_t>::max());
406}
407
408Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream)
409{
410 return squashStreamIntoOneBlock(stream)->read();
411}
412
413
414/// Path getters
415
416String TaskTable::getPartitionPath(const String & partition_name) const
417{
418 return task_cluster.task_zookeeper_path // root
419 + "/tables/" + table_id // tables/dst_cluster.merge.hits
420 + "/" + escapeForFileName(partition_name); // 201701
421}
422
423String ShardPartition::getPartitionCleanStartPath() const
424{
425 return getPartitionPath() + "/clean_start";
426}
427
428String ShardPartition::getPartitionPath() const
429{
430 return task_shard.task_table.getPartitionPath(name);
431}
432
433String ShardPartition::getShardStatusPath() const
434{
435 // schema: /<root...>/tables/<table>/<partition>/shards/<shard>
436 // e.g. /root/table_test.hits/201701/shards/1
437 return getPartitionShardsPath() + "/" + toString(task_shard.numberInCluster());
438}
439
440String ShardPartition::getPartitionShardsPath() const
441{
442 return getPartitionPath() + "/shards";
443}
444
445String ShardPartition::getPartitionActiveWorkersPath() const
446{
447 return getPartitionPath() + "/partition_active_workers";
448}
449
450String ShardPartition::getActiveWorkerPath() const
451{
452 return getPartitionActiveWorkersPath() + "/" + toString(task_shard.numberInCluster());
453}
454
455String ShardPartition::getCommonPartitionIsDirtyPath() const
456{
457 return getPartitionPath() + "/is_dirty";
458}
459
460String ShardPartition::getCommonPartitionIsCleanedPath() const
461{
462 return getCommonPartitionIsDirtyPath() + "/cleaned";
463}
464
465String TaskTable::getPartitionIsDirtyPath(const String & partition_name) const
466{
467 return getPartitionPath(partition_name) + "/is_dirty";
468}
469
470String TaskTable::getPartitionIsCleanedPath(const String & partition_name) const
471{
472 return getPartitionIsDirtyPath(partition_name) + "/cleaned";
473}
474
475String TaskTable::getPartitionTaskStatusPath(const String & partition_name) const
476{
477 return getPartitionPath(partition_name) + "/shards";
478}
479
480String DB::TaskShard::getDescription() const
481{
482 std::stringstream ss;
483 ss << "N" << numberInCluster()
484 << " (having a replica " << getHostNameExample()
485 << ", pull table " + getQuotedTable(task_table.table_pull)
486 << " of cluster " + task_table.cluster_pull_name << ")";
487 return ss.str();
488}
489
490String DB::TaskShard::getHostNameExample() const
491{
492 auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
493 return replicas.at(0).readableString();
494}
495
496
497static bool isExtendedDefinitionStorage(const ASTPtr & storage_ast)
498{
499 const auto & storage = storage_ast->as<ASTStorage &>();
500 return storage.partition_by || storage.order_by || storage.sample_by;
501}
502
503static ASTPtr extractPartitionKey(const ASTPtr & storage_ast)
504{
505 String storage_str = queryToString(storage_ast);
506
507 const auto & storage = storage_ast->as<ASTStorage &>();
508 const auto & engine = storage.engine->as<ASTFunction &>();
509
510 if (!endsWith(engine.name, "MergeTree"))
511 {
512 throw Exception("Unsupported engine was specified in " + storage_str + ", only *MergeTree engines are supported",
513 ErrorCodes::BAD_ARGUMENTS);
514 }
515
516 if (isExtendedDefinitionStorage(storage_ast))
517 {
518 if (storage.partition_by)
519 return storage.partition_by->clone();
520
521 static const char * all = "all";
522 return std::make_shared<ASTLiteral>(Field(all, strlen(all)));
523 }
524 else
525 {
526 bool is_replicated = startsWith(engine.name, "Replicated");
527 size_t min_args = is_replicated ? 3 : 1;
528
529 if (!engine.arguments)
530 throw Exception("Expected arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS);
531
532 ASTPtr arguments_ast = engine.arguments->clone();
533 ASTs & arguments = arguments_ast->children;
534
535 if (arguments.size() < min_args)
536 throw Exception("Expected at least " + toString(min_args) + " arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS);
537
538 ASTPtr & month_arg = is_replicated ? arguments[2] : arguments[1];
539 return makeASTFunction("toYYYYMM", month_arg->clone());
540 }
541}
542
543
544TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix_,
545 const String & table_key)
546: task_cluster(parent)
547{
548 String table_prefix = prefix_ + "." + table_key + ".";
549
550 name_in_config = table_key;
551
552 cluster_pull_name = config.getString(table_prefix + "cluster_pull");
553 cluster_push_name = config.getString(table_prefix + "cluster_push");
554
555 table_pull.first = config.getString(table_prefix + "database_pull");
556 table_pull.second = config.getString(table_prefix + "table_pull");
557
558 table_push.first = config.getString(table_prefix + "database_push");
559 table_push.second = config.getString(table_prefix + "table_push");
560
561 /// Used as node name in ZooKeeper
562 table_id = escapeForFileName(cluster_push_name)
563 + "." + escapeForFileName(table_push.first)
564 + "." + escapeForFileName(table_push.second);
565
566 engine_push_str = config.getString(table_prefix + "engine");
567 {
568 ParserStorage parser_storage;
569 engine_push_ast = parseQuery(parser_storage, engine_push_str, 0);
570 engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
571 }
572
573 sharding_key_str = config.getString(table_prefix + "sharding_key");
574 {
575 ParserExpressionWithOptionalAlias parser_expression(false);
576 sharding_key_ast = parseQuery(parser_expression, sharding_key_str, 0);
577 engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast);
578 }
579
580 where_condition_str = config.getString(table_prefix + "where_condition", "");
581 if (!where_condition_str.empty())
582 {
583 ParserExpressionWithOptionalAlias parser_expression(false);
584 where_condition_ast = parseQuery(parser_expression, where_condition_str, 0);
585
586 // Will use canonical expression form
587 where_condition_str = queryToString(where_condition_ast);
588 }
589
590 String enabled_partitions_prefix = table_prefix + "enabled_partitions";
591 has_enabled_partitions = config.has(enabled_partitions_prefix);
592
593 if (has_enabled_partitions)
594 {
595 Strings keys;
596 config.keys(enabled_partitions_prefix, keys);
597
598 if (keys.empty())
599 {
600 /// Parse list of partition from space-separated string
601 String partitions_str = config.getString(table_prefix + "enabled_partitions");
602 boost::trim_if(partitions_str, isWhitespaceASCII);
603 boost::split(enabled_partitions, partitions_str, isWhitespaceASCII, boost::token_compress_on);
604 }
605 else
606 {
607 /// Parse sequence of <partition>...</partition>
608 for (const String & key : keys)
609 {
610 if (!startsWith(key, "partition"))
611 throw Exception("Unknown key " + key + " in " + enabled_partitions_prefix, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
612
613 enabled_partitions.emplace_back(config.getString(enabled_partitions_prefix + "." + key));
614 }
615 }
616
617 std::copy(enabled_partitions.begin(), enabled_partitions.end(), std::inserter(enabled_partitions_set, enabled_partitions_set.begin()));
618 }
619}
620
621
622static ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random)
623{
624 ShardPriority res;
625
626 if (replicas.empty())
627 return res;
628
629 res.is_remote = 1;
630 for (auto & replica : replicas)
631 {
632 if (isLocalAddress(DNSResolver::instance().resolveHost(replica.host_name)))
633 {
634 res.is_remote = 0;
635 break;
636 }
637 }
638
639 res.hostname_difference = std::numeric_limits<size_t>::max();
640 for (auto & replica : replicas)
641 {
642 size_t difference = getHostNameDifference(local_hostname, replica.host_name);
643 res.hostname_difference = std::min(difference, res.hostname_difference);
644 }
645
646 res.random = random;
647 return res;
648}
649
650template<typename RandomEngine>
651void TaskTable::initShards(RandomEngine && random_engine)
652{
653 const String & fqdn_name = getFQDNOrHostName();
654 std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());
655
656 // Compute the priority
657 for (auto & shard_info : cluster_pull->getShardsInfo())
658 {
659 TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
660 const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
661 task_shard->priority = getReplicasPriority(replicas, fqdn_name, get_urand(random_engine));
662
663 all_shards.emplace_back(task_shard);
664 }
665
666 // Sort by priority
667 std::sort(all_shards.begin(), all_shards.end(),
668 [] (const TaskShardPtr & lhs, const TaskShardPtr & rhs)
669 {
670 return ShardPriority::greaterPriority(lhs->priority, rhs->priority);
671 });
672
673 // Cut local shards
674 auto it_first_remote = std::lower_bound(all_shards.begin(), all_shards.end(), 1,
675 [] (const TaskShardPtr & lhs, UInt8 is_remote)
676 {
677 return lhs->priority.is_remote < is_remote;
678 });
679
680 local_shards.assign(all_shards.begin(), it_first_remote);
681}
682
683
684void DB::TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key)
685{
686 String prefix = base_key.empty() ? "" : base_key + ".";
687
688 clusters_prefix = prefix + "remote_servers";
689 if (!config.has(clusters_prefix))
690 throw Exception("You should specify list of clusters in " + clusters_prefix, ErrorCodes::BAD_ARGUMENTS);
691
692 Poco::Util::AbstractConfiguration::Keys tables_keys;
693 config.keys(prefix + "tables", tables_keys);
694
695 for (const auto & table_key : tables_keys)
696 {
697 table_tasks.emplace_back(*this, config, prefix + "tables", table_key);
698 }
699}
700
701void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key)
702{
703 String prefix = base_key.empty() ? "" : base_key + ".";
704
705 max_workers = config.getUInt64(prefix + "max_workers");
706
707 settings_common = Settings();
708 if (config.has(prefix + "settings"))
709 settings_common.loadSettingsFromConfig(prefix + "settings", config);
710
711 settings_pull = settings_common;
712 if (config.has(prefix + "settings_pull"))
713 settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
714
715 settings_push = settings_common;
716 if (config.has(prefix + "settings_push"))
717 settings_push.loadSettingsFromConfig(prefix + "settings_push", config);
718
719 auto set_default_value = [] (auto && setting, auto && default_value)
720 {
721 setting = setting.changed ? setting.value : default_value;
722 };
723
724 /// Override important settings
725 settings_pull.readonly = 1;
726 settings_push.insert_distributed_sync = 1;
727 set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
728 set_default_value(settings_pull.max_threads, 1);
729 set_default_value(settings_pull.max_block_size, 8192UL);
730 set_default_value(settings_pull.preferred_block_size_bytes, 0);
731 set_default_value(settings_push.insert_distributed_timeout, 0);
732}
733
734
735} // end of an anonymous namespace
736
737
738class ClusterCopier
739{
740public:
741
742 ClusterCopier(const String & task_path_,
743 const String & host_id_,
744 const String & proxy_database_name_,
745 Context & context_)
746 :
747 task_zookeeper_path(task_path_),
748 host_id(host_id_),
749 working_database_name(proxy_database_name_),
750 context(context_),
751 log(&Poco::Logger::get("ClusterCopier"))
752 {
753 }
754
755 void init()
756 {
757 auto zookeeper = context.getZooKeeper();
758
759 task_description_watch_callback = [this] (const Coordination::WatchResponse & response)
760 {
761 if (response.error != Coordination::ZOK)
762 return;
763 UInt64 version = ++task_descprtion_version;
764 LOG_DEBUG(log, "Task description should be updated, local version " << version);
765 };
766
767 task_description_path = task_zookeeper_path + "/description";
768 task_cluster = std::make_unique<TaskCluster>(task_zookeeper_path, working_database_name);
769
770 reloadTaskDescription();
771 task_cluster_initial_config = task_cluster_current_config;
772
773 task_cluster->loadTasks(*task_cluster_initial_config);
774 context.setClustersConfig(task_cluster_initial_config, task_cluster->clusters_prefix);
775
776 /// Set up shards and their priority
777 task_cluster->random_engine.seed(task_cluster->random_device());
778 for (auto & task_table : task_cluster->table_tasks)
779 {
780 task_table.cluster_pull = context.getCluster(task_table.cluster_pull_name);
781 task_table.cluster_push = context.getCluster(task_table.cluster_push_name);
782 task_table.initShards(task_cluster->random_engine);
783 }
784
785 LOG_DEBUG(log, "Will process " << task_cluster->table_tasks.size() << " table tasks");
786
787 /// Do not initialize tables, will make deferred initialization in process()
788
789 zookeeper->createAncestors(getWorkersPathVersion() + "/");
790 zookeeper->createAncestors(getWorkersPath() + "/");
791 }
792
793 template <typename T>
794 decltype(auto) retry(T && func, UInt64 max_tries = 100)
795 {
796 std::exception_ptr exception;
797
798 for (UInt64 try_number = 1; try_number <= max_tries; ++try_number)
799 {
800 try
801 {
802 return func();
803 }
804 catch (...)
805 {
806 exception = std::current_exception();
807 if (try_number < max_tries)
808 {
809 tryLogCurrentException(log, "Will retry");
810 std::this_thread::sleep_for(default_sleep_time);
811 }
812 }
813 }
814
815 std::rethrow_exception(exception);
816 }
817
818
819 void discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard)
820 {
821 TaskTable & task_table = task_shard->task_table;
822
823 LOG_INFO(log, "Discover partitions of shard " << task_shard->getDescription());
824
825 auto get_partitions = [&] () { return getShardPartitions(timeouts, *task_shard); };
826 auto existing_partitions_names = retry(get_partitions, 60);
827 Strings filtered_partitions_names;
828 Strings missing_partitions;
829
830 /// Check that user specified correct partition names
831 auto check_partition_format = [] (const DataTypePtr & type, const String & partition_text_quoted)
832 {
833 MutableColumnPtr column_dummy = type->createColumn();
834 ReadBufferFromString rb(partition_text_quoted);
835
836 try
837 {
838 type->deserializeAsTextQuoted(*column_dummy, rb, FormatSettings());
839 }
840 catch (Exception & e)
841 {
842 throw Exception("Partition " + partition_text_quoted + " has incorrect format. " + e.displayText(), ErrorCodes::BAD_ARGUMENTS);
843 }
844 };
845
846 if (task_table.has_enabled_partitions)
847 {
848 /// Process partition in order specified by <enabled_partitions/>
849 for (const String & partition_name : task_table.enabled_partitions)
850 {
851 /// Check that user specified correct partition names
852 check_partition_format(task_shard->partition_key_column.type, partition_name);
853
854 auto it = existing_partitions_names.find(partition_name);
855
856 /// Do not process partition if it is not in enabled_partitions list
857 if (it == existing_partitions_names.end())
858 {
859 missing_partitions.emplace_back(partition_name);
860 continue;
861 }
862
863 filtered_partitions_names.emplace_back(*it);
864 }
865
866 for (const String & partition_name : existing_partitions_names)
867 {
868 if (!task_table.enabled_partitions_set.count(partition_name))
869 {
870 LOG_DEBUG(log, "Partition " << partition_name << " will not be processed, since it is not in "
871 << "enabled_partitions of " << task_table.table_id);
872 }
873 }
874 }
875 else
876 {
877 for (const String & partition_name : existing_partitions_names)
878 filtered_partitions_names.emplace_back(partition_name);
879 }
880
881 for (const String & partition_name : filtered_partitions_names)
882 {
883 task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name));
884 task_shard->checked_partitions.emplace(partition_name, true);
885 }
886
887 if (!missing_partitions.empty())
888 {
889 std::stringstream ss;
890 for (const String & missing_partition : missing_partitions)
891 ss << " " << missing_partition;
892
893 LOG_WARNING(log, "There are no " << missing_partitions.size() << " partitions from enabled_partitions in shard "
894 << task_shard->getDescription() << " :" << ss.str());
895 }
896
897 LOG_DEBUG(log, "Will copy " << task_shard->partition_tasks.size() << " partitions from shard " << task_shard->getDescription());
898 }
899
900 /// Compute set of partitions, assume set of partitions aren't changed during the processing
901 void discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads = 0)
902 {
903 /// Fetch partitions list from a shard
904 {
905 ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
906
907 for (const TaskShardPtr & task_shard : task_table.all_shards)
908 thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
909
910 LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
911 thread_pool.wait();
912 }
913 }
914
915 void uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force)
916 {
917 auto local_task_description_path = task_path + "/description";
918
919 String task_config_str;
920 {
921 ReadBufferFromFile in(task_file);
922 readStringUntilEOF(task_config_str, in);
923 }
924 if (task_config_str.empty())
925 return;
926
927 auto zookeeper = context.getZooKeeper();
928
929 zookeeper->createAncestors(local_task_description_path);
930 auto code = zookeeper->tryCreate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
931 if (code && force)
932 zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
933
934 LOG_DEBUG(log, "Task description " << ((code && !force) ? "not " : "") << "uploaded to " << local_task_description_path << " with result " << code << " ("<< zookeeper->error2string(code) << ")");
935 }
936
937 void reloadTaskDescription()
938 {
939 auto zookeeper = context.getZooKeeper();
940 task_description_watch_zookeeper = zookeeper;
941
942 String task_config_str;
943 Coordination::Stat stat;
944 int code;
945
946 zookeeper->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code);
947 if (code)
948 throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS);
949
950 LOG_DEBUG(log, "Loading description, zxid=" << task_descprtion_current_stat.czxid);
951 auto config = getConfigurationFromXMLString(task_config_str);
952
953 /// Setup settings
954 task_cluster->reloadSettings(*config);
955 context.getSettingsRef() = task_cluster->settings_common;
956
957 task_cluster_current_config = config;
958 task_descprtion_current_stat = stat;
959 }
960
961 void updateConfigIfNeeded()
962 {
963 UInt64 version_to_update = task_descprtion_version;
964 bool is_outdated_version = task_descprtion_current_version != version_to_update;
965 bool is_expired_session = !task_description_watch_zookeeper || task_description_watch_zookeeper->expired();
966
967 if (!is_outdated_version && !is_expired_session)
968 return;
969
970 LOG_DEBUG(log, "Updating task description");
971 reloadTaskDescription();
972
973 task_descprtion_current_version = version_to_update;
974 }
975
976 void process(const ConnectionTimeouts & timeouts)
977 {
978 for (TaskTable & task_table : task_cluster->table_tasks)
979 {
980 LOG_INFO(log, "Process table task " << task_table.table_id << " with "
981 << task_table.all_shards.size() << " shards, " << task_table.local_shards.size() << " of them are local ones");
982
983 if (task_table.all_shards.empty())
984 continue;
985
986 /// Discover partitions of each shard and total set of partitions
987 if (!task_table.has_enabled_partitions)
988 {
989 /// If there are no specified enabled_partitions, we must discover them manually
990 discoverTablePartitions(timeouts, task_table);
991
992 /// After partitions of each shard are initialized, initialize cluster partitions
993 for (const TaskShardPtr & task_shard : task_table.all_shards)
994 {
995 for (const auto & partition_elem : task_shard->partition_tasks)
996 {
997 const String & partition_name = partition_elem.first;
998 task_table.cluster_partitions.emplace(partition_name, ClusterPartition{});
999 }
1000 }
1001
1002 for (auto & partition_elem : task_table.cluster_partitions)
1003 {
1004 const String & partition_name = partition_elem.first;
1005
1006 for (const TaskShardPtr & task_shard : task_table.all_shards)
1007 task_shard->checked_partitions.emplace(partition_name);
1008
1009 task_table.ordered_partition_names.emplace_back(partition_name);
1010 }
1011 }
1012 else
1013 {
1014 /// If enabled_partitions are specified, assume that each shard has all partitions
1015 /// We will refine partition set of each shard in future
1016
1017 for (const String & partition_name : task_table.enabled_partitions)
1018 {
1019 task_table.cluster_partitions.emplace(partition_name, ClusterPartition{});
1020 task_table.ordered_partition_names.emplace_back(partition_name);
1021 }
1022 }
1023
1024 task_table.watch.restart();
1025
1026 /// Retry table processing
1027 bool table_is_done = false;
1028 for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries)
1029 {
1030 if (tryProcessTable(timeouts, task_table))
1031 {
1032 table_is_done = true;
1033 break;
1034 }
1035 }
1036
1037 if (!table_is_done)
1038 {
1039 throw Exception("Too many tries to process table " + task_table.table_id + ". Abort remaining execution",
1040 ErrorCodes::UNFINISHED);
1041 }
1042 }
1043 }
1044
1045 /// Disables DROP PARTITION commands that used to clear data after errors
1046 void setSafeMode(bool is_safe_mode_ = true)
1047 {
1048 is_safe_mode = is_safe_mode_;
1049 }
1050
1051 void setCopyFaultProbability(double copy_fault_probability_)
1052 {
1053 copy_fault_probability = copy_fault_probability_;
1054 }
1055
1056
1057protected:
1058
1059 String getWorkersPath() const
1060 {
1061 return task_cluster->task_zookeeper_path + "/task_active_workers";
1062 }
1063
1064 String getWorkersPathVersion() const
1065 {
1066 return getWorkersPath() + "_version";
1067 }
1068
1069 String getCurrentWorkerNodePath() const
1070 {
1071 return getWorkersPath() + "/" + host_id;
1072 }
1073
1074 zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(
1075 const zkutil::ZooKeeperPtr & zookeeper,
1076 const String & description,
1077 bool unprioritized)
1078 {
1079 std::chrono::milliseconds current_sleep_time = default_sleep_time;
1080 static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec
1081
1082 if (unprioritized)
1083 std::this_thread::sleep_for(current_sleep_time);
1084
1085 String workers_version_path = getWorkersPathVersion();
1086 String workers_path = getWorkersPath();
1087 String current_worker_path = getCurrentWorkerNodePath();
1088
1089 UInt64 num_bad_version_errors = 0;
1090
1091 while (true)
1092 {
1093 updateConfigIfNeeded();
1094
1095 Coordination::Stat stat;
1096 zookeeper->get(workers_version_path, &stat);
1097 auto version = stat.version;
1098 zookeeper->get(workers_path, &stat);
1099
1100 if (static_cast<UInt64>(stat.numChildren) >= task_cluster->max_workers)
1101 {
1102 LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")"
1103 << ". Postpone processing " << description);
1104
1105 if (unprioritized)
1106 current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);
1107
1108 std::this_thread::sleep_for(current_sleep_time);
1109 num_bad_version_errors = 0;
1110 }
1111 else
1112 {
1113 Coordination::Requests ops;
1114 ops.emplace_back(zkutil::makeSetRequest(workers_version_path, description, version));
1115 ops.emplace_back(zkutil::makeCreateRequest(current_worker_path, description, zkutil::CreateMode::Ephemeral));
1116 Coordination::Responses responses;
1117 auto code = zookeeper->tryMulti(ops, responses);
1118
1119 if (code == Coordination::ZOK || code == Coordination::ZNODEEXISTS)
1120 return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
1121
1122 if (code == Coordination::ZBADVERSION)
1123 {
1124 ++num_bad_version_errors;
1125
1126 /// Try to make fast retries
1127 if (num_bad_version_errors > 3)
1128 {
1129 LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again");
1130 std::chrono::milliseconds random_sleep_time(std::uniform_int_distribution<int>(1, 1000)(task_cluster->random_engine));
1131 std::this_thread::sleep_for(random_sleep_time);
1132 num_bad_version_errors = 0;
1133 }
1134 }
1135 else
1136 throw Coordination::Exception(code);
1137 }
1138 }
1139 }
1140
1141 /** Checks that the whole partition of a table was copied. We should do it carefully due to dirty lock.
1142 * State of some task could change during the processing.
1143 * We have to ensure that all shards have the finished state and there is no dirty flag.
1144 * Moreover, we have to check status twice and check zxid, because state can change during the checking.
1145 */
1146 bool checkPartitionIsDone(const TaskTable & task_table, const String & partition_name, const TasksShard & shards_with_partition)
1147 {
1148 LOG_DEBUG(log, "Check that all shards processed partition " << partition_name << " successfully");
1149
1150 auto zookeeper = context.getZooKeeper();
1151
1152 Strings status_paths;
1153 for (auto & shard : shards_with_partition)
1154 {
1155 ShardPartition & task_shard_partition = shard->partition_tasks.find(partition_name)->second;
1156 status_paths.emplace_back(task_shard_partition.getShardStatusPath());
1157 }
1158
1159 std::vector<int64_t> zxid1, zxid2;
1160
1161 try
1162 {
1163 std::vector<zkutil::ZooKeeper::FutureGet> get_futures;
1164 for (const String & path : status_paths)
1165 get_futures.emplace_back(zookeeper->asyncGet(path));
1166
1167 // Check that state is Finished and remember zxid
1168 for (auto & future : get_futures)
1169 {
1170 auto res = future.get();
1171
1172 TaskStateWithOwner status = TaskStateWithOwner::fromString(res.data);
1173 if (status.state != TaskState::Finished)
1174 {
1175 LOG_INFO(log, "The task " << res.data << " is being rewritten by " << status.owner << ". Partition will be rechecked");
1176 return false;
1177 }
1178
1179 zxid1.push_back(res.stat.pzxid);
1180 }
1181
1182 // Check that partition is not dirty
1183 {
1184 CleanStateClock clean_state_clock (
1185 zookeeper,
1186 task_table.getPartitionIsDirtyPath(partition_name),
1187 task_table.getPartitionIsCleanedPath(partition_name)
1188 );
1189 Coordination::Stat stat;
1190 LogicalClock task_start_clock;
1191 if (zookeeper->exists(task_table.getPartitionTaskStatusPath(partition_name), &stat))
1192 task_start_clock = LogicalClock(stat.mzxid);
1193 zookeeper->get(task_table.getPartitionTaskStatusPath(partition_name), &stat);
1194 if (!clean_state_clock.is_clean() || task_start_clock <= clean_state_clock.discovery_zxid)
1195 {
1196 LOG_INFO(log, "Partition " << partition_name << " become dirty");
1197 return false;
1198 }
1199 }
1200
1201 get_futures.clear();
1202 for (const String & path : status_paths)
1203 get_futures.emplace_back(zookeeper->asyncGet(path));
1204
1205 // Remember zxid of states again
1206 for (auto & future : get_futures)
1207 {
1208 auto res = future.get();
1209 zxid2.push_back(res.stat.pzxid);
1210 }
1211 }
1212 catch (const Coordination::Exception & e)
1213 {
1214 LOG_INFO(log, "A ZooKeeper error occurred while checking partition " << partition_name
1215 << ". Will recheck the partition. Error: " << e.displayText());
1216 return false;
1217 }
1218
1219 // If all task is finished and zxid is not changed then partition could not become dirty again
1220 for (UInt64 shard_num = 0; shard_num < status_paths.size(); ++shard_num)
1221 {
1222 if (zxid1[shard_num] != zxid2[shard_num])
1223 {
1224 LOG_INFO(log, "The task " << status_paths[shard_num] << " is being modified now. Partition will be rechecked");
1225 return false;
1226 }
1227 }
1228
1229 LOG_INFO(log, "Partition " << partition_name << " is copied successfully");
1230 return true;
1231 }
1232
1233 /// Removes MATERIALIZED and ALIAS columns from create table query
1234 static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
1235 {
1236 const ASTs & column_asts = query_ast->as<ASTCreateQuery &>().columns_list->columns->children;
1237 auto new_columns = std::make_shared<ASTExpressionList>();
1238
1239 for (const ASTPtr & column_ast : column_asts)
1240 {
1241 const auto & column = column_ast->as<ASTColumnDeclaration &>();
1242
1243 if (!column.default_specifier.empty())
1244 {
1245 ColumnDefaultKind kind = columnDefaultKindFromString(column.default_specifier);
1246 if (kind == ColumnDefaultKind::Materialized || kind == ColumnDefaultKind::Alias)
1247 continue;
1248 }
1249
1250 new_columns->children.emplace_back(column_ast->clone());
1251 }
1252
1253 ASTPtr new_query_ast = query_ast->clone();
1254 auto & new_query = new_query_ast->as<ASTCreateQuery &>();
1255
1256 auto new_columns_list = std::make_shared<ASTColumns>();
1257 new_columns_list->set(new_columns_list->columns, new_columns);
1258 if (auto indices = query_ast->as<ASTCreateQuery>()->columns_list->indices)
1259 new_columns_list->set(new_columns_list->indices, indices->clone());
1260
1261 new_query.replace(new_query.columns_list, new_columns_list);
1262
1263 return new_query_ast;
1264 }
1265
1266 /// Replaces ENGINE and table name in a create query
1267 std::shared_ptr<ASTCreateQuery> rewriteCreateQueryStorage(const ASTPtr & create_query_ast, const DatabaseAndTableName & new_table, const ASTPtr & new_storage_ast)
1268 {
1269 const auto & create = create_query_ast->as<ASTCreateQuery &>();
1270 auto res = std::make_shared<ASTCreateQuery>(create);
1271
1272 if (create.storage == nullptr || new_storage_ast == nullptr)
1273 throw Exception("Storage is not specified", ErrorCodes::LOGICAL_ERROR);
1274
1275 res->database = new_table.first;
1276 res->table = new_table.second;
1277
1278 res->children.clear();
1279 res->set(res->columns_list, create.columns_list->clone());
1280 res->set(res->storage, new_storage_ast->clone());
1281
1282 return res;
1283 }
1284
1285 /** Allows to compare two incremental counters of type UInt32 in presence of possible overflow.
1286 * We assume that we compare values that are not too far away.
1287 * For example, when we increment 0xFFFFFFFF, we get 0. So, 0xFFFFFFFF is less than 0.
1288 */
1289 class WrappingUInt32
1290 {
1291 public:
1292 UInt32 value;
1293
1294 WrappingUInt32(UInt32 _value)
1295 : value(_value)
1296 {}
1297
1298 bool operator<(const WrappingUInt32 & other) const
1299 {
1300 return value != other.value && *this <= other;
1301 }
1302
1303 bool operator<=(const WrappingUInt32 & other) const
1304 {
1305 const UInt32 HALF = 1 << 31;
1306 return (value <= other.value && other.value - value < HALF)
1307 || (value > other.value && value - other.value > HALF);
1308 }
1309
1310 bool operator==(const WrappingUInt32 & other) const
1311 {
1312 return value == other.value;
1313 }
1314 };
1315
1316 /** Conforming Zxid definition.
1317 * cf. https://github.com/apache/zookeeper/blob/631d1b284f0edb1c4f6b0fb221bf2428aec71aaa/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md#guarantees-properties-and-definitions
1318 */
1319 class Zxid
1320 {
1321 public:
1322 WrappingUInt32 epoch;
1323 WrappingUInt32 counter;
1324 Zxid(UInt64 _zxid)
1325 : epoch(_zxid >> 32)
1326 , counter(_zxid)
1327 {}
1328
1329 bool operator<=(const Zxid & other) const
1330 {
1331 return (epoch < other.epoch)
1332 || (epoch == other.epoch && counter <= other.counter);
1333 }
1334
1335 bool operator==(const Zxid & other) const
1336 {
1337 return epoch == other.epoch && counter == other.counter;
1338 }
1339 };
1340
1341 class LogicalClock
1342 {
1343 public:
1344 std::optional<Zxid> zxid;
1345
1346 LogicalClock() = default;
1347
1348 LogicalClock(UInt64 _zxid)
1349 : zxid(_zxid)
1350 {}
1351
1352 bool hasHappened() const
1353 {
1354 return bool(zxid);
1355 }
1356
1357 // happens-before relation with a reasonable time bound
1358 bool happensBefore(const LogicalClock & other) const
1359 {
1360 return !zxid
1361 || (other.zxid && *zxid <= *other.zxid);
1362 }
1363
1364 bool operator<=(const LogicalClock & other) const
1365 {
1366 return happensBefore(other);
1367 }
1368
1369 // strict equality check
1370 bool operator==(const LogicalClock & other) const
1371 {
1372 return zxid == other.zxid;
1373 }
1374 };
1375
1376 class CleanStateClock
1377 {
1378 public:
1379 LogicalClock discovery_zxid;
1380 std::optional<UInt32> discovery_version;
1381
1382 LogicalClock clean_state_zxid;
1383 std::optional<UInt32> clean_state_version;
1384
1385 std::shared_ptr<std::atomic_bool> stale;
1386
1387 bool is_clean() const
1388 {
1389 return
1390 !is_stale()
1391 && (
1392 !discovery_zxid.hasHappened()
1393 || (clean_state_zxid.hasHappened() && discovery_zxid <= clean_state_zxid));
1394 }
1395
1396 bool is_stale() const
1397 {
1398 return stale->load();
1399 }
1400
1401 CleanStateClock(
1402 const zkutil::ZooKeeperPtr & zookeeper,
1403 const String & discovery_path,
1404 const String & clean_state_path)
1405 : stale(std::make_shared<std::atomic_bool>(false))
1406 {
1407 Coordination::Stat stat;
1408 String _some_data;
1409 auto watch_callback =
1410 [stale = stale] (const Coordination::WatchResponse & rsp)
1411 {
1412 auto logger = &Poco::Logger::get("ClusterCopier");
1413 if (rsp.error == Coordination::ZOK)
1414 {
1415 switch (rsp.type)
1416 {
1417 case Coordination::CREATED:
1418 LOG_DEBUG(logger, "CleanStateClock change: CREATED, at " << rsp.path);
1419 stale->store(true);
1420 break;
1421 case Coordination::CHANGED:
1422 LOG_DEBUG(logger, "CleanStateClock change: CHANGED, at" << rsp.path);
1423 stale->store(true);
1424 }
1425 }
1426 };
1427 if (zookeeper->tryGetWatch(discovery_path, _some_data, &stat, watch_callback))
1428 {
1429 discovery_zxid = LogicalClock(stat.mzxid);
1430 discovery_version = stat.version;
1431 }
1432 if (zookeeper->tryGetWatch(clean_state_path, _some_data, &stat, watch_callback))
1433 {
1434 clean_state_zxid = LogicalClock(stat.mzxid);
1435 clean_state_version = stat.version;
1436 }
1437 }
1438
1439 bool operator==(const CleanStateClock & other) const
1440 {
1441 return !is_stale()
1442 && !other.is_stale()
1443 && discovery_zxid == other.discovery_zxid
1444 && discovery_version == other.discovery_version
1445 && clean_state_zxid == other.clean_state_zxid
1446 && clean_state_version == other.clean_state_version;
1447 }
1448
1449 bool operator!=(const CleanStateClock & other) const
1450 {
1451 return !(*this == other);
1452 }
1453 };
1454
1455 bool tryDropPartition(ShardPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock)
1456 {
1457 if (is_safe_mode)
1458 throw Exception("DROP PARTITION is prohibited in safe mode", ErrorCodes::NOT_IMPLEMENTED);
1459
1460 TaskTable & task_table = task_partition.task_shard.task_table;
1461
1462 const String current_shards_path = task_partition.getPartitionShardsPath();
1463 const String current_partition_active_workers_dir = task_partition.getPartitionActiveWorkersPath();
1464 const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
1465 const String dirt_cleaner_path = is_dirty_flag_path + "/cleaner";
1466 const String is_dirt_cleaned_path = task_partition.getCommonPartitionIsCleanedPath();
1467
1468 zkutil::EphemeralNodeHolder::Ptr cleaner_holder;
1469 try
1470 {
1471 cleaner_holder = zkutil::EphemeralNodeHolder::create(dirt_cleaner_path, *zookeeper, host_id);
1472 }
1473 catch (const Coordination::Exception & e)
1474 {
1475 if (e.code == Coordination::ZNODEEXISTS)
1476 {
1477 LOG_DEBUG(log, "Partition " << task_partition.name << " is cleaning now by somebody, sleep");
1478 std::this_thread::sleep_for(default_sleep_time);
1479 return false;
1480 }
1481
1482 throw;
1483 }
1484
1485 Coordination::Stat stat;
1486 if (zookeeper->exists(current_partition_active_workers_dir, &stat))
1487 {
1488 if (stat.numChildren != 0)
1489 {
1490 LOG_DEBUG(log, "Partition " << task_partition.name << " contains " << stat.numChildren << " active workers while trying to drop it. Going to sleep.");
1491 std::this_thread::sleep_for(default_sleep_time);
1492 return false;
1493 }
1494 else
1495 {
1496 zookeeper->remove(current_partition_active_workers_dir);
1497 }
1498 }
1499
1500 {
1501 zkutil::EphemeralNodeHolder::Ptr active_workers_lock;
1502 try
1503 {
1504 active_workers_lock = zkutil::EphemeralNodeHolder::create(current_partition_active_workers_dir, *zookeeper, host_id);
1505 }
1506 catch (const Coordination::Exception & e)
1507 {
1508 if (e.code == Coordination::ZNODEEXISTS)
1509 {
1510 LOG_DEBUG(log, "Partition " << task_partition.name << " is being filled now by somebody, sleep");
1511 return false;
1512 }
1513
1514 throw;
1515 }
1516
1517 // Lock the dirty flag
1518 zookeeper->set(is_dirty_flag_path, host_id, clean_state_clock.discovery_version.value());
1519 zookeeper->tryRemove(task_partition.getPartitionCleanStartPath());
1520 CleanStateClock my_clock(zookeeper, is_dirty_flag_path, is_dirt_cleaned_path);
1521
1522 /// Remove all status nodes
1523 {
1524 Strings children;
1525 if (zookeeper->tryGetChildren(current_shards_path, children) == Coordination::ZOK)
1526 for (const auto & child : children)
1527 {
1528 zookeeper->removeRecursive(current_shards_path + "/" + child);
1529 }
1530 }
1531
1532 String query = "ALTER TABLE " + getQuotedTable(task_table.table_push);
1533 query += " DROP PARTITION " + task_partition.name + "";
1534
1535 /// TODO: use this statement after servers will be updated up to 1.1.54310
1536 // query += " DROP PARTITION ID '" + task_partition.name + "'";
1537
1538 ClusterPtr & cluster_push = task_table.cluster_push;
1539 Settings settings_push = task_cluster->settings_push;
1540
1541 /// It is important, DROP PARTITION must be done synchronously
1542 settings_push.replication_alter_partitions_sync = 2;
1543
1544 LOG_DEBUG(log, "Execute distributed DROP PARTITION: " << query);
1545 /// Limit number of max executing replicas to 1
1546 UInt64 num_shards = executeQueryOnCluster(cluster_push, query, nullptr, &settings_push, PoolMode::GET_ONE, 1);
1547
1548 if (num_shards < cluster_push->getShardCount())
1549 {
1550 LOG_INFO(log, "DROP PARTITION wasn't successfully executed on " << cluster_push->getShardCount() - num_shards << " shards");
1551 return false;
1552 }
1553
1554 /// Update the locking node
1555 if (!my_clock.is_stale())
1556 {
1557 zookeeper->set(is_dirty_flag_path, host_id, my_clock.discovery_version.value());
1558 if (my_clock.clean_state_version)
1559 zookeeper->set(is_dirt_cleaned_path, host_id, my_clock.clean_state_version.value());
1560 else
1561 zookeeper->create(is_dirt_cleaned_path, host_id, zkutil::CreateMode::Persistent);
1562 }
1563 else
1564 {
1565 LOG_DEBUG(log, "Clean state is altered when dropping the partition, cowardly bailing");
1566 /// clean state is stale
1567 return false;
1568 }
1569
1570 LOG_INFO(log, "Partition " << task_partition.name << " was dropped on cluster " << task_table.cluster_push_name);
1571 if (zookeeper->tryCreate(current_shards_path, host_id, zkutil::CreateMode::Persistent) == Coordination::ZNODEEXISTS)
1572 zookeeper->set(current_shards_path, host_id);
1573 }
1574
1575 LOG_INFO(log, "Partition " << task_partition.name << " is safe for work now.");
1576 return true;
1577 }
1578
1579
1580 static constexpr UInt64 max_table_tries = 1000;
1581 static constexpr UInt64 max_shard_partition_tries = 600;
1582
1583 bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
1584 {
1585 /// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
1586 bool previous_shard_is_instantly_finished = false;
1587
1588 /// Process each partition that is present in cluster
1589 for (const String & partition_name : task_table.ordered_partition_names)
1590 {
1591 if (!task_table.cluster_partitions.count(partition_name))
1592 throw Exception("There are no expected partition " + partition_name + ". It is a bug", ErrorCodes::LOGICAL_ERROR);
1593
1594 ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
1595
1596 Stopwatch watch;
1597 TasksShard expected_shards;
1598 UInt64 num_failed_shards = 0;
1599
1600 ++cluster_partition.total_tries;
1601
1602 LOG_DEBUG(log, "Processing partition " << partition_name << " for the whole cluster");
1603
1604 /// Process each source shard having current partition and copy current partition
1605 /// NOTE: shards are sorted by "distance" to current host
1606 bool has_shard_to_process = false;
1607 for (const TaskShardPtr & shard : task_table.all_shards)
1608 {
1609 /// Does shard have a node with current partition?
1610 if (shard->partition_tasks.count(partition_name) == 0)
1611 {
1612 /// If not, did we check existence of that partition previously?
1613 if (shard->checked_partitions.count(partition_name) == 0)
1614 {
1615 auto check_shard_has_partition = [&] () { return checkShardHasPartition(timeouts, *shard, partition_name); };
1616 bool has_partition = retry(check_shard_has_partition);
1617
1618 shard->checked_partitions.emplace(partition_name);
1619
1620 if (has_partition)
1621 {
1622 shard->partition_tasks.emplace(partition_name, ShardPartition(*shard, partition_name));
1623 LOG_DEBUG(log, "Discovered partition " << partition_name << " in shard " << shard->getDescription());
1624 }
1625 else
1626 {
1627 LOG_DEBUG(log, "Found that shard " << shard->getDescription() << " does not contain current partition " << partition_name);
1628 continue;
1629 }
1630 }
1631 else
1632 {
1633 /// We have already checked that partition, but did not discover it
1634 previous_shard_is_instantly_finished = true;
1635 continue;
1636 }
1637 }
1638
1639 auto it_shard_partition = shard->partition_tasks.find(partition_name);
1640 if (it_shard_partition == shard->partition_tasks.end())
1641 throw Exception("There are no such partition in a shard. This is a bug.", ErrorCodes::LOGICAL_ERROR);
1642 auto & partition = it_shard_partition->second;
1643
1644 expected_shards.emplace_back(shard);
1645
1646 /// Do not sleep if there is a sequence of already processed shards to increase startup
1647 bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
1648 PartitionTaskStatus task_status = PartitionTaskStatus::Error;
1649 bool was_error = false;
1650 has_shard_to_process = true;
1651 for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
1652 {
1653 task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task);
1654
1655 /// Exit if success
1656 if (task_status == PartitionTaskStatus::Finished)
1657 break;
1658
1659 was_error = true;
1660
1661 /// Skip if the task is being processed by someone
1662 if (task_status == PartitionTaskStatus::Active)
1663 break;
1664
1665 /// Repeat on errors
1666 std::this_thread::sleep_for(default_sleep_time);
1667 }
1668
1669 if (task_status == PartitionTaskStatus::Error)
1670 ++num_failed_shards;
1671
1672 previous_shard_is_instantly_finished = !was_error;
1673 }
1674
1675 cluster_partition.elapsed_time_seconds += watch.elapsedSeconds();
1676
1677 /// Check that whole cluster partition is done
1678 /// Firstly check the number of failed partition tasks, then look into ZooKeeper and ensure that each partition is done
1679 bool partition_is_done = num_failed_shards == 0;
1680 try
1681 {
1682 partition_is_done =
1683 !has_shard_to_process
1684 || (partition_is_done && checkPartitionIsDone(task_table, partition_name, expected_shards));
1685 }
1686 catch (...)
1687 {
1688 tryLogCurrentException(log);
1689 partition_is_done = false;
1690 }
1691
1692 if (partition_is_done)
1693 {
1694 task_table.finished_cluster_partitions.emplace(partition_name);
1695
1696 task_table.bytes_copied += cluster_partition.bytes_copied;
1697 task_table.rows_copied += cluster_partition.rows_copied;
1698 double elapsed = cluster_partition.elapsed_time_seconds;
1699
1700 LOG_INFO(log, "It took " << std::fixed << std::setprecision(2) << elapsed << " seconds to copy partition " << partition_name
1701 << ": " << formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied) << " uncompressed bytes"
1702 << ", " << formatReadableQuantity(cluster_partition.rows_copied) << " rows"
1703 << " and " << cluster_partition.blocks_copied << " source blocks copied");
1704
1705 if (cluster_partition.rows_copied)
1706 {
1707 LOG_INFO(log, "Average partition speed: "
1708 << formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied / elapsed) << " per second.");
1709 }
1710
1711 if (task_table.rows_copied)
1712 {
1713 LOG_INFO(log, "Average table " << task_table.table_id << " speed: "
1714 << formatReadableSizeWithDecimalSuffix(task_table.bytes_copied / elapsed) << " per second.");
1715 }
1716 }
1717 }
1718
1719 UInt64 required_partitions = task_table.cluster_partitions.size();
1720 UInt64 finished_partitions = task_table.finished_cluster_partitions.size();
1721 bool table_is_done = finished_partitions >= required_partitions;
1722
1723 if (!table_is_done)
1724 {
1725 LOG_INFO(log, "Table " + task_table.table_id + " is not processed yet. "
1726 << "Copied " << finished_partitions << " of " << required_partitions << ", will retry");
1727 }
1728
1729 return table_is_done;
1730 }
1731
1732
1733 /// Execution status of a task
1734 enum class PartitionTaskStatus
1735 {
1736 Active,
1737 Finished,
1738 Error,
1739 };
1740
1741 PartitionTaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
1742 {
1743 PartitionTaskStatus res;
1744
1745 try
1746 {
1747 res = processPartitionTaskImpl(timeouts, task_partition, is_unprioritized_task);
1748 }
1749 catch (...)
1750 {
1751 tryLogCurrentException(log, "An error occurred while processing partition " + task_partition.name);
1752 res = PartitionTaskStatus::Error;
1753 }
1754
1755 /// At the end of each task check if the config is updated
1756 try
1757 {
1758 updateConfigIfNeeded();
1759 }
1760 catch (...)
1761 {
1762 tryLogCurrentException(log, "An error occurred while updating the config");
1763 }
1764
1765 return res;
1766 }
1767
1768 PartitionTaskStatus processPartitionTaskImpl(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
1769 {
1770 TaskShard & task_shard = task_partition.task_shard;
1771 TaskTable & task_table = task_shard.task_table;
1772 ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
1773
1774 /// We need to update table definitions for each partition, they could have been changed by ALTER
1775 createShardInternalTables(timeouts, task_shard);
1776
1777 auto zookeeper = context.getZooKeeper();
1778
1779 const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
1780 const String is_dirty_cleaned_path = task_partition.getCommonPartitionIsCleanedPath();
1781 const String current_task_is_active_path = task_partition.getActiveWorkerPath();
1782 const String current_task_status_path = task_partition.getShardStatusPath();
1783
1784 /// Auxiliary functions:
1785
1786 /// Creates is_dirty node to initialize DROP PARTITION
1787 auto create_is_dirty_node = [&, this] (const CleanStateClock & clock)
1788 {
1789 if (clock.is_stale())
1790 LOG_DEBUG(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
1791 else if (!clock.is_clean())
1792 LOG_DEBUG(log, "Partition is already dirty, no need to set the dirty flag again");
1793 else if (clock.discovery_version)
1794 {
1795 LOG_DEBUG(log, "Updating clean state clock");
1796 zookeeper->set(is_dirty_flag_path, host_id, clock.discovery_version.value());
1797 }
1798 else
1799 {
1800 LOG_DEBUG(log, "Creating clean state clock");
1801 zookeeper->create(is_dirty_flag_path, host_id, zkutil::CreateMode::Persistent);
1802 }
1803 };
1804
1805 /// Returns a SELECT query that filters the current partition and applies the optional user filter
1806 auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, String limit = "")
1807 {
1808 String query;
1809 query += "SELECT " + fields + " FROM " + getQuotedTable(from_table);
1810 /// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field)
1811 query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))";
1812 if (!task_table.where_condition_str.empty())
1813 query += " AND (" + task_table.where_condition_str + ")";
1814 if (!limit.empty())
1815 query += " LIMIT " + limit;
1816
1817 ParserQuery p_query(query.data() + query.size());
1818 return parseQuery(p_query, query, 0);
1819 };
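/// For illustration only (hypothetical key, table and value), the generated query looks like:
///   SELECT <fields> FROM db.table WHERE (toYYYYMM(EventDate) = (201801 AS partition_key)) AND (<where_condition>) LIMIT <limit>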
1820
1821 /// Load balancing
1822 auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path, is_unprioritized_task);
1823
1824 LOG_DEBUG(log, "Processing " << current_task_status_path);
1825
1826 CleanStateClock clean_state_clock (zookeeper, is_dirty_flag_path, is_dirty_cleaned_path);
1827
1828 LogicalClock task_start_clock;
1829 {
1830 Coordination::Stat stat;
1831 if (zookeeper->exists(task_partition.getPartitionShardsPath(), &stat))
1832 task_start_clock = LogicalClock(stat.mzxid);
1833 }
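/// task_start_clock is the modification zxid of the partition "shards" node (if it already exists).
/// The partition is treated as clean only if any recorded dirt was discovered no later than that,
/// i.e. no dirt appeared after other workers started copying this partition.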
1834
1835 /// Do not start if partition is dirty, try to clean it
1836 if (clean_state_clock.is_clean()
1837 && (!task_start_clock.hasHappened() || clean_state_clock.discovery_zxid <= task_start_clock))
1838 {
1839 LOG_DEBUG(log, "Partition " << task_partition.name << " appears to be clean");
1840 zookeeper->createAncestors(current_task_status_path);
1841 }
1842 else
1843 {
1844 LOG_DEBUG(log, "Partition " << task_partition.name << " is dirty, try to drop it");
1845
1846 try
1847 {
1848 tryDropPartition(task_partition, zookeeper, clean_state_clock);
1849 }
1850 catch (...)
1851 {
1852 tryLogCurrentException(log, "An error occurred while cleaning the partition");
1853 }
1854
1855 return PartitionTaskStatus::Error;
1856 }
1857
1858 /// Create ephemeral node to mark that we are active and process the partition
1859 zookeeper->createAncestors(current_task_is_active_path);
1860 zkutil::EphemeralNodeHolderPtr partition_task_node_holder;
1861 try
1862 {
1863 partition_task_node_holder = zkutil::EphemeralNodeHolder::create(current_task_is_active_path, *zookeeper, host_id);
1864 }
1865 catch (const Coordination::Exception & e)
1866 {
1867 if (e.code == Coordination::ZNODEEXISTS)
1868 {
1869 LOG_DEBUG(log, "Someone is already processing " << current_task_is_active_path);
1870 return PartitionTaskStatus::Active;
1871 }
1872
1873 throw;
1874 }
1875
1876 /// Exit if the task has already been processed;
1877 /// create blocking node to signal cleaning up if it is abandoned
1878 {
1879 String status_data;
1880 if (zookeeper->tryGet(current_task_status_path, status_data))
1881 {
1882 TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
1883 if (status.state == TaskState::Finished)
1884 {
1885 LOG_DEBUG(log, "Task " << current_task_status_path << " has been successfully executed by " << status.owner);
1886 return PartitionTaskStatus::Finished;
1887 }
1888
1889 // Task is abandoned, initialize DROP PARTITION
1890 LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner << ". Partition will be dropped and refilled.");
1891
1892 create_is_dirty_node(clean_state_clock);
1893 return PartitionTaskStatus::Error;
1894 }
1895 }
1896
1897 /// Check that destination partition is empty if we are first worker
1898 /// NOTE: this check is incorrect if pull and push tables have different partition key!
1899 String clean_start_status;
1900 if (!zookeeper->tryGet(task_partition.getPartitionCleanStartPath(), clean_start_status) || clean_start_status != "ok")
1901 {
1902 zookeeper->createIfNotExists(task_partition.getPartitionCleanStartPath(), "");
1903 auto checker = zkutil::EphemeralNodeHolder::create(task_partition.getPartitionCleanStartPath() + "/checker", *zookeeper, host_id);
1904 // Maybe we are the first worker
1905 ASTPtr query_select_ast = get_select_query(task_shard.table_split_shard, "count()");
1906 UInt64 count;
1907 {
1908 Context local_context = context;
1909 // Use pull (i.e. readonly) settings, but fetch data from destination servers
1910 local_context.getSettingsRef() = task_cluster->settings_pull;
1911 local_context.getSettingsRef().skip_unavailable_shards = true;
1912
1913 Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_select_ast, local_context)->execute().in);
1914 count = (block) ? block.safeGetByPosition(0).column->getUInt(0) : 0;
1915 }
1916
1917 if (count != 0)
1918 {
1919 Coordination::Stat stat_shards;
1920 zookeeper->get(task_partition.getPartitionShardsPath(), &stat_shards);
1921
1922 /// NOTE: the partition is still considered fresh if the dirt was discovered before cleaning started
1923 if (stat_shards.numChildren == 0)
1924 {
1925 LOG_WARNING(log, "There are no workers for partition " << task_partition.name
1926 << ", but destination table contains " << count << " rows"
1927 << ". Partition will be dropped and refilled.");
1928
1929 create_is_dirty_node(clean_state_clock);
1930 return PartitionTaskStatus::Error;
1931 }
1932 }
1933 zookeeper->set(task_partition.getPartitionCleanStartPath(), "ok");
1934 }
1935 /// At this point we must be sure that the destination table is clean
1936 /// before doing any actual work
1937
1938 /// Try start processing, create node about it
1939 {
1940 String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
1941 CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirty_cleaned_path);
1942 if (clean_state_clock != new_clean_state_clock)
1943 {
1944 LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing");
1945 return PartitionTaskStatus::Error;
1946 }
1947 else if (!new_clean_state_clock.is_clean())
1948 {
1949 LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled");
1950 create_is_dirty_node(new_clean_state_clock);
1951 return PartitionTaskStatus::Error;
1952 }
1953 zookeeper->create(current_task_status_path, start_state, zkutil::CreateMode::Persistent);
1954 }
1955
1956 /// Try create table (if not exists) on each shard
1957 {
1958 auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query, task_table.table_push, task_table.engine_push_ast);
1959 create_query_push_ast->as<ASTCreateQuery &>().if_not_exists = true;
1960 String query = queryToString(create_query_push_ast);
1961
1962 LOG_DEBUG(log, "Create destination tables. Query: " << query);
1963 UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push,
1964 PoolMode::GET_MANY);
1965 LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push) << " have been created on " << shards
1966 << " shards of " << task_table.cluster_push->getShardCount());
1967 }
1968
1969 /// Do the copying
1970 {
1971 bool inject_fault = false;
1972 if (copy_fault_probability > 0)
1973 {
1974 double value = std::uniform_real_distribution<>(0, 1)(task_table.task_cluster.random_engine);
1975 inject_fault = value < copy_fault_probability;
1976 }
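/// When the fault is injected, only one row is copied (see the LIMIT 1 below) and an exception is thrown
/// after copying, to exercise the dirty-partition recovery path.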
1977
1978 // Select all fields
1979 ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", inject_fault ? "1" : "");
1980
1981 LOG_DEBUG(log, "Executing SELECT query and pull from " << task_shard.getDescription()
1982 << " : " << queryToString(query_select_ast));
1983
1984 ASTPtr query_insert_ast;
1985 {
1986 String query;
1987 query += "INSERT INTO " + getQuotedTable(task_shard.table_split_shard) + " VALUES ";
1988
1989 ParserQuery p_query(query.data() + query.size());
1990 query_insert_ast = parseQuery(p_query, query, 0);
1991
1992 LOG_DEBUG(log, "Executing INSERT query: " << query);
1993 }
1994
1995 try
1996 {
1997 /// Custom INSERT SELECT implementation
1998 Context context_select = context;
1999 context_select.getSettingsRef() = task_cluster->settings_pull;
2000
2001 Context context_insert = context;
2002 context_insert.getSettingsRef() = task_cluster->settings_push;
2003
2004 BlockInputStreamPtr input;
2005 BlockOutputStreamPtr output;
2006 {
2007 BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
2008 BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
2009
2010 input = io_select.in;
2011 output = io_insert.out;
2012 }
2013
2014 /// Fail-fast optimization to abort copying when the current clean state expires
2015 std::future<Coordination::ExistsResponse> future_is_dirty_checker;
2016
2017 Stopwatch watch(CLOCK_MONOTONIC_COARSE);
2018 constexpr UInt64 check_period_milliseconds = 500;
2019
2020 /// Asynchronously checks the ZooKeeper connection and whether the is_dirty flag appears while copying data
2021 auto cancel_check = [&] ()
2022 {
2023 if (zookeeper->expired())
2024 throw Exception("ZooKeeper session is expired, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
2025
2026 if (!future_is_dirty_checker.valid())
2027 future_is_dirty_checker = zookeeper->asyncExists(is_dirty_flag_path);
2028
2029 /// check_period_milliseconds should be less than the average insert time of a single block
2030 /// Otherwise, the insertion will slow down a little bit
2031 if (watch.elapsedMilliseconds() >= check_period_milliseconds)
2032 {
2033 Coordination::ExistsResponse status = future_is_dirty_checker.get();
2034
2035 if (status.error != Coordination::ZNONODE)
2036 {
2037 LogicalClock dirt_discovery_epoch (status.stat.mzxid);
2038 if (dirt_discovery_epoch == clean_state_clock.discovery_zxid)
2039 return false;
2040 throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
2041 }
2042 }
2043
2044 return false;
2045 };
2046
2047 /// Update statistics
2048 /// It is quite rough: bytes_copied doesn't take DROP PARTITION into account.
2049 auto update_stats = [&cluster_partition] (const Block & block)
2050 {
2051 cluster_partition.bytes_copied += block.bytes();
2052 cluster_partition.rows_copied += block.rows();
2053 cluster_partition.blocks_copied += 1;
2054 };
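/// copyData() is expected to invoke cancel_check and update_stats for every copied block,
/// so cancellation and statistics work at block granularity.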
2055
2056 /// Main work is here
2057 copyData(*input, *output, cancel_check, update_stats);
2058
2059 // Just in case: wait for the pending is_dirty check to finish
2060 if (future_is_dirty_checker.valid())
2061 future_is_dirty_checker.get();
2062
2063 if (inject_fault)
2064 throw Exception("Copy fault injection is activated", ErrorCodes::UNFINISHED);
2065 }
2066 catch (...)
2067 {
2068 tryLogCurrentException(log, "An error occurred during copying, partition will be marked as dirty");
2069 return PartitionTaskStatus::Error;
2070 }
2071 }
2072
2073 /// Finalize the processing, change state of current partition task (and also check is_dirty flag)
2074 {
2075 String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
2076 CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirty_cleaned_path);
2077 if (clean_state_clock != new_clean_state_clock)
2078 {
2079 LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing");
2080 return PartitionTaskStatus::Error;
2081 }
2082 else if (!new_clean_state_clock.is_clean())
2083 {
2084 LOG_INFO(log, "Partition " << task_partition.name << " became dirty and will be dropped and refilled");
2085 create_is_dirty_node(new_clean_state_clock);
2086 return PartitionTaskStatus::Error;
2087 }
2088 zookeeper->set(current_task_status_path, state_finished, 0);
2089 }
2090
2091 LOG_INFO(log, "Partition " << task_partition.name << " copied");
2092 return PartitionTaskStatus::Finished;
2093 }
2094
2095 void dropAndCreateLocalTable(const ASTPtr & create_ast)
2096 {
2097 const auto & create = create_ast->as<ASTCreateQuery &>();
2098 dropLocalTableIfExists({create.database, create.table});
2099
2100 InterpreterCreateQuery interpreter(create_ast, context);
2101 interpreter.execute();
2102 }
2103
2104 void dropLocalTableIfExists(const DatabaseAndTableName & table_name) const
2105 {
2106 auto drop_ast = std::make_shared<ASTDropQuery>();
2107 drop_ast->if_exists = true;
2108 drop_ast->database = table_name.first;
2109 drop_ast->table = table_name.second;
2110
2111 InterpreterDropQuery interpreter(drop_ast, context);
2112 interpreter.execute();
2113 }
2114
2115 String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings = nullptr)
2116 {
2117 String query = "SHOW CREATE TABLE " + getQuotedTable(table);
2118 Block block = getBlockWithAllStreamData(std::make_shared<RemoteBlockInputStream>(
2119 connection, query, InterpreterShowCreateQuery::getSampleBlock(), context, settings));
2120
2121 return typeid_cast<const ColumnString &>(*block.safeGetByPosition(0).column).getDataAt(0).toString();
2122 }
2123
2124 ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
2125 {
2126 /// Fetch and parse (possibly) new definition
2127 auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_pull);
2128 String create_query_pull_str = getRemoteCreateTable(
2129 task_shard.task_table.table_pull,
2130 *connection_entry,
2131 &task_cluster->settings_pull);
2132
2133 ParserCreateQuery parser_create_query;
2134 return parseQuery(parser_create_query, create_query_pull_str, 0);
2135 }
2136
2137 void createShardInternalTables(const ConnectionTimeouts & timeouts, TaskShard & task_shard, bool create_split = true)
2138 {
2139 TaskTable & task_table = task_shard.task_table;
2140
2141 /// We need to update table definitions for each partition, they could have been changed by ALTER
2142 task_shard.current_pull_table_create_query = getCreateTableForPullShard(timeouts, task_shard);
2143
2144 /// Create local Distributed tables:
2145 /// a table fetching data from current shard and a table inserting data to the whole destination cluster
2146 String read_shard_prefix = ".read_shard_" + toString(task_shard.indexInCluster()) + ".";
2147 String split_shard_prefix = ".split.";
2148 task_shard.table_read_shard = DatabaseAndTableName(working_database_name, read_shard_prefix + task_table.table_id);
2149 task_shard.table_split_shard = DatabaseAndTableName(working_database_name, split_shard_prefix + task_table.table_id);
2150
2151 /// Create special cluster with single shard
2152 String shard_read_cluster_name = read_shard_prefix + task_table.cluster_pull_name;
2153 ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(task_shard.indexInCluster());
2154 context.setCluster(shard_read_cluster_name, cluster_pull_current_shard);
2155
2156 auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
2157 const auto & storage_split_ast = task_table.engine_split_ast;
2158
2159 auto create_query_ast = removeAliasColumnsFromCreateQuery(task_shard.current_pull_table_create_query);
2160 auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
2161 auto create_table_split_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_split_shard, storage_split_ast);
2162
2163 dropAndCreateLocalTable(create_table_pull_ast);
2164
2165 if (create_split)
2166 dropAndCreateLocalTable(create_table_split_ast);
2167 }
2168
2169
2170 std::set<String> getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
2171 {
2172 createShardInternalTables(timeouts, task_shard, false);
2173
2174 TaskTable & task_table = task_shard.task_table;
2175
2176 String query;
2177 {
2178 WriteBufferFromOwnString wb;
2179 wb << "SELECT DISTINCT " << queryToString(task_table.engine_push_partition_key_ast) << " AS partition FROM"
2180 << " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
2181 query = wb.str();
2182 }
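/// For illustration only (hypothetical partition key and table), the generated query looks like:
///   SELECT DISTINCT toYYYYMM(EventDate) AS partition FROM db.table ORDER BY partition DESC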
2183
2184 ParserQuery parser_query(query.data() + query.size());
2185 ASTPtr query_ast = parseQuery(parser_query, query, 0);
2186
2187 LOG_DEBUG(log, "Computing destination partition set, executing query: " << query);
2188
2189 Context local_context = context;
2190 local_context.setSettings(task_cluster->settings_pull);
2191 Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().in);
2192
2193 std::set<String> res;
2194 if (block)
2195 {
2196 ColumnWithTypeAndName & column = block.getByPosition(0);
2197 task_shard.partition_key_column = column;
2198
2199 for (size_t i = 0; i < column.column->size(); ++i)
2200 {
2201 WriteBufferFromOwnString wb;
2202 column.type->serializeAsTextQuoted(*column.column, i, wb, FormatSettings());
2203 res.emplace(wb.str());
2204 }
2205 }
2206
2207 LOG_DEBUG(log, "There are " << res.size() << " destination partitions in shard " << task_shard.getDescription());
2208
2209 return res;
2210 }
2211
2212 bool checkShardHasPartition(const ConnectionTimeouts & timeouts, TaskShard & task_shard, const String & partition_quoted_name)
2213 {
2214 createShardInternalTables(timeouts, task_shard, false);
2215
2216 TaskTable & task_table = task_shard.task_table;
2217
2218 std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard)
2219 + " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + partition_quoted_name + " AS partition_key))";
2220
2221 if (!task_table.where_condition_str.empty())
2222 query += " AND (" + task_table.where_condition_str + ")";
2223
2224 query += " LIMIT 1";
2225
2226 LOG_DEBUG(log, "Checking shard " << task_shard.getDescription() << " for partition "
2227 << partition_quoted_name << " existence, executing query: " << query);
2228
2229 ParserQuery parser_query(query.data() + query.size());
2230 ASTPtr query_ast = parseQuery(parser_query, query, 0);
2231
2232 Context local_context = context;
2233 local_context.setSettings(task_cluster->settings_pull);
2234 return InterpreterFactory::get(query_ast, local_context)->execute().in->read().rows() != 0;
2235 }
2236
2237 /** Executes a simple query (without output streams, for example DDL queries) on each shard of the cluster.
2238 * Returns the number of shards for which at least one replica executed the query successfully.
2239 */
2240 UInt64 executeQueryOnCluster(
2241 const ClusterPtr & cluster,
2242 const String & query,
2243 const ASTPtr & query_ast_ = nullptr,
2244 const Settings * settings = nullptr,
2245 PoolMode pool_mode = PoolMode::GET_ALL,
2246 UInt64 max_successful_executions_per_shard = 0) const
2247 {
2248 auto num_shards = cluster->getShardsInfo().size();
2249 std::vector<UInt64> per_shard_num_successful_replicas(num_shards, 0);
2250
2251 ASTPtr query_ast;
2252 if (query_ast_ == nullptr)
2253 {
2254 ParserQuery p_query(query.data() + query.size());
2255 query_ast = parseQuery(p_query, query, 0);
2256 }
2257 else
2258 query_ast = query_ast_;
2259
2260
2261 /// The query must be executed on at least one replica of each shard
2262 auto do_for_shard = [&] (UInt64 shard_index)
2263 {
2264 const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index);
2265 UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
2266 num_successful_executions = 0;
2267
2268 auto increment_and_check_exit = [&] ()
2269 {
2270 ++num_successful_executions;
2271 return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard;
2272 };
2273
2274 UInt64 num_replicas = cluster->getShardsAddresses().at(shard_index).size();
2275 UInt64 num_local_replicas = shard.getLocalNodeCount();
2276 UInt64 num_remote_replicas = num_replicas - num_local_replicas;
2277
2278 /// Normally there are no local replicas here, but handle them just in case
2279 for (UInt64 i = 0; i < num_local_replicas; ++i)
2280 {
2281 auto interpreter = InterpreterFactory::get(query_ast, context);
2282 interpreter->execute();
2283
2284 if (increment_and_check_exit())
2285 return;
2286 }
2287
2288 /// Will try to execute the query on as many remote replicas as possible
2289 if (shard.hasRemoteConnections())
2290 {
2291 Settings current_settings = settings ? *settings : task_cluster->settings_common;
2292 current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
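/// Presumably raised so that pool->getMany() below can return a connection for each remote replica
/// of the shard (the number of entries it returns is bounded by max_parallel_replicas).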
2293
2294 auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings).getSaturated(current_settings.max_execution_time);
2295 auto connections = shard.pool->getMany(timeouts, &current_settings, pool_mode);
2296
2297 for (auto & connection : connections)
2298 {
2299 if (connection.isNull())
2300 continue;
2301
2302 try
2303 {
2304 /// CREATE TABLE and DROP PARTITION queries return empty block
2305 RemoteBlockInputStream stream{*connection, query, Block{}, context, &current_settings};
2306 NullBlockOutputStream output{Block{}};
2307 copyData(stream, output);
2308
2309 if (increment_and_check_exit())
2310 return;
2311 }
2312 catch (const Exception &)
2313 {
2314 LOG_INFO(log, getCurrentExceptionMessage(false, true));
2315 }
2316 }
2317 }
2318 };
2319
2320 {
2321 ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));
2322
2323 for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
2324 thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); });
2325
2326 thread_pool.wait();
2327 }
2328
2329 UInt64 successful_shards = 0;
2330 for (UInt64 num_replicas : per_shard_num_successful_replicas)
2331 successful_shards += (num_replicas > 0);
2332
2333 return successful_shards;
2334 }
2335
2336private:
2337 String task_zookeeper_path;
2338 String task_description_path;
2339 String host_id;
2340 String working_database_name;
2341
2342 /// Auto update config stuff
2343 UInt64 task_descprtion_current_version = 1;
2344 std::atomic<UInt64> task_descprtion_version{1};
2345 Coordination::WatchCallback task_description_watch_callback;
2346 /// ZooKeeper session used to set the callback
2347 zkutil::ZooKeeperPtr task_description_watch_zookeeper;
2348
2349 ConfigurationPtr task_cluster_initial_config;
2350 ConfigurationPtr task_cluster_current_config;
2351 Coordination::Stat task_descprtion_current_stat{};
2352
2353 std::unique_ptr<TaskCluster> task_cluster;
2354
2355 bool is_safe_mode = false;
2356 double copy_fault_probability = 0.0;
2357
2358 Context & context;
2359 Poco::Logger * log;
2360
2361 std::chrono::milliseconds default_sleep_time{1000};
2362};
2363
2364
2365/// ClusterCopierApp
2366
2367
2368void ClusterCopierApp::initialize(Poco::Util::Application & self)
2369{
2370 is_help = config().has("help");
2371 if (is_help)
2372 return;
2373
2374 config_xml_path = config().getString("config-file");
2375 task_path = config().getString("task-path");
2376 log_level = config().getString("log-level", "debug");
2377 is_safe_mode = config().has("safe-mode");
2378 if (config().has("copy-fault-probability"))
2379 copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0);
2380 base_dir = (config().has("base-dir")) ? config().getString("base-dir") : Poco::Path::current();
2381
2382 // host_id is '<hostname>#<start_timestamp>_<pid>', process_id is '<start_timestamp>_<pid>'
2383 time_t timestamp = Poco::Timestamp().epochTime();
2384 auto curr_pid = Poco::Process::id();
2385
2386 process_id = std::to_string(DateLUT::instance().toNumYYYYMMDDhhmmss(timestamp)) + "_" + std::to_string(curr_pid);
2387 host_id = escapeForFileName(getFQDNOrHostName()) + '#' + process_id;
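/// For example (hypothetical values): process_id = "20200101120000_4567",
/// host_id = "host123.example.com#20200101120000_4567"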
2388 process_path = Poco::Path(base_dir + "/clickhouse-copier_" + process_id).absolute().toString();
2389 Poco::File(process_path).createDirectories();
2390
2391 /// Override variables for BaseDaemon
2392 if (config().has("log-level"))
2393 config().setString("logger.level", config().getString("log-level"));
2394
2395 if (config().has("base-dir") || !config().has("logger.log"))
2396 config().setString("logger.log", process_path + "/log.log");
2397
2398 if (config().has("base-dir") || !config().has("logger.errorlog"))
2399 config().setString("logger.errorlog", process_path + "/log.err.log");
2400
2401 Base::initialize(self);
2402}
2403
2404
2405void ClusterCopierApp::handleHelp(const std::string &, const std::string &)
2406{
2407 Poco::Util::HelpFormatter helpFormatter(options());
2408 helpFormatter.setCommand(commandName());
2409 helpFormatter.setHeader("Copies tables from one cluster to another");
2410 helpFormatter.setUsage("--config-file <config-file> --task-path <task-path>");
2411 helpFormatter.format(std::cerr);
2412
2413 stopOptionsProcessing();
2414}
2415
2416
2417void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
2418{
2419 Base::defineOptions(options);
2420
2421 options.addOption(Poco::Util::Option("task-path", "", "path to task in ZooKeeper")
2422 .argument("task-path").binding("task-path"));
2423 options.addOption(Poco::Util::Option("task-file", "", "path to task file for uploading in ZooKeeper to task-path")
2424 .argument("task-file").binding("task-file"));
2425 options.addOption(Poco::Util::Option("task-upload-force", "", "Force upload of the task file even if the node already exists")
2426 .argument("task-upload-force").binding("task-upload-force"));
2427 options.addOption(Poco::Util::Option("safe-mode", "", "disables ALTER DROP PARTITION in case of errors")
2428 .binding("safe-mode"));
2429 options.addOption(Poco::Util::Option("copy-fault-probability", "", "the copying fails with the specified probability (used to test recovery of the partition state)")
2430 .argument("copy-fault-probability").binding("copy-fault-probability"));
2431 options.addOption(Poco::Util::Option("log-level", "", "sets log level")
2432 .argument("log-level").binding("log-level"));
2433 options.addOption(Poco::Util::Option("base-dir", "", "base directory for copiers; consecutive copier launches will populate /base-dir/launch_id/* directories")
2434 .argument("base-dir").binding("base-dir"));
2435
2436 using Me = std::decay_t<decltype(*this)>;
2437 options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
2438 .callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
2439}
2440
2441
2442void ClusterCopierApp::mainImpl()
2443{
2444 StatusFile status_file(process_path + "/status");
2445 ThreadStatus thread_status;
2446
2447 auto log = &logger();
2448 LOG_INFO(log, "Starting clickhouse-copier ("
2449 << "id " << process_id << ", "
2450 << "host_id " << host_id << ", "
2451 << "path " << process_path << ", "
2452 << "revision " << ClickHouseRevision::get() << ")");
2453
2454 auto context = std::make_unique<Context>(Context::createGlobal());
2455 context->makeGlobalContext();
2456 SCOPE_EXIT(context->shutdown());
2457
2458 context->setConfig(loaded_config.configuration);
2459 context->setApplicationType(Context::ApplicationType::LOCAL);
2460 context->setPath(process_path);
2461
2462 registerFunctions();
2463 registerAggregateFunctions();
2464 registerTableFunctions();
2465 registerStorages();
2466 registerDictionaries();
2467 registerDisks();
2468
2469 static const std::string default_database = "_local";
2470 context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
2471 context->setCurrentDatabase(default_database);
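/// The copier's temporary per-shard Distributed tables (".read_shard_<N>.<table_id>" and ".split.<table_id>")
/// are created in this in-memory database.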
2472
2473 /// Initialize query scope just in case.
2474 CurrentThread::QueryScope query_scope(*context);
2475
2476 auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context);
2477 copier->setSafeMode(is_safe_mode);
2478 copier->setCopyFaultProbability(copy_fault_probability);
2479
2480 auto task_file = config().getString("task-file", "");
2481 if (!task_file.empty())
2482 copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
2483
2484 copier->init();
2485 copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef()));
2486
2487 /// Reset ZooKeeper before destroying the ClusterCopier.
2488 /// Otherwise a ZooKeeper watch may invoke a callback that uses the already destroyed ClusterCopier object.
2489 context->resetZooKeeper();
2490}
2491
2492
2493int ClusterCopierApp::main(const std::vector<std::string> &)
2494{
2495 if (is_help)
2496 return 0;
2497
2498 try
2499 {
2500 mainImpl();
2501 }
2502 catch (...)
2503 {
2504 tryLogCurrentException(&Poco::Logger::root(), __PRETTY_FUNCTION__);
2505 auto code = getCurrentExceptionCode();
2506
2507 return (code) ? code : -1;
2508 }
2509
2510 return 0;
2511}
2512
2513
2514}
2515
2516#pragma GCC diagnostic ignored "-Wunused-function"
2517#pragma GCC diagnostic ignored "-Wmissing-declarations"
2518
2519int mainEntryClickHouseClusterCopier(int argc, char ** argv)
2520{
2521 try
2522 {
2523 DB::ClusterCopierApp app;
2524 return app.run(argc, argv);
2525 }
2526 catch (...)
2527 {
2528 std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
2529 auto code = DB::getCurrentExceptionCode();
2530
2531 return (code) ? code : -1;
2532 }
2533}
2534