| 1 | #pragma once | 
|---|
| 2 | #include <Interpreters/Context.h> | 
|---|
| 3 | #include <Interpreters/Cluster.h> | 
|---|
| 4 | #include <DataStreams/BlockIO.h> | 
|---|
| 5 | #include <Common/CurrentThread.h> | 
|---|
| 6 | #include <Common/ThreadPool.h> | 
|---|
| 7 | #include <common/logger_useful.h> | 
|---|
| 8 | #include <Storages/IStorage.h> | 
|---|
| 9 |  | 
|---|
| 10 | #include <atomic> | 
|---|
| 11 | #include <chrono> | 
|---|
| 12 | #include <condition_variable> | 
|---|
| 13 | #include <mutex> | 
|---|
| 14 | #include <thread> | 
|---|
| 15 |  | 
|---|
| 16 | namespace DB | 
|---|
| 17 | { | 
|---|
| 18 |  | 
|---|
| 19 | class ASTAlterQuery; | 
|---|
| 20 | struct DDLLogEntry; | 
|---|
| 21 | struct DDLTask; | 
|---|
| 22 |  | 
|---|
| 23 |  | 
|---|
| 24 | /// Pushes distributed DDL query to the queue | 
|---|
| 25 | BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, NameSet && query_databases); | 
|---|
| 26 |  | 
|---|
| 27 |  | 
|---|
| 28 | class DDLWorker | 
|---|
| 29 | { | 
|---|
| 30 | public: | 
|---|
| 31 | DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix); | 
|---|
| 32 | ~DDLWorker(); | 
|---|
| 33 |  | 
|---|
| 34 | /// Pushes query into DDL queue, returns path to created node | 
|---|
| 35 | String enqueueQuery(DDLLogEntry & entry); | 
|---|
| 36 |  | 
|---|
| 37 | /// Host ID (name:port) for logging purposes | 
|---|
| 38 | /// Note that in each task hosts are identified individually by name:port from initiator server cluster config | 
|---|
| 39 | std::string getCommonHostID() const | 
|---|
| 40 | { | 
|---|
| 41 | return host_fqdn_id; | 
|---|
| 42 | } | 
|---|
| 43 |  | 
|---|
| 44 | private: | 
|---|
| 45 | using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>; | 
|---|
| 46 |  | 
|---|
| 47 | /// Returns cached ZooKeeper session (possibly expired). | 
|---|
| 48 | ZooKeeperPtr tryGetZooKeeper() const; | 
|---|
| 49 | /// If necessary, creates a new session and caches it. | 
|---|
| 50 | ZooKeeperPtr getAndSetZooKeeper(); | 
|---|
| 51 |  | 
|---|
| 52 | void processTasks(); | 
|---|
| 53 |  | 
|---|
| 54 | /// Reads entry and check that the host belongs to host list of the task | 
|---|
| 55 | /// Returns true and sets current_task if entry parsed and the check is passed | 
|---|
| 56 | bool initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper); | 
|---|
| 57 |  | 
|---|
| 58 | void processTask(DDLTask & task, const ZooKeeperPtr & zookeeper); | 
|---|
| 59 |  | 
|---|
| 60 | /// Check that query should be executed on leader replica only | 
|---|
| 61 | bool taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, StoragePtr storage) const; | 
|---|
| 62 |  | 
|---|
| 63 | /// Check that shard has consistent config with table | 
|---|
| 64 | void checkShardConfig(const String & table, const DDLTask & taks, StoragePtr storage) const; | 
|---|
| 65 |  | 
|---|
| 66 | /// Executes query only on leader replica in case of replicated table. | 
|---|
| 67 | /// Queries like TRUNCATE/ALTER .../OPTIMIZE have to be executed only on one node of shard. | 
|---|
| 68 | /// Most of these queries can be executed on non-leader replica, but actually they still send | 
|---|
| 69 | /// query via RemoteBlockOutputStream to leader, so to avoid such "2-phase" query execution we | 
|---|
| 70 | /// execute query directly on leader. | 
|---|
| 71 | bool tryExecuteQueryOnLeaderReplica( | 
|---|
| 72 | DDLTask & task, | 
|---|
| 73 | StoragePtr storage, | 
|---|
| 74 | const String & rewritten_query, | 
|---|
| 75 | const String & node_path, | 
|---|
| 76 | const ZooKeeperPtr & zookeeper); | 
|---|
| 77 |  | 
|---|
| 78 | void parseQueryAndResolveHost(DDLTask & task); | 
|---|
| 79 |  | 
|---|
| 80 | bool tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status); | 
|---|
| 81 |  | 
|---|
| 82 | /// Checks and cleanups queue's nodes | 
|---|
| 83 | void cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper); | 
|---|
| 84 |  | 
|---|
| 85 | /// Init task node | 
|---|
| 86 | void createStatusDirs(const std::string & node_name, const ZooKeeperPtr & zookeeper); | 
|---|
| 87 |  | 
|---|
| 88 |  | 
|---|
| 89 | void runMainThread(); | 
|---|
| 90 | void runCleanupThread(); | 
|---|
| 91 |  | 
|---|
| 92 | void attachToThreadGroup(); | 
|---|
| 93 |  | 
|---|
| 94 | private: | 
|---|
| 95 | Context & context; | 
|---|
| 96 | Logger * log; | 
|---|
| 97 | std::unique_ptr<Context> current_context; | 
|---|
| 98 |  | 
|---|
| 99 | std::string host_fqdn;      /// current host domain name | 
|---|
| 100 | std::string host_fqdn_id;   /// host_name:port | 
|---|
| 101 | std::string queue_dir;      /// dir with queue of queries | 
|---|
| 102 |  | 
|---|
| 103 | /// Name of last task that was skipped or successfully executed | 
|---|
| 104 | std::string last_processed_task_name; | 
|---|
| 105 |  | 
|---|
| 106 | mutable std::mutex zookeeper_mutex; | 
|---|
| 107 | ZooKeeperPtr current_zookeeper; | 
|---|
| 108 |  | 
|---|
| 109 | /// Save state of executed task to avoid duplicate execution on ZK error | 
|---|
| 110 | using DDLTaskPtr = std::unique_ptr<DDLTask>; | 
|---|
| 111 | DDLTaskPtr current_task; | 
|---|
| 112 |  | 
|---|
| 113 | std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>(); | 
|---|
| 114 | std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>(); | 
|---|
| 115 | std::atomic<bool> stop_flag{false}; | 
|---|
| 116 |  | 
|---|
| 117 | ThreadFromGlobalPool main_thread; | 
|---|
| 118 | ThreadFromGlobalPool cleanup_thread; | 
|---|
| 119 |  | 
|---|
| 120 | /// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago | 
|---|
| 121 | Int64 cleanup_delay_period = 60; // minute (in seconds) | 
|---|
| 122 | /// Delete node if its age is greater than that | 
|---|
| 123 | Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds) | 
|---|
| 124 | /// How many tasks could be in the queue | 
|---|
| 125 | size_t max_tasks_in_queue = 1000; | 
|---|
| 126 |  | 
|---|
| 127 | ThreadGroupStatusPtr thread_group; | 
|---|
| 128 |  | 
|---|
| 129 | friend class DDLQueryStatusInputStream; | 
|---|
| 130 | friend struct DDLTask; | 
|---|
| 131 | }; | 
|---|
| 132 |  | 
|---|
| 133 |  | 
|---|
| 134 | } | 
|---|
| 135 |  | 
|---|