1 | #pragma once |
2 | |
3 | #include <Common/PoolWithFailoverBase.h> |
4 | #include <Client/ConnectionPool.h> |
5 | |
6 | #include <chrono> |
7 | #include <vector> |
8 | |
9 | |
10 | namespace DB |
11 | { |
12 | |
/** Connection pool with fault tolerance.
  * Initialized by several other IConnectionPools.
  * When a connection is requested, it tries to create or select a live connection from one of the nested pools,
  * trying the pools in some order and using no more than the specified number of attempts.
  * Pools with fewer errors are preferred;
  * pools with the same number of errors are tried in random order.
  *
  * Note: if one of the nested pools is blocked due to overflow, then this pool will also be blocked.
  */
22 | |
/// Selects how many connections the ConnectionPoolWithFailover::getMany() method returns.
enum class PoolMode
{
    GET_ONE = 0,  /// Exactly one connection.
    GET_MANY = 1, /// Several connections; the number is determined by the max_parallel_replicas setting.
    GET_ALL = 2,  /// One connection from each nested pool.
};
33 | |
34 | class ConnectionPoolWithFailover : public IConnectionPool, private PoolWithFailoverBase<IConnectionPool> |
35 | { |
36 | public: |
37 | ConnectionPoolWithFailover( |
38 | ConnectionPoolPtrs nested_pools_, |
39 | LoadBalancing load_balancing, |
40 | time_t decrease_error_period_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD, |
41 | size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT); |
42 | |
43 | using Entry = IConnectionPool::Entry; |
44 | |
45 | /** Allocates connection to work. */ |
46 | Entry get(const ConnectionTimeouts & timeouts, |
47 | const Settings * settings = nullptr, |
48 | bool force_connected = true) override; /// From IConnectionPool |
49 | |
50 | /** Allocates up to the specified number of connections to work. |
51 | * Connections provide access to different replicas of one shard. |
52 | */ |
53 | std::vector<Entry> getMany(const ConnectionTimeouts & timeouts, |
54 | const Settings * settings, PoolMode pool_mode); |
55 | |
56 | /// The same as getMany(), but return std::vector<TryResult>. |
57 | std::vector<TryResult> getManyForTableFunction(const ConnectionTimeouts & timeouts, |
58 | const Settings * settings, PoolMode pool_mode); |
59 | |
60 | using Base = PoolWithFailoverBase<IConnectionPool>; |
61 | using TryResult = Base::TryResult; |
62 | |
63 | /// The same as getMany(), but check that replication delay for table_to_check is acceptable. |
64 | /// Delay threshold is taken from settings. |
65 | std::vector<TryResult> getManyChecked( |
66 | const ConnectionTimeouts & timeouts, |
67 | const Settings * settings, |
68 | PoolMode pool_mode, |
69 | const QualifiedTableName & table_to_check); |
70 | |
71 | struct NestedPoolStatus |
72 | { |
73 | const IConnectionPool * pool; |
74 | size_t error_count; |
75 | std::chrono::seconds estimated_recovery_time; |
76 | }; |
77 | |
78 | using Status = std::vector<NestedPoolStatus>; |
79 | Status getStatus() const; |
80 | |
81 | private: |
82 | /// Get the values of relevant settings and call Base::getMany() |
83 | std::vector<TryResult> getManyImpl( |
84 | const Settings * settings, |
85 | PoolMode pool_mode, |
86 | const TryGetEntryFunc & try_get_entry); |
87 | |
88 | /// Try to get a connection from the pool and check that it is good. |
89 | /// If table_to_check is not null and the check is enabled in settings, check that replication delay |
90 | /// for this table is not too large. |
91 | TryResult tryGetEntry( |
92 | IConnectionPool & pool, |
93 | const ConnectionTimeouts & timeouts, |
94 | std::string & fail_message, |
95 | const Settings * settings, |
96 | const QualifiedTableName * table_to_check = nullptr); |
97 | |
98 | private: |
99 | std::vector<size_t> hostname_differences; /// Distances from name of this host to the names of hosts of pools. |
100 | LoadBalancing default_load_balancing; |
101 | }; |
102 | |
103 | using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>; |
104 | using ConnectionPoolWithFailoverPtrs = std::vector<ConnectionPoolWithFailoverPtr>; |
105 | |
106 | } |
107 | |