| 1 | #pragma once |
| 2 | #include <Interpreters/Context.h> |
| 3 | #include <Interpreters/Cluster.h> |
| 4 | #include <DataStreams/BlockIO.h> |
| 5 | #include <Common/CurrentThread.h> |
| 6 | #include <Common/ThreadPool.h> |
| 7 | #include <common/logger_useful.h> |
| 8 | #include <Storages/IStorage.h> |
| 9 | |
| 10 | #include <atomic> |
| 11 | #include <chrono> |
| 12 | #include <condition_variable> |
| 13 | #include <mutex> |
| 14 | #include <thread> |
| 15 | |
| 16 | namespace DB |
| 17 | { |
| 18 | |
/// Forward declarations. The declarations in this header use DDLLogEntry and DDLTask
/// only by reference, through std::unique_ptr, or in a friend declaration, so the
/// full definitions are not needed here.
struct DDLTask;
struct DDLLogEntry;
/// NOTE(review): ASTAlterQuery is not referenced by any other declaration visible in this header;
/// presumably kept for translation units that include it -- verify before removing.
class ASTAlterQuery;
| 22 | |
| 23 | |
| 24 | /// Pushes distributed DDL query to the queue |
| 25 | BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, NameSet && query_databases); |
| 26 | |
| 27 | |
| 28 | class DDLWorker |
| 29 | { |
| 30 | public: |
| 31 | DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix); |
| 32 | ~DDLWorker(); |
| 33 | |
| 34 | /// Pushes query into DDL queue, returns path to created node |
| 35 | String enqueueQuery(DDLLogEntry & entry); |
| 36 | |
| 37 | /// Host ID (name:port) for logging purposes |
| 38 | /// Note that in each task hosts are identified individually by name:port from initiator server cluster config |
| 39 | std::string getCommonHostID() const |
| 40 | { |
| 41 | return host_fqdn_id; |
| 42 | } |
| 43 | |
| 44 | private: |
| 45 | using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>; |
| 46 | |
| 47 | /// Returns cached ZooKeeper session (possibly expired). |
| 48 | ZooKeeperPtr tryGetZooKeeper() const; |
| 49 | /// If necessary, creates a new session and caches it. |
| 50 | ZooKeeperPtr getAndSetZooKeeper(); |
| 51 | |
| 52 | void processTasks(); |
| 53 | |
| 54 | /// Reads entry and check that the host belongs to host list of the task |
| 55 | /// Returns true and sets current_task if entry parsed and the check is passed |
| 56 | bool initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper); |
| 57 | |
| 58 | void processTask(DDLTask & task, const ZooKeeperPtr & zookeeper); |
| 59 | |
| 60 | /// Check that query should be executed on leader replica only |
| 61 | bool taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, StoragePtr storage) const; |
| 62 | |
| 63 | /// Check that shard has consistent config with table |
| 64 | void checkShardConfig(const String & table, const DDLTask & taks, StoragePtr storage) const; |
| 65 | |
| 66 | /// Executes query only on leader replica in case of replicated table. |
| 67 | /// Queries like TRUNCATE/ALTER .../OPTIMIZE have to be executed only on one node of shard. |
| 68 | /// Most of these queries can be executed on non-leader replica, but actually they still send |
| 69 | /// query via RemoteBlockOutputStream to leader, so to avoid such "2-phase" query execution we |
| 70 | /// execute query directly on leader. |
| 71 | bool tryExecuteQueryOnLeaderReplica( |
| 72 | DDLTask & task, |
| 73 | StoragePtr storage, |
| 74 | const String & rewritten_query, |
| 75 | const String & node_path, |
| 76 | const ZooKeeperPtr & zookeeper); |
| 77 | |
| 78 | void parseQueryAndResolveHost(DDLTask & task); |
| 79 | |
| 80 | bool tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status); |
| 81 | |
| 82 | /// Checks and cleanups queue's nodes |
| 83 | void cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper); |
| 84 | |
| 85 | /// Init task node |
| 86 | void createStatusDirs(const std::string & node_name, const ZooKeeperPtr & zookeeper); |
| 87 | |
| 88 | |
| 89 | void runMainThread(); |
| 90 | void runCleanupThread(); |
| 91 | |
| 92 | void attachToThreadGroup(); |
| 93 | |
| 94 | private: |
| 95 | Context & context; |
| 96 | Logger * log; |
| 97 | std::unique_ptr<Context> current_context; |
| 98 | |
| 99 | std::string host_fqdn; /// current host domain name |
| 100 | std::string host_fqdn_id; /// host_name:port |
| 101 | std::string queue_dir; /// dir with queue of queries |
| 102 | |
| 103 | /// Name of last task that was skipped or successfully executed |
| 104 | std::string last_processed_task_name; |
| 105 | |
| 106 | mutable std::mutex zookeeper_mutex; |
| 107 | ZooKeeperPtr current_zookeeper; |
| 108 | |
| 109 | /// Save state of executed task to avoid duplicate execution on ZK error |
| 110 | using DDLTaskPtr = std::unique_ptr<DDLTask>; |
| 111 | DDLTaskPtr current_task; |
| 112 | |
| 113 | std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>(); |
| 114 | std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>(); |
| 115 | std::atomic<bool> stop_flag{false}; |
| 116 | |
| 117 | ThreadFromGlobalPool main_thread; |
| 118 | ThreadFromGlobalPool cleanup_thread; |
| 119 | |
| 120 | /// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago |
| 121 | Int64 cleanup_delay_period = 60; // minute (in seconds) |
| 122 | /// Delete node if its age is greater than that |
| 123 | Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds) |
| 124 | /// How many tasks could be in the queue |
| 125 | size_t max_tasks_in_queue = 1000; |
| 126 | |
| 127 | ThreadGroupStatusPtr thread_group; |
| 128 | |
| 129 | friend class DDLQueryStatusInputStream; |
| 130 | friend struct DDLTask; |
| 131 | }; |
| 132 | |
| 133 | |
| 134 | } |
| 135 | |