1#include <Client/ConnectionPoolWithFailover.h>
2
3#include <Poco/Net/NetException.h>
4#include <Poco/Net/DNS.h>
5
6#include <Common/BitHelpers.h>
7#include <Common/getFQDNOrHostName.h>
8#include <Common/isLocalAddress.h>
9#include <Common/ProfileEvents.h>
10#include <Core/Settings.h>
11
12#include <IO/ConnectionTimeouts.h>
13
14
15namespace ProfileEvents
16{
17 extern const Event DistributedConnectionMissingTable;
18 extern const Event DistributedConnectionStaleReplica;
19}
20
21namespace DB
22{
23
/// Error codes referenced by this translation unit.
namespace ErrorCodes
{
    extern const int NETWORK_ERROR;
    extern const int SOCKET_TIMEOUT;
    extern const int LOGICAL_ERROR;
    /// Checked in the catch block of tryGetEntry() together with the two network codes above.
    /// Declared explicitly so this file does not depend on a transitive declaration.
    extern const int ATTEMPT_TO_READ_AFTER_EOF;
}
30
31
32ConnectionPoolWithFailover::ConnectionPoolWithFailover(
33 ConnectionPoolPtrs nested_pools_,
34 LoadBalancing load_balancing,
35 time_t decrease_error_period_,
36 size_t max_error_cap_)
37 : Base(std::move(nested_pools_), decrease_error_period_, max_error_cap_, &Logger::get("ConnectionPoolWithFailover"))
38 , default_load_balancing(load_balancing)
39{
40 const std::string & local_hostname = getFQDNOrHostName();
41
42 hostname_differences.resize(nested_pools.size());
43 for (size_t i = 0; i < nested_pools.size(); ++i)
44 {
45 ConnectionPool & connection_pool = dynamic_cast<ConnectionPool &>(*nested_pools[i]);
46 hostname_differences[i] = getHostNameDifference(local_hostname, connection_pool.getHost());
47 }
48}
49
50IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts & timeouts,
51 const Settings * settings,
52 bool /*force_connected*/)
53{
54 TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
55 {
56 return tryGetEntry(pool, timeouts, fail_message, settings);
57 };
58
59 GetPriorityFunc get_priority;
60 switch (settings ? LoadBalancing(settings->load_balancing) : default_load_balancing)
61 {
62 case LoadBalancing::NEAREST_HOSTNAME:
63 get_priority = [&](size_t i) { return hostname_differences[i]; };
64 break;
65 case LoadBalancing::IN_ORDER:
66 get_priority = [](size_t i) { return i; };
67 break;
68 case LoadBalancing::RANDOM:
69 break;
70 case LoadBalancing::FIRST_OR_RANDOM:
71 get_priority = [](size_t i) -> size_t { return i >= 1; };
72 break;
73 }
74
75 return Base::get(try_get_entry, get_priority);
76}
77
78ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
79{
80 const Base::PoolStates states = getPoolStates();
81 const Base::NestedPools pools = nested_pools;
82 assert(states.size() == pools.size());
83
84 ConnectionPoolWithFailover::Status result;
85 result.reserve(states.size());
86 const time_t since_last_error_decrease = time(nullptr) - last_error_decrease_time;
87
88 for (size_t i = 0; i < states.size(); ++i)
89 {
90 const auto rounds_to_zero_errors = states[i].error_count ? bitScanReverse(states[i].error_count) + 1 : 0;
91 const auto seconds_to_zero_errors = std::max(static_cast<time_t>(0), rounds_to_zero_errors * decrease_error_period - since_last_error_decrease);
92
93 result.emplace_back(NestedPoolStatus{
94 pools[i].get(),
95 states[i].error_count,
96 std::chrono::seconds{seconds_to_zero_errors}
97 });
98 }
99
100 return result;
101}
102
103std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts,
104 const Settings * settings,
105 PoolMode pool_mode)
106{
107 TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
108 {
109 return tryGetEntry(pool, timeouts, fail_message, settings);
110 };
111
112 std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry);
113
114 std::vector<Entry> entries;
115 entries.reserve(results.size());
116 for (auto & result : results)
117 entries.emplace_back(std::move(result.entry));
118 return entries;
119}
120
121std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(
122 const ConnectionTimeouts & timeouts,
123 const Settings * settings,
124 PoolMode pool_mode)
125{
126 TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
127 {
128 return tryGetEntry(pool, timeouts, fail_message, settings);
129 };
130
131 return getManyImpl(settings, pool_mode, try_get_entry);
132}
133
134std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
135 const ConnectionTimeouts & timeouts,
136 const Settings * settings, PoolMode pool_mode,
137 const QualifiedTableName & table_to_check)
138{
139 TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
140 {
141 return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check);
142 };
143
144 return getManyImpl(settings, pool_mode, try_get_entry);
145}
146
147std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl(
148 const Settings * settings,
149 PoolMode pool_mode,
150 const TryGetEntryFunc & try_get_entry)
151{
152 size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
153 size_t max_tries = (settings ?
154 size_t{settings->connections_with_failover_max_tries} :
155 size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
156 size_t max_entries;
157 if (pool_mode == PoolMode::GET_ALL)
158 {
159 min_entries = nested_pools.size();
160 max_entries = nested_pools.size();
161 }
162 else if (pool_mode == PoolMode::GET_ONE)
163 max_entries = 1;
164 else if (pool_mode == PoolMode::GET_MANY)
165 max_entries = settings ? size_t(settings->max_parallel_replicas) : 1;
166 else
167 throw DB::Exception("Unknown pool allocation mode", DB::ErrorCodes::LOGICAL_ERROR);
168
169 GetPriorityFunc get_priority;
170 switch (settings ? LoadBalancing(settings->load_balancing) : default_load_balancing)
171 {
172 case LoadBalancing::NEAREST_HOSTNAME:
173 get_priority = [&](size_t i) { return hostname_differences[i]; };
174 break;
175 case LoadBalancing::IN_ORDER:
176 get_priority = [](size_t i) { return i; };
177 break;
178 case LoadBalancing::RANDOM:
179 break;
180 case LoadBalancing::FIRST_OR_RANDOM:
181 get_priority = [](size_t i) -> size_t { return i >= 1; };
182 break;
183 }
184
185 bool fallback_to_stale_replicas = settings ? bool(settings->fallback_to_stale_replicas_for_distributed_queries) : true;
186
187 return Base::getMany(min_entries, max_entries, max_tries, try_get_entry, get_priority, fallback_to_stale_replicas);
188}
189
190ConnectionPoolWithFailover::TryResult
191ConnectionPoolWithFailover::tryGetEntry(
192 IConnectionPool & pool,
193 const ConnectionTimeouts & timeouts,
194 std::string & fail_message,
195 const Settings * settings,
196 const QualifiedTableName * table_to_check)
197{
198 TryResult result;
199 try
200 {
201 result.entry = pool.get(timeouts, settings, /* force_connected = */ false);
202
203 UInt64 server_revision = 0;
204 if (table_to_check)
205 server_revision = result.entry->getServerRevision(timeouts);
206
207 if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
208 {
209 result.entry->forceConnected(timeouts);
210 result.is_usable = true;
211 result.is_up_to_date = true;
212 return result;
213 }
214
215 /// Only status of the remote table corresponding to the Distributed table is taken into account.
216 /// TODO: request status for joined tables also.
217 TablesStatusRequest status_request;
218 status_request.tables.emplace(*table_to_check);
219
220 TablesStatusResponse status_response = result.entry->getTablesStatus(timeouts, status_request);
221 auto table_status_it = status_response.table_states_by_id.find(*table_to_check);
222 if (table_status_it == status_response.table_states_by_id.end())
223 {
224 fail_message = "There is no table " + table_to_check->database + "." + table_to_check->table
225 + " on server: " + result.entry->getDescription();
226 LOG_WARNING(log, fail_message);
227 ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
228
229 return result;
230 }
231
232 result.is_usable = true;
233
234 UInt64 max_allowed_delay = settings ? UInt64(settings->max_replica_delay_for_distributed_queries) : 0;
235 if (!max_allowed_delay)
236 {
237 result.is_up_to_date = true;
238 return result;
239 }
240
241 UInt32 delay = table_status_it->second.absolute_delay;
242
243 if (delay < max_allowed_delay)
244 result.is_up_to_date = true;
245 else
246 {
247 result.is_up_to_date = false;
248 result.staleness = delay;
249
250 LOG_TRACE(
251 log, "Server " << result.entry->getDescription() << " has unacceptable replica delay "
252 << "for table " << table_to_check->database << "." << table_to_check->table
253 << ": " << delay);
254 ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
255 }
256 }
257 catch (const Exception & e)
258 {
259 if (e.code() != ErrorCodes::NETWORK_ERROR && e.code() != ErrorCodes::SOCKET_TIMEOUT
260 && e.code() != ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
261 throw;
262
263 fail_message = getCurrentExceptionMessage(/* with_stacktrace = */ false);
264
265 if (!result.entry.isNull())
266 {
267 result.entry->disconnect();
268 result.reset();
269 }
270 }
271 return result;
272}
273
274}
275