#include <Client/ConnectionPoolWithFailover.h>

#include <Poco/Net/NetException.h>
#include <Poco/Net/DNS.h>

#include <Common/BitHelpers.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/isLocalAddress.h>
#include <Common/ProfileEvents.h>
#include <Core/Settings.h>

#include <IO/ConnectionTimeouts.h>


namespace ProfileEvents
{
    extern const Event DistributedConnectionMissingTable;
    extern const Event DistributedConnectionStaleReplica;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int NETWORK_ERROR;
    extern const int SOCKET_TIMEOUT;
    extern const int LOGICAL_ERROR;
    extern const int ATTEMPT_TO_READ_AFTER_EOF;
}


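/// Precompute how different each replica's hostname is from the local hostname, so that the
/// NEAREST_HOSTNAME load balancing mode can prefer the "closest" replicas.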
ConnectionPoolWithFailover::ConnectionPoolWithFailover(
        ConnectionPoolPtrs nested_pools_,
        LoadBalancing load_balancing,
        time_t decrease_error_period_,
        size_t max_error_cap_)
    : Base(std::move(nested_pools_), decrease_error_period_, max_error_cap_, &Logger::get("ConnectionPoolWithFailover"))
    , default_load_balancing(load_balancing)
{
    const std::string & local_hostname = getFQDNOrHostName();

    hostname_differences.resize(nested_pools.size());
    for (size_t i = 0; i < nested_pools.size(); ++i)
    {
        ConnectionPool & connection_pool = dynamic_cast<ConnectionPool &>(*nested_pools[i]);
        hostname_differences[i] = getHostNameDifference(local_hostname, connection_pool.getHost());
    }
}

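/// Returns a single connection, picking the replica according to the load balancing mode taken
/// from the query settings (or this pool's default mode when no settings are provided).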
IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts & timeouts,
                                                       const Settings * settings,
                                                       bool /*force_connected*/)
{
    TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
    {
        return tryGetEntry(pool, timeouts, fail_message, settings);
    };

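    /// get_priority maps a replica index to a priority; lower values are preferred.
    /// For RANDOM it is left unset, so all replicas are treated as equal.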
    GetPriorityFunc get_priority;
    switch (settings ? LoadBalancing(settings->load_balancing) : default_load_balancing)
    {
    case LoadBalancing::NEAREST_HOSTNAME:
        get_priority = [&](size_t i) { return hostname_differences[i]; };
        break;
    case LoadBalancing::IN_ORDER:
        get_priority = [](size_t i) { return i; };
        break;
    case LoadBalancing::RANDOM:
        break;
    case LoadBalancing::FIRST_OR_RANDOM:
        get_priority = [](size_t i) -> size_t { return i >= 1; };
        break;
    }

    return Base::get(try_get_entry, get_priority);
}

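/// Reports, for every nested pool, its current error count and an estimate of how long it will
/// take for that count to decay back to zero.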
ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
{
    const Base::PoolStates states = getPoolStates();
    const Base::NestedPools pools = nested_pools;
    assert(states.size() == pools.size());

    ConnectionPoolWithFailover::Status result;
    result.reserve(states.size());
    const time_t since_last_error_decrease = time(nullptr) - last_error_decrease_time;

    for (size_t i = 0; i < states.size(); ++i)
    {
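        /// The base pool is expected to halve error_count once per decrease_error_period, so a count
        /// of N needs bitScanReverse(N) + 1 halvings to reach zero; from that and the time since the
        /// last decrease we estimate how many seconds remain until the counter is fully cleared.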
        const auto rounds_to_zero_errors = states[i].error_count ? bitScanReverse(states[i].error_count) + 1 : 0;
        const auto seconds_to_zero_errors = std::max(static_cast<time_t>(0), rounds_to_zero_errors * decrease_error_period - since_last_error_decrease);

        result.emplace_back(NestedPoolStatus{
            pools[i].get(),
            states[i].error_count,
            std::chrono::seconds{seconds_to_zero_errors}
        });
    }

    return result;
}

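/// Returns several connection entries at once; how many is determined by pool_mode and, for
/// GET_MANY, by the max_parallel_replicas setting. No remote tables are checked here.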
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts,
                                                                        const Settings * settings,
                                                                        PoolMode pool_mode)
{
    TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
    {
        return tryGetEntry(pool, timeouts, fail_message, settings);
    };

    std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry);

    std::vector<Entry> entries;
    entries.reserve(results.size());
    for (auto & result : results)
        entries.emplace_back(std::move(result.entry));
    return entries;
}

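/// Same as getMany(), but returns the full TryResult objects instead of bare entries, so that
/// callers (such as table functions) can inspect per-replica usability information.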
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(
        const ConnectionTimeouts & timeouts,
        const Settings * settings,
        PoolMode pool_mode)
{
    TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
    {
        return tryGetEntry(pool, timeouts, fail_message, settings);
    };

    return getManyImpl(settings, pool_mode, try_get_entry);
}

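/// Same as getMany(), but also checks the status of table_to_check on every replica, so the
/// results carry up-to-dateness and staleness information for that table.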
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
        const ConnectionTimeouts & timeouts,
        const Settings * settings, PoolMode pool_mode,
        const QualifiedTableName & table_to_check)
{
    TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
    {
        return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check);
    };

    return getManyImpl(settings, pool_mode, try_get_entry);
}

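/// Common implementation: translates pool_mode and settings into min/max entry counts and a retry
/// limit, builds the replica priority function, and delegates to Base::getMany().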
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl(
        const Settings * settings,
        PoolMode pool_mode,
        const TryGetEntryFunc & try_get_entry)
{
    size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
    size_t max_tries = (settings ?
        size_t{settings->connections_with_failover_max_tries} :
        size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
    size_t max_entries;
    if (pool_mode == PoolMode::GET_ALL)
    {
        min_entries = nested_pools.size();
        max_entries = nested_pools.size();
    }
    else if (pool_mode == PoolMode::GET_ONE)
        max_entries = 1;
    else if (pool_mode == PoolMode::GET_MANY)
        max_entries = settings ? size_t(settings->max_parallel_replicas) : 1;
    else
        throw DB::Exception("Unknown pool allocation mode", DB::ErrorCodes::LOGICAL_ERROR);

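    /// Same priority selection as in get(): lower values are preferred, RANDOM leaves get_priority unset.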
    GetPriorityFunc get_priority;
    switch (settings ? LoadBalancing(settings->load_balancing) : default_load_balancing)
    {
    case LoadBalancing::NEAREST_HOSTNAME:
        get_priority = [&](size_t i) { return hostname_differences[i]; };
        break;
    case LoadBalancing::IN_ORDER:
        get_priority = [](size_t i) { return i; };
        break;
    case LoadBalancing::RANDOM:
        break;
    case LoadBalancing::FIRST_OR_RANDOM:
        get_priority = [](size_t i) -> size_t { return i >= 1; };
        break;
    }

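    /// When no up-to-date replica is available, stale replicas may still be returned if this flag
    /// is set (which is the default).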
    bool fallback_to_stale_replicas = settings ? bool(settings->fallback_to_stale_replicas_for_distributed_queries) : true;

    return Base::getMany(min_entries, max_entries, max_tries, try_get_entry, get_priority, fallback_to_stale_replicas);
}

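/// Tries to obtain one connection from the given pool. If table_to_check is set and the server is
/// recent enough, the status of that table is queried to fill in is_up_to_date and staleness; on a
/// network error the connection is dropped, the result is reset, and fail_message is filled in.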
ConnectionPoolWithFailover::TryResult
ConnectionPoolWithFailover::tryGetEntry(
        IConnectionPool & pool,
        const ConnectionTimeouts & timeouts,
        std::string & fail_message,
        const Settings * settings,
        const QualifiedTableName * table_to_check)
{
    TryResult result;
    try
    {
        result.entry = pool.get(timeouts, settings, /* force_connected = */ false);

        UInt64 server_revision = 0;
        if (table_to_check)
            server_revision = result.entry->getServerRevision(timeouts);

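        /// Servers that are too old to answer TablesStatus requests get only a connectivity check
        /// and are treated as usable and up to date.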
        if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
        {
            result.entry->forceConnected(timeouts);
            result.is_usable = true;
            result.is_up_to_date = true;
            return result;
        }

        /// Only the status of the remote table that the Distributed table points to is taken into account.
        /// TODO: request status for joined tables also.
        TablesStatusRequest status_request;
        status_request.tables.emplace(*table_to_check);

        TablesStatusResponse status_response = result.entry->getTablesStatus(timeouts, status_request);
        auto table_status_it = status_response.table_states_by_id.find(*table_to_check);
        if (table_status_it == status_response.table_states_by_id.end())
        {
            fail_message = "There is no table " + table_to_check->database + "." + table_to_check->table
                + " on server: " + result.entry->getDescription();
            LOG_WARNING(log, fail_message);
            ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);

            return result;
        }

        result.is_usable = true;

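        /// A max_replica_delay_for_distributed_queries of 0 disables the staleness check entirely.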
        UInt64 max_allowed_delay = settings ? UInt64(settings->max_replica_delay_for_distributed_queries) : 0;
        if (!max_allowed_delay)
        {
            result.is_up_to_date = true;
            return result;
        }

        UInt32 delay = table_status_it->second.absolute_delay;

        if (delay < max_allowed_delay)
            result.is_up_to_date = true;
        else
        {
            result.is_up_to_date = false;
            result.staleness = delay;

            LOG_TRACE(
                log, "Server " << result.entry->getDescription() << " has unacceptable replica delay "
                << "for table " << table_to_check->database << "." << table_to_check->table
                << ": " << delay);
            ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
        }
    }
    catch (const Exception & e)
    {
        if (e.code() != ErrorCodes::NETWORK_ERROR && e.code() != ErrorCodes::SOCKET_TIMEOUT
            && e.code() != ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
            throw;

        fail_message = getCurrentExceptionMessage(/* with_stacktrace = */ false);

        if (!result.entry.isNull())
        {
            result.entry->disconnect();
            result.reset();
        }
    }
    return result;
}

}