| 1 | #pragma once |
| 2 | |
| 3 | #include <map> |
| 4 | #include <Core/Settings.h> |
| 5 | #include <Client/ConnectionPool.h> |
| 6 | #include <Client/ConnectionPoolWithFailover.h> |
| 7 | #include <Poco/Net/SocketAddress.h> |
| 8 | |
| 9 | namespace DB |
| 10 | { |
| 11 | |
| 12 | /// Cluster contains connection pools to each node |
| 13 | /// With the local nodes, the connection is not established, but the request is executed directly. |
| 14 | /// Therefore we store only the number of local nodes |
| 15 | /// In the config, the cluster includes nodes <node> or <shard> |
| 16 | class Cluster |
| 17 | { |
| 18 | public: |
| 19 | Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name); |
| 20 | |
| 21 | /// Construct a cluster by the names of shards and replicas. |
| 22 | /// Local are treated as well as remote ones if treat_local_as_remote is true. |
| 23 | /// 'clickhouse_port' - port that this server instance listen for queries. |
| 24 | /// This parameter is needed only to check that some address is local (points to ourself). |
| 25 | Cluster(const Settings & settings, const std::vector<std::vector<String>> & names, |
| 26 | const String & username, const String & password, |
| 27 | UInt16 clickhouse_port, bool treat_local_as_remote, bool secure = false); |
| 28 | |
| 29 | Cluster(const Cluster &) = delete; |
| 30 | Cluster & operator=(const Cluster &) = delete; |
| 31 | |
| 32 | /// is used to set a limit on the size of the timeout |
| 33 | static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit); |
| 34 | |
| 35 | public: |
| 36 | struct Address |
| 37 | { |
| 38 | /** In configuration file, |
| 39 | * addresses are located either in <node> elements: |
| 40 | * <node> |
| 41 | * <host>example01-01-1</host> |
| 42 | * <port>9000</port> |
| 43 | * <!-- <user>, <password>, <default_database> if needed --> |
| 44 | * </node> |
| 45 | * ... |
| 46 | * or in <shard> and inside in <replica> elements: |
| 47 | * <shard> |
| 48 | * <replica> |
| 49 | * <host>example01-01-1</host> |
| 50 | * <port>9000</port> |
| 51 | * <!-- <user>, <password>, <default_database>. <secure> if needed --> |
| 52 | * </replica> |
| 53 | * </shard> |
| 54 | */ |
| 55 | |
| 56 | String host_name; |
| 57 | UInt16 port; |
| 58 | String user; |
| 59 | String password; |
| 60 | /// This database is selected when no database is specified for Distributed table |
| 61 | String default_database; |
| 62 | /// The locality is determined at the initialization, and is not changed even if DNS is changed |
| 63 | bool is_local = false; |
| 64 | bool user_specified = false; |
| 65 | |
| 66 | Protocol::Compression compression = Protocol::Compression::Enable; |
| 67 | Protocol::Secure secure = Protocol::Secure::Disable; |
| 68 | |
| 69 | Address() = default; |
| 70 | Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix); |
| 71 | Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port, bool secure_ = false); |
| 72 | |
| 73 | /// Returns 'escaped_host_name:port' |
| 74 | String toString() const; |
| 75 | |
| 76 | /// Returns 'host_name:port' |
| 77 | String readableString() const; |
| 78 | |
| 79 | static String toString(const String & host_name, UInt16 port); |
| 80 | |
| 81 | static std::pair<String, UInt16> fromString(const String & host_port_string); |
| 82 | |
| 83 | /// Retrurns escaped user:password@resolved_host_address:resolved_host_port#default_database |
| 84 | String toFullString() const; |
| 85 | static Address fromFullString(const String & address_full_string); |
| 86 | |
| 87 | /// Returns resolved address if it does resolve. |
| 88 | std::optional<Poco::Net::SocketAddress> getResolvedAddress() const; |
| 89 | |
| 90 | auto tuple() const { return std::tie(host_name, port, secure, user, password, default_database); } |
| 91 | bool operator==(const Address & other) const { return tuple() == other.tuple(); } |
| 92 | |
| 93 | private: |
| 94 | bool isLocal(UInt16 clickhouse_port) const; |
| 95 | }; |
| 96 | |
| 97 | using Addresses = std::vector<Address>; |
| 98 | using AddressesWithFailover = std::vector<Addresses>; |
| 99 | |
| 100 | struct ShardInfo |
| 101 | { |
| 102 | public: |
| 103 | bool isLocal() const { return !local_addresses.empty(); } |
| 104 | bool hasRemoteConnections() const { return local_addresses.size() != per_replica_pools.size(); } |
| 105 | size_t getLocalNodeCount() const { return local_addresses.size(); } |
| 106 | bool hasInternalReplication() const { return has_internal_replication; } |
| 107 | |
| 108 | public: |
| 109 | /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication |
| 110 | std::string dir_name_for_internal_replication; |
| 111 | /// Number of the shard, the indexation begins with 1 |
| 112 | UInt32 shard_num = 0; |
| 113 | UInt32 weight = 1; |
| 114 | Addresses local_addresses; |
| 115 | /// nullptr if there are no remote addresses |
| 116 | ConnectionPoolWithFailoverPtr pool; |
| 117 | /// Connection pool for each replica, contains nullptr for local replicas |
| 118 | ConnectionPoolPtrs per_replica_pools; |
| 119 | bool has_internal_replication = false; |
| 120 | }; |
| 121 | |
| 122 | using ShardsInfo = std::vector<ShardInfo>; |
| 123 | |
| 124 | String getHashOfAddresses() const { return hash_of_addresses; } |
| 125 | const ShardsInfo & getShardsInfo() const { return shards_info; } |
| 126 | const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; } |
| 127 | |
| 128 | const ShardInfo & getAnyShardInfo() const |
| 129 | { |
| 130 | if (shards_info.empty()) |
| 131 | throw Exception("Cluster is empty" , ErrorCodes::LOGICAL_ERROR); |
| 132 | return shards_info.front(); |
| 133 | } |
| 134 | |
| 135 | /// The number of remote shards. |
| 136 | size_t getRemoteShardCount() const { return remote_shard_count; } |
| 137 | |
| 138 | /// The number of clickhouse nodes located locally |
| 139 | /// we access the local nodes directly. |
| 140 | size_t getLocalShardCount() const { return local_shard_count; } |
| 141 | |
| 142 | /// The number of all shards. |
| 143 | size_t getShardCount() const { return shards_info.size(); } |
| 144 | |
| 145 | /// Get a subcluster consisting of one shard - index by count (from 0) of the shard of this cluster. |
| 146 | std::unique_ptr<Cluster> getClusterWithSingleShard(size_t index) const; |
| 147 | |
| 148 | /// Get a subcluster consisting of one or multiple shards - indexes by count (from 0) of the shard of this cluster. |
| 149 | std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const; |
| 150 | |
| 151 | private: |
| 152 | using SlotToShard = std::vector<UInt64>; |
| 153 | SlotToShard slot_to_shard; |
| 154 | |
| 155 | public: |
| 156 | const SlotToShard & getSlotToShard() const { return slot_to_shard; } |
| 157 | |
| 158 | private: |
| 159 | void initMisc(); |
| 160 | |
| 161 | /// For getClusterWithMultipleShards implementation. |
| 162 | Cluster(const Cluster & from, const std::vector<size_t> & indices); |
| 163 | |
| 164 | String hash_of_addresses; |
| 165 | /// Description of the cluster shards. |
| 166 | ShardsInfo shards_info; |
| 167 | /// Any remote shard. |
| 168 | ShardInfo * any_remote_shard_info = nullptr; |
| 169 | |
| 170 | /// Non-empty is either addresses or addresses_with_failover. |
| 171 | /// The size and order of the elements in the corresponding array corresponds to shards_info. |
| 172 | |
| 173 | /// An array of shards. For each shard, an array of replica addresses (servers that are considered identical). |
| 174 | AddressesWithFailover addresses_with_failover; |
| 175 | |
| 176 | size_t remote_shard_count = 0; |
| 177 | size_t local_shard_count = 0; |
| 178 | }; |
| 179 | |
| 180 | using ClusterPtr = std::shared_ptr<Cluster>; |
| 181 | |
| 182 | |
| 183 | class Clusters |
| 184 | { |
| 185 | public: |
| 186 | Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers" ); |
| 187 | |
| 188 | Clusters(const Clusters &) = delete; |
| 189 | Clusters & operator=(const Clusters &) = delete; |
| 190 | |
| 191 | ClusterPtr getCluster(const std::string & cluster_name) const; |
| 192 | void setCluster(const String & cluster_name, const ClusterPtr & cluster); |
| 193 | |
| 194 | void updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name); |
| 195 | |
| 196 | public: |
| 197 | using Impl = std::map<String, ClusterPtr>; |
| 198 | |
| 199 | Impl getContainer() const; |
| 200 | |
| 201 | protected: |
| 202 | Impl impl; |
| 203 | mutable std::mutex mutex; |
| 204 | }; |
| 205 | |
| 206 | using ClustersPtr = std::shared_ptr<Clusters>; |
| 207 | |
| 208 | } |
| 209 | |