| 1 | #include <Client/ConnectionPoolWithFailover.h> |
| 2 | |
| 3 | #include <Poco/Net/NetException.h> |
| 4 | #include <Poco/Net/DNS.h> |
| 5 | |
| 6 | #include <Common/BitHelpers.h> |
| 7 | #include <Common/getFQDNOrHostName.h> |
| 8 | #include <Common/isLocalAddress.h> |
| 9 | #include <Common/ProfileEvents.h> |
| 10 | #include <Core/Settings.h> |
| 11 | |
| 12 | #include <IO/ConnectionTimeouts.h> |
| 13 | |
| 14 | |
| 15 | namespace ProfileEvents |
| 16 | { |
| 17 | extern const Event DistributedConnectionMissingTable; |
| 18 | extern const Event DistributedConnectionStaleReplica; |
| 19 | } |
| 20 | |
| 21 | namespace DB |
| 22 | { |
| 23 | |
/// Error codes referenced in this file. ATTEMPT_TO_READ_AFTER_EOF is checked in tryGetEntry()
/// as a retryable connection failure, so it is declared here rather than relying on some
/// transitively included header to declare it.
namespace ErrorCodes
{
    extern const int ATTEMPT_TO_READ_AFTER_EOF;
    extern const int LOGICAL_ERROR;
    extern const int NETWORK_ERROR;
    extern const int SOCKET_TIMEOUT;
}
| 30 | |
| 31 | |
| 32 | ConnectionPoolWithFailover::ConnectionPoolWithFailover( |
| 33 | ConnectionPoolPtrs nested_pools_, |
| 34 | LoadBalancing load_balancing, |
| 35 | time_t decrease_error_period_, |
| 36 | size_t max_error_cap_) |
| 37 | : Base(std::move(nested_pools_), decrease_error_period_, max_error_cap_, &Logger::get("ConnectionPoolWithFailover" )) |
| 38 | , default_load_balancing(load_balancing) |
| 39 | { |
| 40 | const std::string & local_hostname = getFQDNOrHostName(); |
| 41 | |
| 42 | hostname_differences.resize(nested_pools.size()); |
| 43 | for (size_t i = 0; i < nested_pools.size(); ++i) |
| 44 | { |
| 45 | ConnectionPool & connection_pool = dynamic_cast<ConnectionPool &>(*nested_pools[i]); |
| 46 | hostname_differences[i] = getHostNameDifference(local_hostname, connection_pool.getHost()); |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts & timeouts, |
| 51 | const Settings * settings, |
| 52 | bool /*force_connected*/) |
| 53 | { |
| 54 | TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) |
| 55 | { |
| 56 | return tryGetEntry(pool, timeouts, fail_message, settings); |
| 57 | }; |
| 58 | |
| 59 | GetPriorityFunc get_priority; |
| 60 | switch (settings ? LoadBalancing(settings->load_balancing) : default_load_balancing) |
| 61 | { |
| 62 | case LoadBalancing::NEAREST_HOSTNAME: |
| 63 | get_priority = [&](size_t i) { return hostname_differences[i]; }; |
| 64 | break; |
| 65 | case LoadBalancing::IN_ORDER: |
| 66 | get_priority = [](size_t i) { return i; }; |
| 67 | break; |
| 68 | case LoadBalancing::RANDOM: |
| 69 | break; |
| 70 | case LoadBalancing::FIRST_OR_RANDOM: |
| 71 | get_priority = [](size_t i) -> size_t { return i >= 1; }; |
| 72 | break; |
| 73 | } |
| 74 | |
| 75 | return Base::get(try_get_entry, get_priority); |
| 76 | } |
| 77 | |
| 78 | ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const |
| 79 | { |
| 80 | const Base::PoolStates states = getPoolStates(); |
| 81 | const Base::NestedPools pools = nested_pools; |
| 82 | assert(states.size() == pools.size()); |
| 83 | |
| 84 | ConnectionPoolWithFailover::Status result; |
| 85 | result.reserve(states.size()); |
| 86 | const time_t since_last_error_decrease = time(nullptr) - last_error_decrease_time; |
| 87 | |
| 88 | for (size_t i = 0; i < states.size(); ++i) |
| 89 | { |
| 90 | const auto rounds_to_zero_errors = states[i].error_count ? bitScanReverse(states[i].error_count) + 1 : 0; |
| 91 | const auto seconds_to_zero_errors = std::max(static_cast<time_t>(0), rounds_to_zero_errors * decrease_error_period - since_last_error_decrease); |
| 92 | |
| 93 | result.emplace_back(NestedPoolStatus{ |
| 94 | pools[i].get(), |
| 95 | states[i].error_count, |
| 96 | std::chrono::seconds{seconds_to_zero_errors} |
| 97 | }); |
| 98 | } |
| 99 | |
| 100 | return result; |
| 101 | } |
| 102 | |
| 103 | std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts, |
| 104 | const Settings * settings, |
| 105 | PoolMode pool_mode) |
| 106 | { |
| 107 | TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) |
| 108 | { |
| 109 | return tryGetEntry(pool, timeouts, fail_message, settings); |
| 110 | }; |
| 111 | |
| 112 | std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry); |
| 113 | |
| 114 | std::vector<Entry> entries; |
| 115 | entries.reserve(results.size()); |
| 116 | for (auto & result : results) |
| 117 | entries.emplace_back(std::move(result.entry)); |
| 118 | return entries; |
| 119 | } |
| 120 | |
| 121 | std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction( |
| 122 | const ConnectionTimeouts & timeouts, |
| 123 | const Settings * settings, |
| 124 | PoolMode pool_mode) |
| 125 | { |
| 126 | TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) |
| 127 | { |
| 128 | return tryGetEntry(pool, timeouts, fail_message, settings); |
| 129 | }; |
| 130 | |
| 131 | return getManyImpl(settings, pool_mode, try_get_entry); |
| 132 | } |
| 133 | |
| 134 | std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked( |
| 135 | const ConnectionTimeouts & timeouts, |
| 136 | const Settings * settings, PoolMode pool_mode, |
| 137 | const QualifiedTableName & table_to_check) |
| 138 | { |
| 139 | TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) |
| 140 | { |
| 141 | return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check); |
| 142 | }; |
| 143 | |
| 144 | return getManyImpl(settings, pool_mode, try_get_entry); |
| 145 | } |
| 146 | |
| 147 | std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl( |
| 148 | const Settings * settings, |
| 149 | PoolMode pool_mode, |
| 150 | const TryGetEntryFunc & try_get_entry) |
| 151 | { |
| 152 | size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1; |
| 153 | size_t max_tries = (settings ? |
| 154 | size_t{settings->connections_with_failover_max_tries} : |
| 155 | size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES}); |
| 156 | size_t max_entries; |
| 157 | if (pool_mode == PoolMode::GET_ALL) |
| 158 | { |
| 159 | min_entries = nested_pools.size(); |
| 160 | max_entries = nested_pools.size(); |
| 161 | } |
| 162 | else if (pool_mode == PoolMode::GET_ONE) |
| 163 | max_entries = 1; |
| 164 | else if (pool_mode == PoolMode::GET_MANY) |
| 165 | max_entries = settings ? size_t(settings->max_parallel_replicas) : 1; |
| 166 | else |
| 167 | throw DB::Exception("Unknown pool allocation mode" , DB::ErrorCodes::LOGICAL_ERROR); |
| 168 | |
| 169 | GetPriorityFunc get_priority; |
| 170 | switch (settings ? LoadBalancing(settings->load_balancing) : default_load_balancing) |
| 171 | { |
| 172 | case LoadBalancing::NEAREST_HOSTNAME: |
| 173 | get_priority = [&](size_t i) { return hostname_differences[i]; }; |
| 174 | break; |
| 175 | case LoadBalancing::IN_ORDER: |
| 176 | get_priority = [](size_t i) { return i; }; |
| 177 | break; |
| 178 | case LoadBalancing::RANDOM: |
| 179 | break; |
| 180 | case LoadBalancing::FIRST_OR_RANDOM: |
| 181 | get_priority = [](size_t i) -> size_t { return i >= 1; }; |
| 182 | break; |
| 183 | } |
| 184 | |
| 185 | bool fallback_to_stale_replicas = settings ? bool(settings->fallback_to_stale_replicas_for_distributed_queries) : true; |
| 186 | |
| 187 | return Base::getMany(min_entries, max_entries, max_tries, try_get_entry, get_priority, fallback_to_stale_replicas); |
| 188 | } |
| 189 | |
| 190 | ConnectionPoolWithFailover::TryResult |
| 191 | ConnectionPoolWithFailover::tryGetEntry( |
| 192 | IConnectionPool & pool, |
| 193 | const ConnectionTimeouts & timeouts, |
| 194 | std::string & fail_message, |
| 195 | const Settings * settings, |
| 196 | const QualifiedTableName * table_to_check) |
| 197 | { |
| 198 | TryResult result; |
| 199 | try |
| 200 | { |
| 201 | result.entry = pool.get(timeouts, settings, /* force_connected = */ false); |
| 202 | |
| 203 | UInt64 server_revision = 0; |
| 204 | if (table_to_check) |
| 205 | server_revision = result.entry->getServerRevision(timeouts); |
| 206 | |
| 207 | if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) |
| 208 | { |
| 209 | result.entry->forceConnected(timeouts); |
| 210 | result.is_usable = true; |
| 211 | result.is_up_to_date = true; |
| 212 | return result; |
| 213 | } |
| 214 | |
| 215 | /// Only status of the remote table corresponding to the Distributed table is taken into account. |
| 216 | /// TODO: request status for joined tables also. |
| 217 | TablesStatusRequest status_request; |
| 218 | status_request.tables.emplace(*table_to_check); |
| 219 | |
| 220 | TablesStatusResponse status_response = result.entry->getTablesStatus(timeouts, status_request); |
| 221 | auto table_status_it = status_response.table_states_by_id.find(*table_to_check); |
| 222 | if (table_status_it == status_response.table_states_by_id.end()) |
| 223 | { |
| 224 | fail_message = "There is no table " + table_to_check->database + "." + table_to_check->table |
| 225 | + " on server: " + result.entry->getDescription(); |
| 226 | LOG_WARNING(log, fail_message); |
| 227 | ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable); |
| 228 | |
| 229 | return result; |
| 230 | } |
| 231 | |
| 232 | result.is_usable = true; |
| 233 | |
| 234 | UInt64 max_allowed_delay = settings ? UInt64(settings->max_replica_delay_for_distributed_queries) : 0; |
| 235 | if (!max_allowed_delay) |
| 236 | { |
| 237 | result.is_up_to_date = true; |
| 238 | return result; |
| 239 | } |
| 240 | |
| 241 | UInt32 delay = table_status_it->second.absolute_delay; |
| 242 | |
| 243 | if (delay < max_allowed_delay) |
| 244 | result.is_up_to_date = true; |
| 245 | else |
| 246 | { |
| 247 | result.is_up_to_date = false; |
| 248 | result.staleness = delay; |
| 249 | |
| 250 | LOG_TRACE( |
| 251 | log, "Server " << result.entry->getDescription() << " has unacceptable replica delay " |
| 252 | << "for table " << table_to_check->database << "." << table_to_check->table |
| 253 | << ": " << delay); |
| 254 | ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica); |
| 255 | } |
| 256 | } |
| 257 | catch (const Exception & e) |
| 258 | { |
| 259 | if (e.code() != ErrorCodes::NETWORK_ERROR && e.code() != ErrorCodes::SOCKET_TIMEOUT |
| 260 | && e.code() != ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF) |
| 261 | throw; |
| 262 | |
| 263 | fail_message = getCurrentExceptionMessage(/* with_stacktrace = */ false); |
| 264 | |
| 265 | if (!result.entry.isNull()) |
| 266 | { |
| 267 | result.entry->disconnect(); |
| 268 | result.reset(); |
| 269 | } |
| 270 | } |
| 271 | return result; |
| 272 | } |
| 273 | |
| 274 | } |
| 275 | |