#pragma once

#include <time.h>
#include <cstdlib>
#include <climits>
#include <random>
#include <functional>
#include <algorithm>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include <boost/noncopyable.hpp>
#include <common/Types.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include <Core/Types.h>
#include <Common/PoolBase.h>
#include <Common/ProfileEvents.h>
#include <Common/NetException.h>
#include <Common/Exception.h>
#include <Common/randomSeed.h>


namespace DB
{
namespace ErrorCodes
{
    extern const int ALL_CONNECTION_TRIES_FAILED;
    extern const int ALL_REPLICAS_ARE_STALE;
    extern const int LOGICAL_ERROR;
}
}

namespace ProfileEvents
{
    extern const Event DistributedConnectionFailTry;
    extern const Event DistributedConnectionFailAtAll;
}

/// This class provides a pool with fault tolerance. It is used for pooling of connections to a replicated DB.
/// It is initialized with several PoolBase objects.
/// When a connection is requested, it tries to create or choose an alive connection from one of the nested pools.
/// Pools are tried in the order consistent with the lexicographical order of (error count, priority, random number) tuples.
/// The number of tries for a single pool is limited by the max_tries parameter.
/// The client can set nested pool priorities by passing a GetPriorityFunc functor.
///
/// NOTE: if one of the nested pools blocks because it is empty, this pool will also block.
///
/// The client must provide a TryGetEntryFunc functor, which performs a single try to get a connection from a nested pool.
/// This functor can also check whether the connection satisfies some eligibility criterion (e.g. check that
/// the replica is up-to-date).
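///
/// A minimal usage sketch, for illustration only. MyConnectionPool, its get() method and the way
/// nested_pools is filled are hypothetical stand-ins; only the PoolWithFailoverBase API comes from
/// this header:
///
///     using FailoverPool = PoolWithFailoverBase<MyConnectionPool>;
///
///     FailoverPool::NestedPools nested_pools; /// filled with std::shared_ptr<MyConnectionPool> elsewhere
///     FailoverPool failover_pool(std::move(nested_pools), /* decrease_error_period */ 60,
///         /* max_error_cap */ 10, &Logger::get("FailoverPool"));
///
///     FailoverPool::TryGetEntryFunc try_get_entry =
///         [](MyConnectionPool & pool, std::string & fail_message) -> FailoverPool::TryResult
///     {
///         try
///         {
///             return FailoverPool::TryResult(pool.get());
///         }
///         catch (const DB::Exception & e)
///         {
///             fail_message = e.displayText();
///             return {};
///         }
///     };
///
///     /// At least one and at most three connections, with at most two tries per nested pool.
///     std::vector<FailoverPool::TryResult> results = failover_pool.getMany(1, 3, 2, try_get_entry);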

template <typename TNestedPool>
class PoolWithFailoverBase : private boost::noncopyable
{
public:
    using NestedPool = TNestedPool;
    using NestedPoolPtr = std::shared_ptr<NestedPool>;
    using Entry = typename NestedPool::Entry;
    using NestedPools = std::vector<NestedPoolPtr>;

    PoolWithFailoverBase(
            NestedPools nested_pools_,
            time_t decrease_error_period_,
            size_t max_error_cap_,
            Logger * log_)
        : nested_pools(std::move(nested_pools_))
        , decrease_error_period(decrease_error_period_)
        , max_error_cap(max_error_cap_)
        , shared_pool_states(nested_pools.size())
        , log(log_)
    {
    }

    struct TryResult
    {
        TryResult() = default;

        explicit TryResult(Entry entry_)
            : entry(std::move(entry_))
            , is_usable(true)
            , is_up_to_date(true)
        {
        }

        void reset()
        {
            entry = Entry();
            is_usable = false;
            is_up_to_date = false;
            staleness = 0.0;
        }

        Entry entry;
        bool is_usable = false; /// If false, the entry is unusable for the current request
                                /// (but may be usable for other requests, so error counts are not incremented).
        bool is_up_to_date = false; /// If true, the entry is a connection to an up-to-date replica.
        double staleness = 0.0; /// Helps to choose the "least stale" option when all replicas are stale.
    };

    /// This functor must be provided by the client. It must perform a single try that takes a connection
    /// from the provided pool and checks that it is good.
    using TryGetEntryFunc = std::function<TryResult(NestedPool & pool, std::string & fail_message)>;

    /// The client can provide this functor to affect load balancing: it is passed the index of a
    /// nested pool, and pools with a lower returned value are tried first.
    using GetPriorityFunc = std::function<size_t(size_t index)>;
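    ///
    /// For example, a purely illustrative functor that prefers the first nested pool and treats
    /// all other pools equally (real priority schemes are defined by the client code):
    ///
    ///     GetPriorityFunc get_priority = [](size_t index) { return index == 0 ? 0 : 1; };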

    /// Returns a single connection.
    Entry get(const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority = GetPriorityFunc());


    /// Returns at least min_entries and at most max_entries connections (at most one connection per nested pool).
    /// The method will throw if it is unable to get min_entries alive connections or
    /// if fallback_to_stale_replicas is false and it is unable to get min_entries connections to up-to-date replicas.
    std::vector<TryResult> getMany(
            size_t min_entries, size_t max_entries, size_t max_tries,
            const TryGetEntryFunc & try_get_entry,
            const GetPriorityFunc & get_priority = GetPriorityFunc(),
            bool fallback_to_stale_replicas = true);

    void reportError(const Entry & entry);

protected:
    struct PoolState;

    using PoolStates = std::vector<PoolState>;

    /// These functions return a copy of the pool states to avoid race conditions
    /// when the shared pool states are modified concurrently.
    PoolStates updatePoolStates();
    PoolStates getPoolStates() const;

    NestedPools nested_pools;

    const time_t decrease_error_period;
    const size_t max_error_cap;

    mutable std::mutex pool_states_mutex;
    PoolStates shared_pool_states;
    /// The time when error counts were last decreased.
    time_t last_error_decrease_time = 0;

    Logger * log;
};

template <typename TNestedPool>
typename TNestedPool::Entry
PoolWithFailoverBase<TNestedPool>::get(const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority)
{
    std::vector<TryResult> results = getMany(1, 1, 1, try_get_entry, get_priority);
    if (results.empty() || results[0].entry.isNull())
        throw DB::Exception(
                "PoolWithFailoverBase::getMany() returned less than min_entries entries.",
                DB::ErrorCodes::LOGICAL_ERROR);
    return results[0].entry;
}

template <typename TNestedPool>
std::vector<typename PoolWithFailoverBase<TNestedPool>::TryResult>
PoolWithFailoverBase<TNestedPool>::getMany(
        size_t min_entries, size_t max_entries, size_t max_tries,
        const TryGetEntryFunc & try_get_entry,
        const GetPriorityFunc & get_priority,
        bool fallback_to_stale_replicas)
{
    /// Update random numbers and error counts.
    PoolStates pool_states = updatePoolStates();
    if (get_priority)
    {
        for (size_t i = 0; i < pool_states.size(); ++i)
            pool_states[i].priority = get_priority(i);
    }

    struct ShuffledPool
    {
        NestedPool * pool{};
        const PoolState * state{};
        size_t index = 0;
        size_t error_count = 0;
    };

    /// Sort the pools into the order in which they will be tried (based on their respective PoolStates).
    std::vector<ShuffledPool> shuffled_pools;
    shuffled_pools.reserve(nested_pools.size());
    for (size_t i = 0; i < nested_pools.size(); ++i)
        shuffled_pools.push_back(ShuffledPool{nested_pools[i].get(), &pool_states[i], i, 0});
    std::sort(
        shuffled_pools.begin(), shuffled_pools.end(),
        [](const ShuffledPool & lhs, const ShuffledPool & rhs)
        {
            return PoolState::compare(*lhs.state, *rhs.state);
        });

    /// We will try to get a connection from each pool until a connection is produced or max_tries is reached.
    std::vector<TryResult> try_results(shuffled_pools.size());
    size_t entries_count = 0;
    size_t usable_count = 0;
    size_t up_to_date_count = 0;
    size_t failed_pools_count = 0;

    /// At exit, update the shared error counts with the errors that occurred during this call.
    SCOPE_EXIT(
    {
        std::lock_guard lock(pool_states_mutex);
        for (const ShuffledPool & pool : shuffled_pools)
        {
            auto & pool_state = shared_pool_states[pool.index];
            pool_state.error_count = std::min<UInt64>(max_error_cap, pool_state.error_count + pool.error_count);
        }
    });

    std::string fail_messages;
    bool finished = false;
    while (!finished)
    {
        for (size_t i = 0; i < shuffled_pools.size(); ++i)
        {
            if (up_to_date_count >= max_entries /// Already enough good entries.
                || entries_count + failed_pools_count >= nested_pools.size()) /// No more good entries will be produced.
            {
                finished = true;
                break;
            }

            ShuffledPool & shuffled_pool = shuffled_pools[i];
            TryResult & result = try_results[i];
            if (shuffled_pool.error_count >= max_tries || !result.entry.isNull())
                continue;

            std::string fail_message;
            result = try_get_entry(*shuffled_pool.pool, fail_message);

            if (!fail_message.empty())
                fail_messages += fail_message + '\n';

            if (!result.entry.isNull())
            {
                ++entries_count;
                if (result.is_usable)
                {
                    ++usable_count;
                    if (result.is_up_to_date)
                        ++up_to_date_count;
                }
            }
            else
            {
                LOG_WARNING(log, "Connection failed at try №"
                    << (shuffled_pool.error_count + 1) << ", reason: " << fail_message);
                ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);

                shuffled_pool.error_count = std::min(max_error_cap, shuffled_pool.error_count + 1);

                if (shuffled_pool.error_count >= max_tries)
                {
                    ++failed_pools_count;
                    ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll);
                }
            }
        }
    }

    if (usable_count < min_entries)
        throw DB::NetException(
                "All connection tries failed. Log: \n\n" + fail_messages + "\n",
                DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);

    try_results.erase(
        std::remove_if(
            try_results.begin(), try_results.end(),
            [](const TryResult & r) { return r.entry.isNull() || !r.is_usable; }),
        try_results.end());

    /// Sort so that preferred items are near the beginning.
    std::stable_sort(
        try_results.begin(), try_results.end(),
        [](const TryResult & left, const TryResult & right)
        {
            return std::forward_as_tuple(!left.is_up_to_date, left.staleness)
                < std::forward_as_tuple(!right.is_up_to_date, right.staleness);
        });

    if (up_to_date_count >= min_entries)
    {
        /// There are enough up-to-date entries.
        try_results.resize(up_to_date_count);
    }
    else if (fallback_to_stale_replicas)
    {
        /// There are not enough up-to-date entries, but we are allowed to return stale entries.
        /// Gather all up-to-date entries plus the least stale ones.

        size_t size = std::min(try_results.size(), max_entries);
        try_results.resize(size);
    }
    else
        throw DB::Exception(
                "Could not find enough connections to up-to-date replicas. Got: " + std::to_string(up_to_date_count)
                + ", needed: " + std::to_string(min_entries),
                DB::ErrorCodes::ALL_REPLICAS_ARE_STALE);

    return try_results;
}

template <typename TNestedPool>
void PoolWithFailoverBase<TNestedPool>::reportError(const Entry & entry)
{
    for (size_t i = 0; i < nested_pools.size(); ++i)
    {
        if (nested_pools[i]->contains(entry))
        {
            std::lock_guard lock(pool_states_mutex);
            auto & pool_state = shared_pool_states[i];
            pool_state.error_count = std::min(max_error_cap, pool_state.error_count + 1);
            return;
        }
    }
    throw DB::Exception("Can't find pool to report error", DB::ErrorCodes::LOGICAL_ERROR);
}

template <typename TNestedPool>
struct PoolWithFailoverBase<TNestedPool>::PoolState
{
    UInt64 error_count = 0;
    Int64 priority = 0;
    UInt32 random = 0;

    void randomize()
    {
        random = rng();
    }

    static bool compare(const PoolState & lhs, const PoolState & rhs)
    {
        return std::forward_as_tuple(lhs.error_count, lhs.priority, lhs.random)
            < std::forward_as_tuple(rhs.error_count, rhs.priority, rhs.random);
    }

private:
    std::minstd_rand rng = std::minstd_rand(randomSeed());
};

template <typename TNestedPool>
typename PoolWithFailoverBase<TNestedPool>::PoolStates
PoolWithFailoverBase<TNestedPool>::updatePoolStates()
{
    PoolStates result;
    result.reserve(nested_pools.size());

    {
        std::lock_guard lock(pool_states_mutex);

        for (auto & state : shared_pool_states)
            state.randomize();

        time_t current_time = time(nullptr);

        if (last_error_decrease_time)
        {
            time_t delta = current_time - last_error_decrease_time;

            if (delta >= 0)
            {
                /// Divide error counts by 2 every decrease_error_period seconds.
                size_t shift_amount = delta / decrease_error_period;
                /// Update the time, but not more often than once per period.
                /// Otherwise, if this function is called often enough, the error counts will never decrease.
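                /// For example, with decrease_error_period = 60 and delta = 130 seconds,
                /// shift_amount is 2, so an error_count of 12 becomes 3.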
                if (shift_amount)
                    last_error_decrease_time = current_time;

                if (shift_amount >= sizeof(UInt64) * CHAR_BIT)
                {
                    for (auto & state : shared_pool_states)
                        state.error_count = 0;
                }
                else if (shift_amount)
                {
                    for (auto & state : shared_pool_states)
                        state.error_count >>= shift_amount;
                }
            }
        }
        else
            last_error_decrease_time = current_time;

        result.assign(shared_pool_states.begin(), shared_pool_states.end());
    }
    return result;
}

template <typename TNestedPool>
typename PoolWithFailoverBase<TNestedPool>::PoolStates
PoolWithFailoverBase<TNestedPool>::getPoolStates() const
{
    std::lock_guard lock(pool_states_mutex);
    return shared_pool_states;
}