1 | #include <Interpreters/Cluster.h> |
2 | #include <common/SimpleCache.h> |
3 | #include <Common/DNSResolver.h> |
4 | #include <Common/escapeForFileName.h> |
5 | #include <Common/isLocalAddress.h> |
6 | #include <Common/StringUtils/StringUtils.h> |
7 | #include <Common/parseAddress.h> |
8 | #include <IO/HexWriteBuffer.h> |
9 | #include <IO/WriteHelpers.h> |
10 | #include <IO/ReadHelpers.h> |
11 | #include <Poco/Util/AbstractConfiguration.h> |
12 | #include <Poco/Util/Application.h> |
13 | |
14 | namespace DB |
15 | { |
16 | |
/// Error codes declared `extern` here; their definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
    extern const int LOGICAL_ERROR;
    extern const int SHARD_HAS_NO_CONNECTIONS;
    extern const int SYNTAX_ERROR;
}
25 | |
26 | namespace |
27 | { |
28 | |
29 | /// Default shard weight. |
30 | static constexpr UInt32 default_weight = 1; |
31 | |
32 | inline bool isLocalImpl(const Cluster::Address & address, const Poco::Net::SocketAddress & resolved_address, UInt16 clickhouse_port) |
33 | { |
34 | /// If there is replica, for which: |
35 | /// - its port is the same that the server is listening; |
36 | /// - its host is resolved to set of addresses, one of which is the same as one of addresses of network interfaces of the server machine*; |
37 | /// then we must go to this shard without any inter-process communication. |
38 | /// |
39 | /// * - this criteria is somewhat approximate. |
40 | /// |
41 | /// Also, replica is considered non-local, if it has default database set |
42 | /// (only reason is to avoid query rewrite). |
43 | |
44 | return address.default_database.empty() && isLocalAddress(resolved_address, clickhouse_port); |
45 | } |
46 | |
47 | } |
48 | |
49 | /// Implementation of Cluster::Address class |
50 | |
51 | std::optional<Poco::Net::SocketAddress> Cluster::Address::getResolvedAddress() const |
52 | { |
53 | try |
54 | { |
55 | return DNSResolver::instance().resolveAddress(host_name, port); |
56 | } |
57 | catch (...) |
58 | { |
59 | /// Failure in DNS resolution in cluster initialization is Ok. |
60 | tryLogCurrentException("Cluster" ); |
61 | return {}; |
62 | } |
63 | } |
64 | |
65 | |
66 | bool Cluster::Address::isLocal(UInt16 clickhouse_port) const |
67 | { |
68 | if (auto resolved = getResolvedAddress()) |
69 | return isLocalImpl(*this, *resolved, clickhouse_port); |
70 | return false; |
71 | } |
72 | |
73 | |
74 | Cluster::Address::Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix) |
75 | { |
76 | host_name = config.getString(config_prefix + ".host" ); |
77 | port = static_cast<UInt16>(config.getInt(config_prefix + ".port" )); |
78 | if (config.has(config_prefix + ".user" )) |
79 | user_specified = true; |
80 | |
81 | user = config.getString(config_prefix + ".user" , "default" ); |
82 | password = config.getString(config_prefix + ".password" , "" ); |
83 | default_database = config.getString(config_prefix + ".default_database" , "" ); |
84 | secure = config.getBool(config_prefix + ".secure" , false) ? Protocol::Secure::Enable : Protocol::Secure::Disable; |
85 | compression = config.getBool(config_prefix + ".compression" , true) ? Protocol::Compression::Enable : Protocol::Compression::Disable; |
86 | const char * port_type = secure == Protocol::Secure::Enable ? "tcp_port_secure" : "tcp_port" ; |
87 | is_local = isLocal(config.getInt(port_type, 0)); |
88 | } |
89 | |
90 | |
91 | Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port, bool secure_) |
92 | : user(user_), password(password_) |
93 | { |
94 | auto parsed_host_port = parseAddress(host_port_, clickhouse_port); |
95 | host_name = parsed_host_port.first; |
96 | port = parsed_host_port.second; |
97 | secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable; |
98 | is_local = isLocal(clickhouse_port); |
99 | } |
100 | |
101 | |
102 | String Cluster::Address::toString() const |
103 | { |
104 | return toString(host_name, port); |
105 | } |
106 | |
107 | String Cluster::Address::toString(const String & host_name, UInt16 port) |
108 | { |
109 | return escapeForFileName(host_name) + ':' + DB::toString(port); |
110 | } |
111 | |
112 | String Cluster::Address::readableString() const |
113 | { |
114 | String res; |
115 | |
116 | /// If it looks like IPv6 address add braces to avoid ambiguity in ipv6_host:port notation |
117 | if (host_name.find_first_of(':') != std::string::npos && !host_name.empty() && host_name.back() != ']') |
118 | res += '[' + host_name + ']'; |
119 | else |
120 | res += host_name; |
121 | |
122 | res += ':' + DB::toString(port); |
123 | return res; |
124 | } |
125 | |
126 | std::pair<String, UInt16> Cluster::Address::fromString(const String & host_port_string) |
127 | { |
128 | auto pos = host_port_string.find_last_of(':'); |
129 | if (pos == std::string::npos) |
130 | throw Exception("Incorrect <host>:<port> format " + host_port_string, ErrorCodes::SYNTAX_ERROR); |
131 | |
132 | return {unescapeForFileName(host_port_string.substr(0, pos)), parse<UInt16>(host_port_string.substr(pos + 1))}; |
133 | } |
134 | |
135 | |
136 | String Cluster::Address::toFullString() const |
137 | { |
138 | return |
139 | escapeForFileName(user) + |
140 | (password.empty() ? "" : (':' + escapeForFileName(password))) + '@' + |
141 | escapeForFileName(host_name) + ':' + |
142 | std::to_string(port) + |
143 | (default_database.empty() ? "" : ('#' + escapeForFileName(default_database))) |
144 | + ((secure == Protocol::Secure::Enable) ? "+secure" : "" ); |
145 | } |
146 | |
147 | Cluster::Address Cluster::Address::fromFullString(const String & full_string) |
148 | { |
149 | const char * address_begin = full_string.data(); |
150 | const char * address_end = address_begin + full_string.size(); |
151 | |
152 | Protocol::Secure secure = Protocol::Secure::Disable; |
153 | const char * secure_tag = "+secure" ; |
154 | if (endsWith(full_string, secure_tag)) |
155 | { |
156 | address_end -= strlen(secure_tag); |
157 | secure = Protocol::Secure::Enable; |
158 | } |
159 | |
160 | const char * user_pw_end = strchr(full_string.data(), '@'); |
161 | const char * colon = strchr(full_string.data(), ':'); |
162 | if (!user_pw_end || !colon) |
163 | throw Exception("Incorrect user[:password]@host:port#default_database format " + full_string, ErrorCodes::SYNTAX_ERROR); |
164 | |
165 | const bool has_pw = colon < user_pw_end; |
166 | const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon; |
167 | if (!host_end) |
168 | throw Exception("Incorrect address '" + full_string + "', it does not contain port" , ErrorCodes::SYNTAX_ERROR); |
169 | |
170 | const char * has_db = strchr(full_string.data(), '#'); |
171 | const char * port_end = has_db ? has_db : address_end; |
172 | |
173 | Address address; |
174 | address.secure = secure; |
175 | address.port = parse<UInt16>(host_end + 1, port_end - (host_end + 1)); |
176 | address.host_name = unescapeForFileName(std::string(user_pw_end + 1, host_end)); |
177 | address.user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end)); |
178 | address.password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string(); |
179 | address.default_database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end)) : std::string(); |
180 | return address; |
181 | } |
182 | |
183 | |
184 | /// Implementation of Clusters class |
185 | |
186 | Clusters::Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name) |
187 | { |
188 | updateClusters(config, settings, config_name); |
189 | } |
190 | |
191 | |
192 | ClusterPtr Clusters::getCluster(const std::string & cluster_name) const |
193 | { |
194 | std::lock_guard lock(mutex); |
195 | |
196 | auto it = impl.find(cluster_name); |
197 | return (it != impl.end()) ? it->second : nullptr; |
198 | } |
199 | |
200 | |
201 | void Clusters::setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster) |
202 | { |
203 | std::lock_guard lock(mutex); |
204 | impl[cluster_name] = cluster; |
205 | } |
206 | |
207 | |
208 | void Clusters::updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name) |
209 | { |
210 | Poco::Util::AbstractConfiguration::Keys config_keys; |
211 | config.keys(config_name, config_keys); |
212 | |
213 | std::lock_guard lock(mutex); |
214 | |
215 | impl.clear(); |
216 | for (const auto & key : config_keys) |
217 | { |
218 | if (key.find('.') != String::npos) |
219 | throw Exception("Cluster names with dots are not supported: '" + key + "'" , ErrorCodes::SYNTAX_ERROR); |
220 | |
221 | impl.emplace(key, std::make_shared<Cluster>(config, settings, config_name + "." + key)); |
222 | } |
223 | } |
224 | |
225 | Clusters::Impl Clusters::getContainer() const |
226 | { |
227 | std::lock_guard lock(mutex); |
228 | /// The following line copies container of shared_ptrs to return value under lock |
229 | return impl; |
230 | } |
231 | |
232 | |
233 | /// Implementation of `Cluster` class |
234 | |
235 | Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name) |
236 | { |
237 | Poco::Util::AbstractConfiguration::Keys config_keys; |
238 | config.keys(cluster_name, config_keys); |
239 | |
240 | if (config_keys.empty()) |
241 | throw Exception("No cluster elements (shard, node) specified in config at path " + cluster_name, ErrorCodes::SHARD_HAS_NO_CONNECTIONS); |
242 | |
243 | const auto & config_prefix = cluster_name + "." ; |
244 | |
245 | UInt32 current_shard_num = 1; |
246 | |
247 | for (const auto & key : config_keys) |
248 | { |
249 | if (startsWith(key, "node" )) |
250 | { |
251 | /// Shard without replicas. |
252 | |
253 | Addresses addresses; |
254 | |
255 | const auto & prefix = config_prefix + key; |
256 | const auto weight = config.getInt(prefix + ".weight" , default_weight); |
257 | |
258 | addresses.emplace_back(config, prefix); |
259 | const auto & address = addresses.back(); |
260 | |
261 | ShardInfo info; |
262 | info.shard_num = current_shard_num; |
263 | info.weight = weight; |
264 | |
265 | if (address.is_local) |
266 | info.local_addresses.push_back(address); |
267 | |
268 | ConnectionPoolPtr pool = std::make_shared<ConnectionPool>( |
269 | settings.distributed_connections_pool_size, |
270 | address.host_name, address.port, |
271 | address.default_database, address.user, address.password, |
272 | "server" , address.compression, address.secure); |
273 | |
274 | info.pool = std::make_shared<ConnectionPoolWithFailover>( |
275 | ConnectionPoolPtrs{pool}, settings.load_balancing); |
276 | info.per_replica_pools = {std::move(pool)}; |
277 | |
278 | if (weight) |
279 | slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size()); |
280 | |
281 | shards_info.emplace_back(std::move(info)); |
282 | addresses_with_failover.emplace_back(std::move(addresses)); |
283 | } |
284 | else if (startsWith(key, "shard" )) |
285 | { |
286 | /// Shard with replicas. |
287 | |
288 | Poco::Util::AbstractConfiguration::Keys replica_keys; |
289 | config.keys(config_prefix + key, replica_keys); |
290 | |
291 | addresses_with_failover.emplace_back(); |
292 | Addresses & replica_addresses = addresses_with_failover.back(); |
293 | UInt32 current_replica_num = 1; |
294 | |
295 | const auto & partial_prefix = config_prefix + key + "." ; |
296 | const auto weight = config.getUInt(partial_prefix + ".weight" , default_weight); |
297 | |
298 | bool internal_replication = config.getBool(partial_prefix + ".internal_replication" , false); |
299 | |
300 | /// In case of internal_replication we will be appending names to dir_name_for_internal_replication |
301 | std::string dir_name_for_internal_replication; |
302 | |
303 | auto first = true; |
304 | for (const auto & replica_key : replica_keys) |
305 | { |
306 | if (startsWith(replica_key, "weight" ) || startsWith(replica_key, "internal_replication" )) |
307 | continue; |
308 | |
309 | if (startsWith(replica_key, "replica" )) |
310 | { |
311 | replica_addresses.emplace_back(config, partial_prefix + replica_key); |
312 | ++current_replica_num; |
313 | |
314 | if (!replica_addresses.back().is_local) |
315 | { |
316 | if (internal_replication) |
317 | { |
318 | auto dir_name = replica_addresses.back().toFullString(); |
319 | if (first) |
320 | dir_name_for_internal_replication = dir_name; |
321 | else |
322 | dir_name_for_internal_replication += "," + dir_name; |
323 | } |
324 | |
325 | if (first) first = false; |
326 | } |
327 | } |
328 | else |
329 | throw Exception("Unknown element in config: " + replica_key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); |
330 | } |
331 | |
332 | Addresses shard_local_addresses; |
333 | |
334 | ConnectionPoolPtrs all_replicas_pools; |
335 | all_replicas_pools.reserve(replica_addresses.size()); |
336 | |
337 | for (const auto & replica : replica_addresses) |
338 | { |
339 | auto replica_pool = std::make_shared<ConnectionPool>( |
340 | settings.distributed_connections_pool_size, |
341 | replica.host_name, replica.port, |
342 | replica.default_database, replica.user, replica.password, |
343 | "server" , replica.compression, replica.secure); |
344 | |
345 | all_replicas_pools.emplace_back(replica_pool); |
346 | if (replica.is_local) |
347 | shard_local_addresses.push_back(replica); |
348 | } |
349 | |
350 | ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>( |
351 | all_replicas_pools, settings.load_balancing, |
352 | settings.distributed_replica_error_half_life.totalSeconds(), settings.distributed_replica_error_cap); |
353 | |
354 | if (weight) |
355 | slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size()); |
356 | |
357 | shards_info.push_back({std::move(dir_name_for_internal_replication), current_shard_num, weight, |
358 | std::move(shard_local_addresses), std::move(shard_pool), std::move(all_replicas_pools), internal_replication}); |
359 | } |
360 | else |
361 | throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); |
362 | |
363 | ++current_shard_num; |
364 | } |
365 | |
366 | if (addresses_with_failover.empty()) |
367 | throw Exception("There must be either 'node' or 'shard' elements in config" , ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); |
368 | |
369 | initMisc(); |
370 | } |
371 | |
372 | |
373 | Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String>> & names, |
374 | const String & username, const String & password, UInt16 clickhouse_port, bool treat_local_as_remote, bool secure) |
375 | { |
376 | UInt32 current_shard_num = 1; |
377 | |
378 | for (const auto & shard : names) |
379 | { |
380 | Addresses current; |
381 | for (auto & replica : shard) |
382 | current.emplace_back(replica, username, password, clickhouse_port, secure); |
383 | |
384 | addresses_with_failover.emplace_back(current); |
385 | |
386 | Addresses shard_local_addresses; |
387 | ConnectionPoolPtrs all_replicas; |
388 | all_replicas.reserve(current.size()); |
389 | |
390 | for (const auto & replica : current) |
391 | { |
392 | auto replica_pool = std::make_shared<ConnectionPool>( |
393 | settings.distributed_connections_pool_size, |
394 | replica.host_name, replica.port, |
395 | replica.default_database, replica.user, replica.password, |
396 | "server" , replica.compression, replica.secure); |
397 | all_replicas.emplace_back(replica_pool); |
398 | if (replica.is_local && !treat_local_as_remote) |
399 | shard_local_addresses.push_back(replica); |
400 | } |
401 | |
402 | ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>( |
403 | all_replicas, settings.load_balancing, |
404 | settings.distributed_replica_error_half_life.totalSeconds(), settings.distributed_replica_error_cap); |
405 | |
406 | slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size()); |
407 | shards_info.push_back({{}, current_shard_num, default_weight, std::move(shard_local_addresses), std::move(shard_pool), |
408 | std::move(all_replicas), false}); |
409 | ++current_shard_num; |
410 | } |
411 | |
412 | initMisc(); |
413 | } |
414 | |
415 | |
416 | Poco::Timespan Cluster::saturate(const Poco::Timespan & v, const Poco::Timespan & limit) |
417 | { |
418 | if (limit.totalMicroseconds() == 0) |
419 | return v; |
420 | else |
421 | return (v > limit) ? limit : v; |
422 | } |
423 | |
424 | |
425 | void Cluster::initMisc() |
426 | { |
427 | for (const auto & shard_info : shards_info) |
428 | { |
429 | if (!shard_info.isLocal() && !shard_info.hasRemoteConnections()) |
430 | throw Exception("Found shard without any specified connection" , |
431 | ErrorCodes::SHARD_HAS_NO_CONNECTIONS); |
432 | } |
433 | |
434 | for (const auto & shard_info : shards_info) |
435 | { |
436 | if (shard_info.isLocal()) |
437 | ++local_shard_count; |
438 | else |
439 | ++remote_shard_count; |
440 | } |
441 | |
442 | for (auto & shard_info : shards_info) |
443 | { |
444 | if (!shard_info.isLocal()) |
445 | { |
446 | any_remote_shard_info = &shard_info; |
447 | break; |
448 | } |
449 | } |
450 | } |
451 | |
452 | |
453 | std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const |
454 | { |
455 | return std::unique_ptr<Cluster>{ new Cluster(*this, {index}) }; |
456 | } |
457 | |
458 | std::unique_ptr<Cluster> Cluster::getClusterWithMultipleShards(const std::vector<size_t> & indices) const |
459 | { |
460 | return std::unique_ptr<Cluster>{ new Cluster(*this, indices) }; |
461 | } |
462 | |
463 | Cluster::Cluster(const Cluster & from, const std::vector<size_t> & indices) |
464 | : shards_info{} |
465 | { |
466 | for (size_t index : indices) |
467 | { |
468 | shards_info.emplace_back(from.shards_info.at(index)); |
469 | |
470 | if (!from.addresses_with_failover.empty()) |
471 | addresses_with_failover.emplace_back(from.addresses_with_failover.at(index)); |
472 | } |
473 | |
474 | initMisc(); |
475 | } |
476 | |
477 | } |
478 | |