1 | #pragma once |
2 | #include <Interpreters/Context.h> |
3 | #include <Interpreters/Cluster.h> |
4 | #include <DataStreams/BlockIO.h> |
5 | #include <Common/CurrentThread.h> |
6 | #include <Common/ThreadPool.h> |
7 | #include <common/logger_useful.h> |
8 | #include <Storages/IStorage.h> |
9 | |
10 | #include <atomic> |
11 | #include <chrono> |
12 | #include <condition_variable> |
13 | #include <mutex> |
14 | #include <thread> |
15 | |
16 | namespace DB |
17 | { |
18 | |
19 | class ASTAlterQuery; |
20 | struct DDLLogEntry; |
21 | struct DDLTask; |
22 | |
23 | |
24 | /// Pushes distributed DDL query to the queue |
25 | BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, NameSet && query_databases); |
26 | |
27 | |
28 | class DDLWorker |
29 | { |
30 | public: |
31 | DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix); |
32 | ~DDLWorker(); |
33 | |
34 | /// Pushes query into DDL queue, returns path to created node |
35 | String enqueueQuery(DDLLogEntry & entry); |
36 | |
37 | /// Host ID (name:port) for logging purposes |
38 | /// Note that in each task hosts are identified individually by name:port from initiator server cluster config |
39 | std::string getCommonHostID() const |
40 | { |
41 | return host_fqdn_id; |
42 | } |
43 | |
44 | private: |
45 | using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>; |
46 | |
47 | /// Returns cached ZooKeeper session (possibly expired). |
48 | ZooKeeperPtr tryGetZooKeeper() const; |
49 | /// If necessary, creates a new session and caches it. |
50 | ZooKeeperPtr getAndSetZooKeeper(); |
51 | |
52 | void processTasks(); |
53 | |
54 | /// Reads entry and check that the host belongs to host list of the task |
55 | /// Returns true and sets current_task if entry parsed and the check is passed |
56 | bool initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper); |
57 | |
58 | void processTask(DDLTask & task, const ZooKeeperPtr & zookeeper); |
59 | |
60 | /// Check that query should be executed on leader replica only |
61 | bool taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, StoragePtr storage) const; |
62 | |
63 | /// Check that shard has consistent config with table |
64 | void checkShardConfig(const String & table, const DDLTask & taks, StoragePtr storage) const; |
65 | |
66 | /// Executes query only on leader replica in case of replicated table. |
67 | /// Queries like TRUNCATE/ALTER .../OPTIMIZE have to be executed only on one node of shard. |
68 | /// Most of these queries can be executed on non-leader replica, but actually they still send |
69 | /// query via RemoteBlockOutputStream to leader, so to avoid such "2-phase" query execution we |
70 | /// execute query directly on leader. |
71 | bool tryExecuteQueryOnLeaderReplica( |
72 | DDLTask & task, |
73 | StoragePtr storage, |
74 | const String & rewritten_query, |
75 | const String & node_path, |
76 | const ZooKeeperPtr & zookeeper); |
77 | |
78 | void parseQueryAndResolveHost(DDLTask & task); |
79 | |
80 | bool tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status); |
81 | |
82 | /// Checks and cleanups queue's nodes |
83 | void cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper); |
84 | |
85 | /// Init task node |
86 | void createStatusDirs(const std::string & node_name, const ZooKeeperPtr & zookeeper); |
87 | |
88 | |
89 | void runMainThread(); |
90 | void runCleanupThread(); |
91 | |
92 | void attachToThreadGroup(); |
93 | |
94 | private: |
95 | Context & context; |
96 | Logger * log; |
97 | std::unique_ptr<Context> current_context; |
98 | |
99 | std::string host_fqdn; /// current host domain name |
100 | std::string host_fqdn_id; /// host_name:port |
101 | std::string queue_dir; /// dir with queue of queries |
102 | |
103 | /// Name of last task that was skipped or successfully executed |
104 | std::string last_processed_task_name; |
105 | |
106 | mutable std::mutex zookeeper_mutex; |
107 | ZooKeeperPtr current_zookeeper; |
108 | |
109 | /// Save state of executed task to avoid duplicate execution on ZK error |
110 | using DDLTaskPtr = std::unique_ptr<DDLTask>; |
111 | DDLTaskPtr current_task; |
112 | |
113 | std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>(); |
114 | std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>(); |
115 | std::atomic<bool> stop_flag{false}; |
116 | |
117 | ThreadFromGlobalPool main_thread; |
118 | ThreadFromGlobalPool cleanup_thread; |
119 | |
120 | /// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago |
121 | Int64 cleanup_delay_period = 60; // minute (in seconds) |
122 | /// Delete node if its age is greater than that |
123 | Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds) |
124 | /// How many tasks could be in the queue |
125 | size_t max_tasks_in_queue = 1000; |
126 | |
127 | ThreadGroupStatusPtr thread_group; |
128 | |
129 | friend class DDLQueryStatusInputStream; |
130 | friend struct DDLTask; |
131 | }; |
132 | |
133 | |
134 | } |
135 | |