1#pragma once
2
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include <Core/Settings.h>
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <Poco/Net/SocketAddress.h>
8
9namespace DB
10{
11
12/// Cluster contains connection pools to each node
13/// With the local nodes, the connection is not established, but the request is executed directly.
14/// Therefore we store only the number of local nodes
15/// In the config, the cluster includes nodes <node> or <shard>
16class Cluster
17{
18public:
19 Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name);
20
21 /// Construct a cluster by the names of shards and replicas.
22 /// Local are treated as well as remote ones if treat_local_as_remote is true.
23 /// 'clickhouse_port' - port that this server instance listen for queries.
24 /// This parameter is needed only to check that some address is local (points to ourself).
25 Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
26 const String & username, const String & password,
27 UInt16 clickhouse_port, bool treat_local_as_remote, bool secure = false);
28
29 Cluster(const Cluster &) = delete;
30 Cluster & operator=(const Cluster &) = delete;
31
32 /// is used to set a limit on the size of the timeout
33 static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit);
34
35public:
36 struct Address
37 {
38 /** In configuration file,
39 * addresses are located either in <node> elements:
40 * <node>
41 * <host>example01-01-1</host>
42 * <port>9000</port>
43 * <!-- <user>, <password>, <default_database> if needed -->
44 * </node>
45 * ...
46 * or in <shard> and inside in <replica> elements:
47 * <shard>
48 * <replica>
49 * <host>example01-01-1</host>
50 * <port>9000</port>
51 * <!-- <user>, <password>, <default_database>. <secure> if needed -->
52 * </replica>
53 * </shard>
54 */
55
56 String host_name;
57 UInt16 port;
58 String user;
59 String password;
60 /// This database is selected when no database is specified for Distributed table
61 String default_database;
62 /// The locality is determined at the initialization, and is not changed even if DNS is changed
63 bool is_local = false;
64 bool user_specified = false;
65
66 Protocol::Compression compression = Protocol::Compression::Enable;
67 Protocol::Secure secure = Protocol::Secure::Disable;
68
69 Address() = default;
70 Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
71 Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port, bool secure_ = false);
72
73 /// Returns 'escaped_host_name:port'
74 String toString() const;
75
76 /// Returns 'host_name:port'
77 String readableString() const;
78
79 static String toString(const String & host_name, UInt16 port);
80
81 static std::pair<String, UInt16> fromString(const String & host_port_string);
82
83 /// Retrurns escaped user:password@resolved_host_address:resolved_host_port#default_database
84 String toFullString() const;
85 static Address fromFullString(const String & address_full_string);
86
87 /// Returns resolved address if it does resolve.
88 std::optional<Poco::Net::SocketAddress> getResolvedAddress() const;
89
90 auto tuple() const { return std::tie(host_name, port, secure, user, password, default_database); }
91 bool operator==(const Address & other) const { return tuple() == other.tuple(); }
92
93 private:
94 bool isLocal(UInt16 clickhouse_port) const;
95 };
96
97 using Addresses = std::vector<Address>;
98 using AddressesWithFailover = std::vector<Addresses>;
99
100 struct ShardInfo
101 {
102 public:
103 bool isLocal() const { return !local_addresses.empty(); }
104 bool hasRemoteConnections() const { return local_addresses.size() != per_replica_pools.size(); }
105 size_t getLocalNodeCount() const { return local_addresses.size(); }
106 bool hasInternalReplication() const { return has_internal_replication; }
107
108 public:
109 /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
110 std::string dir_name_for_internal_replication;
111 /// Number of the shard, the indexation begins with 1
112 UInt32 shard_num = 0;
113 UInt32 weight = 1;
114 Addresses local_addresses;
115 /// nullptr if there are no remote addresses
116 ConnectionPoolWithFailoverPtr pool;
117 /// Connection pool for each replica, contains nullptr for local replicas
118 ConnectionPoolPtrs per_replica_pools;
119 bool has_internal_replication = false;
120 };
121
122 using ShardsInfo = std::vector<ShardInfo>;
123
124 String getHashOfAddresses() const { return hash_of_addresses; }
125 const ShardsInfo & getShardsInfo() const { return shards_info; }
126 const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; }
127
128 const ShardInfo & getAnyShardInfo() const
129 {
130 if (shards_info.empty())
131 throw Exception("Cluster is empty", ErrorCodes::LOGICAL_ERROR);
132 return shards_info.front();
133 }
134
135 /// The number of remote shards.
136 size_t getRemoteShardCount() const { return remote_shard_count; }
137
138 /// The number of clickhouse nodes located locally
139 /// we access the local nodes directly.
140 size_t getLocalShardCount() const { return local_shard_count; }
141
142 /// The number of all shards.
143 size_t getShardCount() const { return shards_info.size(); }
144
145 /// Get a subcluster consisting of one shard - index by count (from 0) of the shard of this cluster.
146 std::unique_ptr<Cluster> getClusterWithSingleShard(size_t index) const;
147
148 /// Get a subcluster consisting of one or multiple shards - indexes by count (from 0) of the shard of this cluster.
149 std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
150
151private:
152 using SlotToShard = std::vector<UInt64>;
153 SlotToShard slot_to_shard;
154
155public:
156 const SlotToShard & getSlotToShard() const { return slot_to_shard; }
157
158private:
159 void initMisc();
160
161 /// For getClusterWithMultipleShards implementation.
162 Cluster(const Cluster & from, const std::vector<size_t> & indices);
163
164 String hash_of_addresses;
165 /// Description of the cluster shards.
166 ShardsInfo shards_info;
167 /// Any remote shard.
168 ShardInfo * any_remote_shard_info = nullptr;
169
170 /// Non-empty is either addresses or addresses_with_failover.
171 /// The size and order of the elements in the corresponding array corresponds to shards_info.
172
173 /// An array of shards. For each shard, an array of replica addresses (servers that are considered identical).
174 AddressesWithFailover addresses_with_failover;
175
176 size_t remote_shard_count = 0;
177 size_t local_shard_count = 0;
178};
179
/// Shared-ownership pointer to a Cluster.
using ClusterPtr = std::shared_ptr<Cluster>;
181
182
183class Clusters
184{
185public:
186 Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");
187
188 Clusters(const Clusters &) = delete;
189 Clusters & operator=(const Clusters &) = delete;
190
191 ClusterPtr getCluster(const std::string & cluster_name) const;
192 void setCluster(const String & cluster_name, const ClusterPtr & cluster);
193
194 void updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name);
195
196public:
197 using Impl = std::map<String, ClusterPtr>;
198
199 Impl getContainer() const;
200
201protected:
202 Impl impl;
203 mutable std::mutex mutex;
204};
205
/// Shared-ownership pointer to a Clusters collection.
using ClustersPtr = std::shared_ptr<Clusters>;
207
208}
209