#include <Interpreters/DDLWorker.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <Storages/IStorage.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Common/DNSResolver.h>
#include <Common/Macros.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/randomSeed.h>
#include <common/sleep.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Lock.h>
#include <Common/isLocalAddress.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
#include <random>
#include <pcg_random.hpp>
#include <Poco/Net/NetException.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    extern const int INVALID_CONFIG_PARAMETER;
    extern const int UNKNOWN_FORMAT_VERSION;
    extern const int INCONSISTENT_TABLE_ACCROSS_SHARDS;
    extern const int INCONSISTENT_CLUSTER_DEFINITION;
    extern const int TIMEOUT_EXCEEDED;
    extern const int UNKNOWN_TYPE_OF_QUERY;
    extern const int UNFINISHED;
    extern const int UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK;
    extern const int QUERY_IS_PROHIBITED;
    extern const int NOT_IMPLEMENTED;
}


namespace
{

struct HostID
{
    String host_name;
    UInt16 port;

    HostID() = default;

    explicit HostID(const Cluster::Address & address)
        : host_name(address.host_name), port(address.port) {}

    static HostID fromString(const String & host_port_str)
    {
        HostID res;
        std::tie(res.host_name, res.port) = Cluster::Address::fromString(host_port_str);
        return res;
    }

    String toString() const
    {
        return Cluster::Address::toString(host_name, port);
    }

    String readableString() const
    {
        return host_name + ":" + DB::toString(port);
    }

    bool isLocalAddress(UInt16 clickhouse_port) const
    {
        try
        {
            return DB::isLocalAddress(DNSResolver::instance().resolveAddress(host_name, port), clickhouse_port);
        }
        catch (const Poco::Net::NetException &)
        {
            /// Avoid "Host not found" exceptions
            return false;
        }
    }

    static String applyToString(const HostID & host_id)
    {
        return host_id.toString();
    }
};

}

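/// A distributed DDL task as stored in ZooKeeper under <queue_dir>/query-XXXXXXXXXX.
/// The serialized format (see toString() and parse() below) is line-oriented:
///     version: 1
///     query: <escaped query text>
///     hosts: <list of host:port ids from the task's cluster>
///     initiator: <host:port of the initiator; optional>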
struct DDLLogEntry
{
    String query;
    std::vector<HostID> hosts;
    String initiator; // optional

    static constexpr int CURRENT_VERSION = 1;

    String toString()
    {
        WriteBufferFromOwnString wb;

        Strings host_id_strings(hosts.size());
        std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);

        auto version = CURRENT_VERSION;
        wb << "version: " << version << "\n";
        wb << "query: " << escape << query << "\n";
        wb << "hosts: " << host_id_strings << "\n";
        wb << "initiator: " << initiator << "\n";

        return wb.str();
    }

    void parse(const String & data)
    {
        ReadBufferFromString rb(data);

        int version;
        rb >> "version: " >> version >> "\n";

        if (version != CURRENT_VERSION)
            throw Exception("Unknown DDLLogEntry format version: " + DB::toString(version), ErrorCodes::UNKNOWN_FORMAT_VERSION);

        Strings host_id_strings;
        rb >> "query: " >> escape >> query >> "\n";
        rb >> "hosts: " >> host_id_strings >> "\n";

        if (!rb.eof())
            rb >> "initiator: " >> initiator >> "\n";
        else
            initiator.clear();

        assertEOF(rb);

        hosts.resize(host_id_strings.size());
        std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
    }
};


struct DDLTask
{
    /// Stages of the task lifetime correspond to the ordering of these data fields:

    /// Stage 1: parse the entry
    String entry_name;
    String entry_path;
    DDLLogEntry entry;

    /// Stage 2: resolve host_id and check whether this host should execute the task
    HostID host_id;
    String host_id_str;

    /// Stage 3.1: parse the query
    ASTPtr query;
    ASTQueryWithOnCluster * query_on_cluster = nullptr;

    /// Stage 3.2: check the cluster and find the host in it
    String cluster_name;
    ClusterPtr cluster;
    Cluster::Address address_in_cluster;
    size_t host_shard_num;
    size_t host_replica_num;

    /// Stage 3.3: execute the query
    ExecutionStatus execution_status;
    bool was_executed = false;

    /// Stage 4: commit results to ZooKeeper
};


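/// Wraps an already established ZooKeeper session into a zkutil::Lock.
/// The lock znode is <lock_prefix>/<lock_name> (see the "lock" nodes used in
/// tryExecuteQueryOnLeaderReplica() and cleanupQueue()); lock_message identifies the holder.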
static std::unique_ptr<zkutil::Lock> createSimpleZooKeeperLock(
    const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
{
    auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
    zookeeper_holder->initFromInstance(zookeeper);
    return std::make_unique<zkutil::Lock>(std::move(zookeeper_holder), lock_prefix, lock_name, lock_message);
}


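/// Returns true if the given ALTER type may be executed through distributed DDL.
/// Partition manipulation commands (ATTACH/REPLACE/FETCH/FREEZE) are not supported.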
static bool isSupportedAlterType(int type)
{
    static const std::unordered_set<int> unsupported_alter_types{
        ASTAlterCommand::ATTACH_PARTITION,
        ASTAlterCommand::REPLACE_PARTITION,
        ASTAlterCommand::FETCH_PARTITION,
        ASTAlterCommand::FREEZE_PARTITION,
        ASTAlterCommand::FREEZE_ALL,
        ASTAlterCommand::NO_TYPE,
    };

    return unsupported_alter_types.count(type) == 0;
}


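/// Settings are read from the server config; the config section prefix is passed by the caller
/// (normally "distributed_ddl"). A sketch of the expected layout, with illustrative values:
///     <distributed_ddl>
///         <task_max_lifetime>604800</task_max_lifetime>      <!-- drop queue entries older than this many seconds -->
///         <cleanup_delay_period>60</cleanup_delay_period>    <!-- minimal period between queue cleanups, in seconds -->
///         <max_tasks_in_queue>1000</max_tasks_in_queue>      <!-- keep at most this many entries in the queue -->
///         <profile>default</profile>                         <!-- settings profile used to execute DDL queries -->
///     </distributed_ddl>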
DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix)
    : context(context_), log(&Logger::get("DDLWorker"))
{
    queue_dir = zk_root_dir;
    if (queue_dir.back() == '/')
        queue_dir.resize(queue_dir.size() - 1);

    if (config)
    {
        task_max_lifetime = config->getUInt64(prefix + ".task_max_lifetime", static_cast<UInt64>(task_max_lifetime));
        cleanup_delay_period = config->getUInt64(prefix + ".cleanup_delay_period", static_cast<UInt64>(cleanup_delay_period));
        max_tasks_in_queue = std::max<UInt64>(1, config->getUInt64(prefix + ".max_tasks_in_queue", max_tasks_in_queue));

        if (config->has(prefix + ".profile"))
            context.setSetting("profile", config->getString(prefix + ".profile"));
    }

    if (context.getSettingsRef().readonly)
    {
        LOG_WARNING(log, "Distributed DDL worker is run with readonly settings, it will not be able to execute DDL queries."
            << " Set an appropriate system_profile or distributed_ddl.profile to fix this.");
    }

    host_fqdn = getFQDNOrHostName();
    host_fqdn_id = Cluster::Address::toString(host_fqdn, context.getTCPPort());

    main_thread = ThreadFromGlobalPool(&DDLWorker::runMainThread, this);
    cleanup_thread = ThreadFromGlobalPool(&DDLWorker::runCleanupThread, this);
}


DDLWorker::~DDLWorker()
{
    stop_flag = true;
    queue_updated_event->set();
    cleanup_event->set();
    main_thread.join();
    cleanup_thread.join();
}


DDLWorker::ZooKeeperPtr DDLWorker::tryGetZooKeeper() const
{
    std::lock_guard lock(zookeeper_mutex);
    return current_zookeeper;
}

DDLWorker::ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
{
    std::lock_guard lock(zookeeper_mutex);

    if (!current_zookeeper || current_zookeeper->expired())
        current_zookeeper = context.getZooKeeper();

    return current_zookeeper;
}


bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper)
{
    String node_data;
    String entry_path = queue_dir + "/" + entry_name;

    if (!zookeeper->tryGet(entry_path, node_data))
    {
        /// It is Ok if the node was deleted just now: it means that the current host was not in the node's host list.
        out_reason = "The task was deleted";
        return false;
    }

    auto task = std::make_unique<DDLTask>();
    task->entry_name = entry_name;
    task->entry_path = entry_path;

    try
    {
        task->entry.parse(node_data);
    }
    catch (...)
    {
        /// What should we do if we cannot even parse the host name and therefore cannot properly submit an execution status?
        /// We can try to create a fail node using the FQDN: if it is equal to the host name in the cluster config, the attempt will be successful.
        /// Otherwise, that node will be ignored by DDLQueryStatusInputStream.

        tryLogCurrentException(log, "Cannot parse DDL task " + entry_name + ", will try to send error status");

        String status = ExecutionStatus::fromCurrentException().serializeText();
        try
        {
            createStatusDirs(entry_path, zookeeper);
            zookeeper->tryCreate(entry_path + "/finished/" + host_fqdn_id, status, zkutil::CreateMode::Persistent);
        }
        catch (...)
        {
            tryLogCurrentException(log, "Can't report that the task has an invalid format");
        }

        out_reason = "Incorrect task format";
        return false;
    }

    bool host_in_hostlist = false;
    for (const HostID & host : task->entry.hosts)
    {
        auto maybe_secure_port = context.getTCPPortSecure();

        /// The port is considered local if it matches the TCP or the TCP secure port the server is listening on.
        bool is_local_port = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port))
            || host.isLocalAddress(context.getTCPPort());

        if (!is_local_port)
            continue;

        if (host_in_hostlist)
        {
            /// This check could be a bit slow.
            LOG_WARNING(log, "There are two identical ClickHouse instances in task " << entry_name
                << ": " << task->host_id.readableString() << " and " << host.readableString() << ". Will use the first one only.");
        }
        else
        {
            host_in_hostlist = true;
            task->host_id = host;
            task->host_id_str = host.toString();
        }
    }

    if (host_in_hostlist)
        current_task = std::move(task);
    else
        out_reason = "There is no local address in the host list";

    return host_in_hostlist;
}


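/// Queue entries are persistent sequential znodes named "query-XXXXXXXXXX" (see enqueueQuery()),
/// so sorting the names lexicographically also sorts the entries by creation order.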
static void filterAndSortQueueNodes(Strings & all_nodes)
{
    all_nodes.erase(std::remove_if(all_nodes.begin(), all_nodes.end(), [] (const String & s) { return !startsWith(s, "query-"); }), all_nodes.end());
    std::sort(all_nodes.begin(), all_nodes.end());
}


void DDLWorker::processTasks()
{
    LOG_DEBUG(log, "Processing tasks");
    auto zookeeper = tryGetZooKeeper();

    Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
    filterAndSortQueueNodes(queue_nodes);
    if (queue_nodes.empty())
        return;

    bool server_startup = last_processed_task_name.empty();

    auto begin_node = server_startup
        ? queue_nodes.begin()
        : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_processed_task_name);

    for (auto it = begin_node; it != queue_nodes.end(); ++it)
    {
        String entry_name = *it;

        if (current_task)
        {
            if (current_task->entry_name == entry_name)
            {
                LOG_INFO(log, "Trying to process task " << entry_name << " again");
            }
            else
            {
                LOG_INFO(log, "Task " << current_task->entry_name << " was deleted from ZooKeeper before current host committed it");
                current_task = nullptr;
            }
        }

        if (!current_task)
        {
            String reason;
            if (!initAndCheckTask(entry_name, reason, zookeeper))
            {
                LOG_DEBUG(log, "Will not execute task " << entry_name << ": " << reason);
                last_processed_task_name = entry_name;
                continue;
            }
        }

        DDLTask & task = *current_task;

        bool already_processed = zookeeper->exists(task.entry_path + "/finished/" + task.host_id_str);
        if (!server_startup && !task.was_executed && already_processed)
        {
            throw Exception(
                "Server expects that DDL task " + task.entry_name + " should be processed, but it was already processed according to ZK",
                ErrorCodes::LOGICAL_ERROR);
        }

        if (!already_processed)
        {
            try
            {
                processTask(task, zookeeper);
            }
            catch (...)
            {
                LOG_WARNING(log, "An error occurred while processing task " << task.entry_name << " (" << task.entry.query << ") : "
                    << getCurrentExceptionMessage(true));
                throw;
            }
        }
        else
        {
            LOG_DEBUG(log, "Task " << task.entry_name << " (" << task.entry.query << ") has already been processed");
        }

        last_processed_task_name = task.entry_name;
        current_task.reset();

        if (stop_flag)
            break;
    }
}


/// Parses the query and resolves the cluster and the host in the cluster
void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
{
    {
        const char * begin = task.entry.query.data();
        const char * end = begin + task.entry.query.size();

        ParserQuery parser_query(end);
        String description;
        task.query = parseQuery(parser_query, begin, end, description, 0);
    }

    // XXX: serious design flaw since `ASTQueryWithOnCluster` is not inherited from `IAST`!
    if (!task.query || !(task.query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(task.query.get())))
        throw Exception("Received unknown DDL query", ErrorCodes::UNKNOWN_TYPE_OF_QUERY);

    task.cluster_name = task.query_on_cluster->cluster;
    task.cluster = context.tryGetCluster(task.cluster_name);
    if (!task.cluster)
    {
        throw Exception("DDL task " + task.entry_name + " contains current host " + task.host_id.readableString()
            + " in cluster " + task.cluster_name + ", but there is no such cluster here.", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
    }

    /// Try to find the host from the task's host list in the cluster.
    /// First, try to find an exact match (host name and port must be literally equal).
    /// If that attempt fails, try to find the host by resolving the host name of each instance.
    const auto & shards = task.cluster->getShardsAddresses();

    bool found_exact_match = false;
    for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
    {
        for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
        {
            const Cluster::Address & address = shards[shard_num][replica_num];

            if (address.host_name == task.host_id.host_name && address.port == task.host_id.port)
            {
                if (found_exact_match)
                {
                    throw Exception("There are two exactly identical ClickHouse instances " + address.readableString()
                        + " in cluster " + task.cluster_name, ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
                }

                found_exact_match = true;
                task.host_shard_num = shard_num;
                task.host_replica_num = replica_num;
                task.address_in_cluster = address;
            }
        }
    }

    if (found_exact_match)
        return;

    LOG_WARNING(log, "Could not find an exact match for host " << task.host_id.readableString() << " from task " << task.entry_name
        << " in the definition of cluster " << task.cluster_name << ". Will try to find it by resolving host names.");

    bool found_via_resolving = false;
    for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
    {
        for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
        {
            const Cluster::Address & address = shards[shard_num][replica_num];

            if (auto resolved = address.getResolvedAddress();
                resolved && (isLocalAddress(*resolved, context.getTCPPort())
                    || (context.getTCPPortSecure() && isLocalAddress(*resolved, *context.getTCPPortSecure()))))
            {
                if (found_via_resolving)
                {
                    throw Exception("There are two identical ClickHouse instances in cluster " + task.cluster_name + ": "
                        + task.address_in_cluster.readableString() + " and " + address.readableString(), ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
                }
                else
                {
                    found_via_resolving = true;
                    task.host_shard_num = shard_num;
                    task.host_replica_num = replica_num;
                    task.address_in_cluster = address;
                }
            }
        }
    }

    if (!found_via_resolving)
    {
        throw Exception("Host " + task.host_id.readableString() + " was not found in the definition of cluster " + task.cluster_name,
            ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
    }
    else
    {
        LOG_INFO(log, "Resolved host " << task.host_id.readableString() << " from task " << task.entry_name
            << " as host " << task.address_in_cluster.readableString() << " in the definition of cluster " << task.cluster_name);
    }
}


bool DDLWorker::tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status)
{
    /// Add a special comment at the start of the query to easily identify DDL-produced queries in query_log
    String query_prefix = "/* ddl_entry=" + task.entry_name + " */ ";
    String query_to_execute = query_prefix + query;

    ReadBufferFromString istr(query_to_execute);
    String dummy_string;
    WriteBufferFromString ostr(dummy_string);

    try
    {
        current_context = std::make_unique<Context>(context);
        current_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
        current_context->setCurrentQueryId(""); // generate random query_id
        executeQuery(istr, ostr, false, *current_context, {}, {});
    }
    catch (...)
    {
        status = ExecutionStatus::fromCurrentException();
        tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");

        return false;
    }

    status = ExecutionStatus(0);
    LOG_DEBUG(log, "Executed query: " << query);

    return true;
}

void DDLWorker::attachToThreadGroup()
{
    if (thread_group)
    {
        /// Put all threads into one thread group
        CurrentThread::attachToIfDetached(thread_group);
    }
    else
    {
        CurrentThread::initializeQuery();
        thread_group = CurrentThread::getGroup();
    }
}


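/// Per-task ZooKeeper layout (status dirs are created by createStatusDirs()):
///     <queue_dir>/query-XXXXXXXXXX             - znode holding the serialized DDLLogEntry
///     .../active/<host_id>                     - ephemeral node: the host is currently executing the task
///     .../finished/<host_id>                   - persistent node holding the serialized ExecutionStatus
///     .../shards/<shard_name>/executed         - host_id of the replica that executed the query (replicated tables only)
///     .../shards/<shard_name>/lock             - lock taken by the executing leader replica (see tryExecuteQueryOnLeaderReplica())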
void DDLWorker::processTask(DDLTask & task, const ZooKeeperPtr & zookeeper)
{
    LOG_DEBUG(log, "Processing task " << task.entry_name << " (" << task.entry.query << ")");

    String dummy;
    String active_node_path = task.entry_path + "/active/" + task.host_id_str;
    String finished_node_path = task.entry_path + "/finished/" + task.host_id_str;

    auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);

    if (code == Coordination::ZOK || code == Coordination::ZNODEEXISTS)
    {
        // Ok
    }
    else if (code == Coordination::ZNONODE)
    {
        /// There is no parent node, create the status dirs and retry
        createStatusDirs(task.entry_path, zookeeper);
        code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);
        if (code != Coordination::ZOK && code != Coordination::ZNODEEXISTS)
            throw Coordination::Exception(code, active_node_path);
    }
    else
        throw Coordination::Exception(code, active_node_path);

    if (!task.was_executed)
    {
        try
        {
            parseQueryAndResolveHost(task);

            ASTPtr rewritten_ast = task.query_on_cluster->getRewrittenASTWithoutOnCluster(task.address_in_cluster.default_database);
            String rewritten_query = queryToString(rewritten_ast);
            LOG_DEBUG(log, "Executing query: " << rewritten_query);

            if (auto query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(rewritten_ast.get()); query_with_table)
            {
                String database = query_with_table->database.empty() ? context.getCurrentDatabase() : query_with_table->database;
                StoragePtr storage = context.tryGetTable(database, query_with_table->table);

                /// For some reason we check consistency of the cluster definition only
                /// in case of ALTER queries, but not in case of CREATE/DROP etc.
                /// It's strange, but this behaviour has existed for a long time and we cannot change it.
                if (storage && query_with_table->as<ASTAlterQuery>())
                    checkShardConfig(query_with_table->table, task, storage);

                if (storage && taskShouldBeExecutedOnLeader(rewritten_ast, storage))
                    tryExecuteQueryOnLeaderReplica(task, storage, rewritten_query, task.entry_path, zookeeper);
                else
                    tryExecuteQuery(rewritten_query, task, task.execution_status);
            }
            else
                tryExecuteQuery(rewritten_query, task, task.execution_status);
        }
        catch (const Coordination::Exception &)
        {
            throw;
        }
        catch (...)
        {
            tryLogCurrentException(log, "An error occurred before execution of DDL task: ");
            task.execution_status = ExecutionStatus::fromCurrentException("An error occurred before execution");
        }

        /// We need to distinguish ZK errors that occurred before and after the query execution
        task.was_executed = true;
    }

    /// FIXME: if the server fails right here, the task will be executed twice. We need a WAL here.

    /// Delete the active flag and create the finished flag
    Coordination::Requests ops;
    ops.emplace_back(zkutil::makeRemoveRequest(active_node_path, -1));
    ops.emplace_back(zkutil::makeCreateRequest(finished_node_path, task.execution_status.serializeText(), zkutil::CreateMode::Persistent));
    zookeeper->multi(ops);
}


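/// Returns true if the query should be executed once per shard on the shard's leader replica
/// (ALTER / OPTIMIZE / TRUNCATE on tables that support replication). All other queries
/// are executed on every host separately.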
bool DDLWorker::taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, const StoragePtr storage) const
{
    /// Pure DROP queries have to be executed on each node separately
    if (auto query = ast_ddl->as<ASTDropQuery>(); query && query->kind != ASTDropQuery::Kind::Truncate)
        return false;

    if (!ast_ddl->as<ASTAlterQuery>() && !ast_ddl->as<ASTOptimizeQuery>() && !ast_ddl->as<ASTDropQuery>())
        return false;

    return storage->supportsReplication();
}


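/// Checks that the table's replication support is consistent with the <internal_replication>
/// setting of the host's shard in the cluster definition.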
void DDLWorker::checkShardConfig(const String & table, const DDLTask & task, StoragePtr storage) const
{
    const auto & shard_info = task.cluster->getShardsInfo().at(task.host_shard_num);
    bool config_is_replicated_shard = shard_info.hasInternalReplication();

    if (storage->supportsReplication() && !config_is_replicated_shard)
    {
        throw Exception("Table '" + table + "' is replicated, but shard #" + toString(task.host_shard_num + 1) +
            " isn't replicated according to its cluster definition."
            " Possibly <internal_replication>true</internal_replication> is forgotten in the cluster config.",
            ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
    }

    if (!storage->supportsReplication() && config_is_replicated_shard)
    {
        throw Exception("Table '" + table + "' isn't replicated, but shard #" + toString(task.host_shard_num + 1) +
            " is replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
    }
}


bool DDLWorker::tryExecuteQueryOnLeaderReplica(
    DDLTask & task,
    StoragePtr storage,
    const String & rewritten_query,
    const String & node_path,
    const ZooKeeperPtr & zookeeper)
{
    StorageReplicatedMergeTree * replicated_storage = dynamic_cast<StorageReplicatedMergeTree *>(storage.get());

    /// This will fail if a new replicated storage type is developed that is not supported here yet
    if (!replicated_storage)
        throw Exception("Storage type '" + storage->getName() + "' is not supported by distributed DDL", ErrorCodes::NOT_IMPLEMENTED);

    /// Generate a unique name for the shard node; it will be used so that the query is executed by a single host only.
    /// The shard node name has the format 'replica_name1,replica_name2,...,replica_nameN',
    /// where replica_name is 'replica_config_host_name:replica_port'.
    auto get_shard_name = [] (const Cluster::Addresses & shard_addresses)
    {
        Strings replica_names;
        for (const Cluster::Address & address : shard_addresses)
            replica_names.emplace_back(address.readableString());
        std::sort(replica_names.begin(), replica_names.end());

        String res;
        for (auto it = replica_names.begin(); it != replica_names.end(); ++it)
            res += *it + (std::next(it) != replica_names.end() ? "," : "");

        return res;
    };

    String shard_node_name = get_shard_name(task.cluster->getShardsAddresses().at(task.host_shard_num));
    String shard_path = node_path + "/shards/" + shard_node_name;
    String is_executed_path = shard_path + "/executed";
    zookeeper->createAncestors(shard_path + "/");

    auto is_already_executed = [&]() -> bool
    {
        String executed_by;
        if (zookeeper->tryGet(is_executed_path, executed_by))
        {
            LOG_DEBUG(log, "Task " << task.entry_name << " has already been executed by leader replica ("
                << executed_by << ") of the same shard.");
            return true;
        }

        return false;
    };

    pcg64 rng(randomSeed());

    auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);
    static const size_t max_tries = 20;
    bool executed_by_leader = false;
    for (size_t num_tries = 0; num_tries < max_tries; ++num_tries)
    {
        if (is_already_executed())
        {
            executed_by_leader = true;
            break;
        }

        StorageReplicatedMergeTree::Status status;
        replicated_storage->getStatus(status);

        /// Only the leader replica takes the lock
        if (status.is_leader && lock->tryLock())
        {
            if (is_already_executed())
            {
                executed_by_leader = true;
                break;
            }

            /// If the leader is unexpectedly changed, this call will return false,
            /// and on the next iteration the new leader will take the lock
            if (tryExecuteQuery(rewritten_query, task, task.execution_status))
            {
                zookeeper->create(is_executed_path, task.host_id_str, zkutil::CreateMode::Persistent);
                executed_by_leader = true;
                break;
            }
        }

        /// Does nothing if the lock wasn't previously taken
        lock->unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(std::uniform_int_distribution<long>(0, 1000)(rng)));
    }

    /// Not executed by any leader, so the query was not executed at all
    if (!executed_by_leader)
    {
        task.execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED,
            "Cannot execute replicated DDL query on leader");
        return false;
    }
    return true;
}


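/// Deletes queue entries that have outlived task_max_lifetime or that fall outside the
/// max_tasks_in_queue window. Entries with active workers or a held lock are skipped.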
void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper)
{
    LOG_DEBUG(log, "Cleaning queue");

    Strings queue_nodes = zookeeper->getChildren(queue_dir);
    filterAndSortQueueNodes(queue_nodes);

    size_t num_outdated_nodes = (queue_nodes.size() > max_tasks_in_queue) ? queue_nodes.size() - max_tasks_in_queue : 0;
    auto first_non_outdated_node = queue_nodes.begin() + num_outdated_nodes;

    for (auto it = queue_nodes.cbegin(); it < queue_nodes.cend(); ++it)
    {
        if (stop_flag)
            return;

        String node_name = *it;
        String node_path = queue_dir + "/" + node_name;
        String lock_path = node_path + "/lock";

        Coordination::Stat stat;
        String dummy;

        try
        {
            /// Already deleted
            if (!zookeeper->exists(node_path, &stat))
                continue;

            /// Delete the node if its lifetime has expired (according to the task_max_lifetime parameter)
            constexpr UInt64 zookeeper_time_resolution = 1000;
            Int64 zookeeper_time_seconds = stat.ctime / zookeeper_time_resolution;
            bool node_lifetime_is_expired = zookeeper_time_seconds + task_max_lifetime < current_time_seconds;

            /// If there are too many nodes in the task queue (> max_tasks_in_queue), delete the oldest ones
            bool node_is_outside_max_window = it < first_non_outdated_node;

            if (!node_lifetime_is_expired && !node_is_outside_max_window)
                continue;

            /// Skip if there are active nodes (this is a weak guard)
            if (zookeeper->exists(node_path + "/active", &stat) && stat.numChildren > 0)
            {
                LOG_INFO(log, "Task " << node_name << " should be deleted, but there are active workers. Skipping it.");
                continue;
            }

            /// Usage of the lock is not necessary now (tryRemoveRecursive correctly removes the node in the presence of concurrent cleaners),
            /// but the lock will be required to implement the system.distributed_ddl_queue table
            auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
            if (!lock->tryLock())
            {
                LOG_INFO(log, "Task " << node_name << " should be deleted, but it is locked. Skipping it.");
                continue;
            }

            if (node_lifetime_is_expired)
                LOG_INFO(log, "Lifetime of task " << node_name << " is expired, deleting it");
            else if (node_is_outside_max_window)
                LOG_INFO(log, "Task " << node_name << " is outdated, deleting it");

            /// Deleting
            {
                Strings children = zookeeper->getChildren(node_path);
                for (const String & child : children)
                {
                    if (child != "lock")
                        zookeeper->tryRemoveRecursive(node_path + "/" + child);
                }

                /// Remove the lock node and its parent atomically
                Coordination::Requests ops;
                ops.emplace_back(zkutil::makeRemoveRequest(lock_path, -1));
                ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
                zookeeper->multi(ops);

                lock->unlockAssumeLockNodeRemovedManually();
            }
        }
        catch (...)
        {
            LOG_INFO(log, "An error occurred while checking and cleaning task " + node_name + " from queue: " + getCurrentExceptionMessage(false));
        }
    }
}


/// Try to create nonexistent "status" dirs for a node
void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper)
{
    Coordination::Requests ops;
    {
        Coordination::CreateRequest request;
        request.path = node_path + "/active";
        ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
    }
    {
        Coordination::CreateRequest request;
        request.path = node_path + "/finished";
        ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
    }
    Coordination::Responses responses;
    int code = zookeeper->tryMulti(ops, responses);
    if (code && code != Coordination::ZNODEEXISTS)
        throw Coordination::Exception(code);
}


String DDLWorker::enqueueQuery(DDLLogEntry & entry)
{
    if (entry.hosts.empty())
        throw Exception("Empty host list in a distributed DDL task", ErrorCodes::LOGICAL_ERROR);

    auto zookeeper = getAndSetZooKeeper();

    String query_path_prefix = queue_dir + "/query-";
    zookeeper->createAncestors(query_path_prefix);

    String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);

    /// Optional step
    try
    {
        createStatusDirs(node_path, zookeeper);
    }
    catch (...)
    {
        LOG_INFO(log, "An error occurred while creating auxiliary ZooKeeper directories in " << node_path << ". They will be created later."
            << " Error: " << getCurrentExceptionMessage(true));
    }

    return node_path;
}


void DDLWorker::runMainThread()
{
    setThreadName("DDLWorker");
    LOG_DEBUG(log, "Started DDLWorker thread");

    bool initialized = false;
    do
    {
        try
        {
            auto zookeeper = getAndSetZooKeeper();
            zookeeper->createAncestors(queue_dir + "/");
            initialized = true;
        }
        catch (const Coordination::Exception & e)
        {
            if (!Coordination::isHardwareError(e.code))
                throw; /// A logical error.

            tryLogCurrentException(__PRETTY_FUNCTION__);

            /// Avoid busy loop when ZooKeeper is not available.
            sleepForSeconds(1);
        }
        catch (...)
        {
            tryLogCurrentException(log, "Terminating. Cannot initialize DDL queue.");
            return;
        }
    }
    while (!initialized && !stop_flag);

    while (!stop_flag)
    {
        try
        {
            attachToThreadGroup();

            cleanup_event->set();
            processTasks();

            LOG_DEBUG(log, "Waiting for a watch");
            queue_updated_event->wait();
        }
        catch (const Coordination::Exception & e)
        {
            if (Coordination::isHardwareError(e.code))
            {
                LOG_DEBUG(log, "Recovering ZooKeeper session after: " << getCurrentExceptionMessage(false));

                while (!stop_flag)
                {
                    try
                    {
                        getAndSetZooKeeper();
                        break;
                    }
                    catch (...)
                    {
                        tryLogCurrentException(__PRETTY_FUNCTION__);

                        using namespace std::chrono_literals;
                        std::this_thread::sleep_for(5s);
                    }
                }
            }
            else if (e.code == Coordination::ZNONODE)
            {
                LOG_ERROR(log, "ZooKeeper error: " << getCurrentExceptionMessage(true));
            }
            else
            {
                LOG_ERROR(log, "Unexpected ZooKeeper error: " << getCurrentExceptionMessage(true) << ". Terminating.");
                return;
            }
        }
        catch (...)
        {
            tryLogCurrentException(log, "Unexpected error, will terminate:");
            return;
        }
    }
}


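/// Cleanup thread. It is woken up by cleanup_event on every main-loop iteration,
/// but actually cleans the queue at most once per cleanup_delay_period seconds.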
void DDLWorker::runCleanupThread()
{
    setThreadName("DDLWorkerClnr");
    LOG_DEBUG(log, "Started DDLWorker cleanup thread");

    Int64 last_cleanup_time_seconds = 0;
    while (!stop_flag)
    {
        try
        {
            cleanup_event->wait();
            if (stop_flag)
                break;

            Int64 current_time_seconds = Poco::Timestamp().epochTime();
            if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_delay_period)
            {
                LOG_TRACE(log, "Too early to clean queue, will do it later.");
                continue;
            }

            auto zookeeper = tryGetZooKeeper();
            if (zookeeper->expired())
                continue;

            cleanupQueue(current_time_seconds, zookeeper);
            last_cleanup_time_seconds = current_time_seconds;
        }
        catch (...)
        {
            tryLogCurrentException(log, __PRETTY_FUNCTION__);
        }
    }
}


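/// Streams the status of a distributed DDL task back to the initiator: one row per finished host
/// with columns (host, port, status, error, num_hosts_remaining, num_hosts_active).
/// Throws TIMEOUT_EXCEEDED if the task does not finish within distributed_ddl_task_timeout seconds.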
class DDLQueryStatusInputStream : public IBlockInputStream
{
public:

    DDLQueryStatusInputStream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context_)
        : node_path(zk_node_path), context(context_), watch(CLOCK_MONOTONIC_COARSE), log(&Logger::get("DDLQueryStatusInputStream"))
    {
        sample = Block{
            {std::make_shared<DataTypeString>(), "host"},
            {std::make_shared<DataTypeUInt16>(), "port"},
            {std::make_shared<DataTypeInt64>(), "status"},
            {std::make_shared<DataTypeString>(), "error"},
            {std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
            {std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
        };

        for (const HostID & host : entry.hosts)
            waiting_hosts.emplace(host.toString());

        addTotalRowsApprox(entry.hosts.size());

        timeout_seconds = context.getSettingsRef().distributed_ddl_task_timeout;
    }

    String getName() const override
    {
        return "DDLQueryStatusInputStream";
    }

    Block getHeader() const override { return sample; }

    Block readImpl() override
    {
        Block res;
        if (num_hosts_finished >= waiting_hosts.size())
        {
            if (first_exception)
                throw Exception(*first_exception);

            return res;
        }

        auto zookeeper = context.getZooKeeper();
        size_t try_number = 0;

        while (res.rows() == 0)
        {
            if (isCancelled())
            {
                if (first_exception)
                    throw Exception(*first_exception);

                return res;
            }

            if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
            {
                size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
                size_t num_active_hosts = current_active_hosts.size();

                std::stringstream msg;
                msg << "Watching task " << node_path << " is executing longer than distributed_ddl_task_timeout"
                    << " (=" << timeout_seconds << ") seconds."
                    << " There are " << num_unfinished_hosts << " unfinished hosts"
                    << " (" << num_active_hosts << " of them are currently active)"
                    << ", they are going to execute the query in the background";

                throw Exception(msg.str(), ErrorCodes::TIMEOUT_EXCEEDED);
            }

            if (num_hosts_finished != 0 || try_number != 0)
            {
                auto current_sleep_for = std::chrono::milliseconds(std::min(static_cast<size_t>(1000), 50 * (try_number + 1)));
                std::this_thread::sleep_for(current_sleep_for);
            }

            /// TODO: add shared lock
            if (!zookeeper->exists(node_path))
            {
                throw Exception("Cannot provide query execution status. The query's node " + node_path
                    + " has been deleted by the cleaner since it was finished (or its lifetime has expired)",
                    ErrorCodes::UNFINISHED);
            }

            Strings new_hosts = getNewAndUpdate(getChildrenAllowNoNode(zookeeper, node_path + "/finished"));
            ++try_number;
            if (new_hosts.empty())
                continue;

            current_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active");

            MutableColumns columns = sample.cloneEmptyColumns();
            for (const String & host_id : new_hosts)
            {
                ExecutionStatus status(-1, "Cannot obtain error message");
                {
                    String status_data;
                    if (zookeeper->tryGet(node_path + "/finished/" + host_id, status_data))
                        status.tryDeserializeText(status_data);
                }

                auto [host, port] = Cluster::Address::fromString(host_id);

                if (status.code != 0 && first_exception == nullptr)
                    first_exception = std::make_unique<Exception>("There was an error on [" + host + ":" + toString(port) + "]: " + status.message, status.code);

                ++num_hosts_finished;

                columns[0]->insert(host);
                columns[1]->insert(port);
                columns[2]->insert(status.code);
                columns[3]->insert(status.message);
                columns[4]->insert(waiting_hosts.size() - num_hosts_finished);
                columns[5]->insert(current_active_hosts.size());
            }
            res = sample.cloneWithColumns(std::move(columns));
        }

        return res;
    }

    Block getSampleBlock() const
    {
        return sample.cloneEmpty();
    }

    ~DDLQueryStatusInputStream() override = default;

private:

    static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
    {
        Strings res;
        int code = zookeeper->tryGetChildren(node_path, res);
        if (code && code != Coordination::ZNONODE)
            throw Coordination::Exception(code, node_path);
        return res;
    }

    Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts)
    {
        Strings diff;
        for (const String & host : current_list_of_finished_hosts)
        {
            if (!waiting_hosts.count(host))
            {
                if (!ignoring_hosts.count(host))
                {
                    ignoring_hosts.emplace(host);
                    LOG_INFO(log, "Unexpected host " << host << " appeared in task " << node_path);
                }
                continue;
            }

            if (!finished_hosts.count(host))
            {
                diff.emplace_back(host);
                finished_hosts.emplace(host);
            }
        }

        return diff;
    }

private:
    String node_path;
    const Context & context;
    Stopwatch watch;
    Logger * log;

    Block sample;

    NameSet waiting_hosts;  /// hosts from the task's host list
    NameSet finished_hosts; /// finished hosts from the host list
    NameSet ignoring_hosts; /// hosts that appeared but are not in the host list
    Strings current_active_hosts; /// hosts that were in the active state at the last check
    size_t num_hosts_finished = 0;

    /// Save the first detected error and throw it at the end of execution
    std::unique_ptr<Exception> first_exception;

    Int64 timeout_seconds = 120;
};


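/// Entry point for "<DDL query> ON CLUSTER <cluster>" queries: checks access rights and supported
/// query types, enqueues the query into the ZooKeeper DDL queue via DDLWorker::enqueueQuery(),
/// and returns a DDLQueryStatusInputStream that streams per-host execution statuses.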
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context, NameSet && query_databases)
{
    /// Remove FORMAT <fmt> and INTO OUTFILE <file> if they exist
    ASTPtr query_ptr = query_ptr_->clone();
    ASTQueryWithOutput::resetOutputASTIfExist(*query_ptr);

    // XXX: serious design flaw since `ASTQueryWithOnCluster` is not inherited from `IAST`!
    auto * query = dynamic_cast<ASTQueryWithOnCluster *>(query_ptr.get());
    if (!query)
    {
        throw Exception("Distributed execution is not supported for such DDL queries", ErrorCodes::NOT_IMPLEMENTED);
    }

    if (!context.getSettingsRef().allow_distributed_ddl)
        throw Exception("Distributed DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);

    if (const auto * query_alter = query_ptr->as<ASTAlterQuery>())
    {
        for (const auto & command : query_alter->command_list->commands)
        {
            if (!isSupportedAlterType(command->type))
                throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
        }
    }

    query->cluster = context.getMacros()->expand(query->cluster);
    ClusterPtr cluster = context.getCluster(query->cluster);
    DDLWorker & ddl_worker = context.getDDLWorker();

    /// Check database access rights, assuming that all servers have the same users config
    NameSet databases_to_access;
    const String & current_database = context.getCurrentDatabase();

    Cluster::AddressesWithFailover shards = cluster->getShardsAddresses();

    std::vector<HostID> hosts;
    bool use_shard_default_db = false;
    bool use_local_default_db = false;
    for (const auto & shard : shards)
    {
        for (const auto & addr : shard)
        {
            hosts.emplace_back(addr);

            /// Expand an empty database name to the shard's default (or the current) database name
            for (const String & database : query_databases)
            {
                if (database.empty())
                {
                    bool has_shard_default_db = !addr.default_database.empty();
                    use_shard_default_db |= has_shard_default_db;
                    use_local_default_db |= !has_shard_default_db;
                    databases_to_access.emplace(has_shard_default_db ? addr.default_database : current_database);
                }
                else
                    databases_to_access.emplace(database);
            }
        }
    }

    if (use_shard_default_db && use_local_default_db)
        throw Exception("Mixed local default DB and shard default DB in DDL query", ErrorCodes::NOT_IMPLEMENTED);

    if (databases_to_access.empty())
        throw Exception("No databases to access in distributed DDL query", ErrorCodes::LOGICAL_ERROR);

    for (const String & database : databases_to_access)
        context.checkDatabaseAccessRights(database);

    if (use_local_default_db)
    {
        AddDefaultDatabaseVisitor visitor(current_database);
        visitor.visitDDL(query_ptr);
    }

    DDLLogEntry entry;
    entry.hosts = std::move(hosts);
    entry.query = queryToString(query_ptr);
    entry.initiator = ddl_worker.getCommonHostID();
    String node_path = ddl_worker.enqueueQuery(entry);

    BlockIO io;
    if (context.getSettingsRef().distributed_ddl_task_timeout == 0)
        return io;

    auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context);
    io.in = std::move(stream);
    return io;
}


}