1#pragma once
2
3#include <Common/PoolWithFailoverBase.h>
4#include <Client/ConnectionPool.h>
5
6#include <chrono>
7#include <vector>
8
9
10namespace DB
11{
12
13/** Connection pool with fault tolerance.
14 * Initialized by several other IConnectionPools.
 * When a connection is requested, it tries to create or select a live connection from one of the nested pools,
 * trying them in some order and using no more than the specified number of attempts.
17 * Pools with fewer errors are preferred;
18 * pools with the same number of errors are tried in random order.
19 *
20 * Note: if one of the nested pools is blocked due to overflow, then this pool will also be blocked.
21 */
22
/// Selects how many connections ConnectionPoolWithFailover::getMany() hands back.
enum class PoolMode
{
    GET_ONE = 0,  /// Exactly one connection.
    GET_MANY = 1, /// Several connections; the count is determined by the max_parallel_replicas setting.
    GET_ALL = 2,  /// One connection from each nested pool.
};
33
34class ConnectionPoolWithFailover : public IConnectionPool, private PoolWithFailoverBase<IConnectionPool>
35{
36public:
37 ConnectionPoolWithFailover(
38 ConnectionPoolPtrs nested_pools_,
39 LoadBalancing load_balancing,
40 time_t decrease_error_period_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD,
41 size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);
42
43 using Entry = IConnectionPool::Entry;
44
45 /** Allocates connection to work. */
46 Entry get(const ConnectionTimeouts & timeouts,
47 const Settings * settings = nullptr,
48 bool force_connected = true) override; /// From IConnectionPool
49
50 /** Allocates up to the specified number of connections to work.
51 * Connections provide access to different replicas of one shard.
52 */
53 std::vector<Entry> getMany(const ConnectionTimeouts & timeouts,
54 const Settings * settings, PoolMode pool_mode);
55
56 /// The same as getMany(), but return std::vector<TryResult>.
57 std::vector<TryResult> getManyForTableFunction(const ConnectionTimeouts & timeouts,
58 const Settings * settings, PoolMode pool_mode);
59
60 using Base = PoolWithFailoverBase<IConnectionPool>;
61 using TryResult = Base::TryResult;
62
63 /// The same as getMany(), but check that replication delay for table_to_check is acceptable.
64 /// Delay threshold is taken from settings.
65 std::vector<TryResult> getManyChecked(
66 const ConnectionTimeouts & timeouts,
67 const Settings * settings,
68 PoolMode pool_mode,
69 const QualifiedTableName & table_to_check);
70
71 struct NestedPoolStatus
72 {
73 const IConnectionPool * pool;
74 size_t error_count;
75 std::chrono::seconds estimated_recovery_time;
76 };
77
78 using Status = std::vector<NestedPoolStatus>;
79 Status getStatus() const;
80
81private:
82 /// Get the values of relevant settings and call Base::getMany()
83 std::vector<TryResult> getManyImpl(
84 const Settings * settings,
85 PoolMode pool_mode,
86 const TryGetEntryFunc & try_get_entry);
87
88 /// Try to get a connection from the pool and check that it is good.
89 /// If table_to_check is not null and the check is enabled in settings, check that replication delay
90 /// for this table is not too large.
91 TryResult tryGetEntry(
92 IConnectionPool & pool,
93 const ConnectionTimeouts & timeouts,
94 std::string & fail_message,
95 const Settings * settings,
96 const QualifiedTableName * table_to_check = nullptr);
97
98private:
99 std::vector<size_t> hostname_differences; /// Distances from name of this host to the names of hosts of pools.
100 LoadBalancing default_load_balancing;
101};
102
103using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>;
104using ConnectionPoolWithFailoverPtrs = std::vector<ConnectionPoolWithFailoverPtr>;
105
106}
107