1#pragma once
2#include <Interpreters/Context.h>
3#include <Interpreters/Cluster.h>
4#include <DataStreams/BlockIO.h>
5#include <Common/CurrentThread.h>
6#include <Common/ThreadPool.h>
7#include <common/logger_useful.h>
8#include <Storages/IStorage.h>
9
10#include <atomic>
11#include <chrono>
12#include <condition_variable>
13#include <mutex>
14#include <thread>
15
16namespace DB
17{
18
19class ASTAlterQuery;
20struct DDLLogEntry;
21struct DDLTask;
22
23
24/// Pushes distributed DDL query to the queue
25BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, NameSet && query_databases);
26
27
28class DDLWorker
29{
30public:
31 DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix);
32 ~DDLWorker();
33
34 /// Pushes query into DDL queue, returns path to created node
35 String enqueueQuery(DDLLogEntry & entry);
36
37 /// Host ID (name:port) for logging purposes
38 /// Note that in each task hosts are identified individually by name:port from initiator server cluster config
39 std::string getCommonHostID() const
40 {
41 return host_fqdn_id;
42 }
43
44private:
45 using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
46
47 /// Returns cached ZooKeeper session (possibly expired).
48 ZooKeeperPtr tryGetZooKeeper() const;
49 /// If necessary, creates a new session and caches it.
50 ZooKeeperPtr getAndSetZooKeeper();
51
52 void processTasks();
53
54 /// Reads entry and check that the host belongs to host list of the task
55 /// Returns true and sets current_task if entry parsed and the check is passed
56 bool initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper);
57
58 void processTask(DDLTask & task, const ZooKeeperPtr & zookeeper);
59
60 /// Check that query should be executed on leader replica only
61 bool taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, StoragePtr storage) const;
62
63 /// Check that shard has consistent config with table
64 void checkShardConfig(const String & table, const DDLTask & taks, StoragePtr storage) const;
65
66 /// Executes query only on leader replica in case of replicated table.
67 /// Queries like TRUNCATE/ALTER .../OPTIMIZE have to be executed only on one node of shard.
68 /// Most of these queries can be executed on non-leader replica, but actually they still send
69 /// query via RemoteBlockOutputStream to leader, so to avoid such "2-phase" query execution we
70 /// execute query directly on leader.
71 bool tryExecuteQueryOnLeaderReplica(
72 DDLTask & task,
73 StoragePtr storage,
74 const String & rewritten_query,
75 const String & node_path,
76 const ZooKeeperPtr & zookeeper);
77
78 void parseQueryAndResolveHost(DDLTask & task);
79
80 bool tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status);
81
82 /// Checks and cleanups queue's nodes
83 void cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper);
84
85 /// Init task node
86 void createStatusDirs(const std::string & node_name, const ZooKeeperPtr & zookeeper);
87
88
89 void runMainThread();
90 void runCleanupThread();
91
92 void attachToThreadGroup();
93
94private:
95 Context & context;
96 Logger * log;
97 std::unique_ptr<Context> current_context;
98
99 std::string host_fqdn; /// current host domain name
100 std::string host_fqdn_id; /// host_name:port
101 std::string queue_dir; /// dir with queue of queries
102
103 /// Name of last task that was skipped or successfully executed
104 std::string last_processed_task_name;
105
106 mutable std::mutex zookeeper_mutex;
107 ZooKeeperPtr current_zookeeper;
108
109 /// Save state of executed task to avoid duplicate execution on ZK error
110 using DDLTaskPtr = std::unique_ptr<DDLTask>;
111 DDLTaskPtr current_task;
112
113 std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
114 std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>();
115 std::atomic<bool> stop_flag{false};
116
117 ThreadFromGlobalPool main_thread;
118 ThreadFromGlobalPool cleanup_thread;
119
120 /// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
121 Int64 cleanup_delay_period = 60; // minute (in seconds)
122 /// Delete node if its age is greater than that
123 Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
124 /// How many tasks could be in the queue
125 size_t max_tasks_in_queue = 1000;
126
127 ThreadGroupStatusPtr thread_group;
128
129 friend class DDLQueryStatusInputStream;
130 friend struct DDLTask;
131};
132
133
134}
135