1 | #pragma once |
2 | |
3 | #include <Common/PoolBase.h> |
4 | |
5 | #include <Client/Connection.h> |
6 | #include <IO/ConnectionTimeouts.h> |
7 | |
8 | namespace DB |
9 | { |
10 | |
11 | /** Interface for connection pools. |
12 | * |
13 | * Usage (using the usual `ConnectionPool` example) |
14 | * ConnectionPool pool(...); |
15 | * |
16 | * void thread() |
17 | * { |
18 | * auto connection = pool.get(); |
19 | * connection->sendQuery("SELECT 'Hello, world!' AS world"); |
20 | * } |
21 | */ |
22 | |
23 | class IConnectionPool : private boost::noncopyable |
24 | { |
25 | public: |
26 | using Entry = PoolBase<Connection>::Entry; |
27 | |
28 | public: |
29 | virtual ~IConnectionPool() {} |
30 | |
31 | /// Selects the connection to work. |
32 | /// If force_connected is false, the client must manually ensure that returned connection is good. |
33 | virtual Entry get(const ConnectionTimeouts & timeouts, |
34 | const Settings * settings = nullptr, |
35 | bool force_connected = true) = 0; |
36 | }; |
37 | |
38 | using ConnectionPoolPtr = std::shared_ptr<IConnectionPool>; |
39 | using ConnectionPoolPtrs = std::vector<ConnectionPoolPtr>; |
40 | |
41 | /** A common connection pool, without fault tolerance. |
42 | */ |
43 | class ConnectionPool : public IConnectionPool, private PoolBase<Connection> |
44 | { |
45 | public: |
46 | using Entry = IConnectionPool::Entry; |
47 | using Base = PoolBase<Connection>; |
48 | |
49 | ConnectionPool(unsigned max_connections_, |
50 | const String & host_, |
51 | UInt16 port_, |
52 | const String & default_database_, |
53 | const String & user_, |
54 | const String & password_, |
55 | const String & client_name_ = "client" , |
56 | Protocol::Compression compression_ = Protocol::Compression::Enable, |
57 | Protocol::Secure secure_ = Protocol::Secure::Disable) |
58 | : Base(max_connections_, |
59 | &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")" )), |
60 | host(host_), |
61 | port(port_), |
62 | default_database(default_database_), |
63 | user(user_), |
64 | password(password_), |
65 | client_name(client_name_), |
66 | compression(compression_), |
67 | secure{secure_} |
68 | { |
69 | } |
70 | |
71 | Entry get(const ConnectionTimeouts & timeouts, |
72 | const Settings * settings = nullptr, |
73 | bool force_connected = true) override |
74 | { |
75 | Entry entry; |
76 | if (settings) |
77 | entry = Base::get(settings->connection_pool_max_wait_ms.totalMilliseconds()); |
78 | else |
79 | entry = Base::get(-1); |
80 | |
81 | if (force_connected) |
82 | entry->forceConnected(timeouts); |
83 | |
84 | return entry; |
85 | } |
86 | |
87 | const std::string & getHost() const |
88 | { |
89 | return host; |
90 | } |
91 | std::string getDescription() const |
92 | { |
93 | return host + ":" + toString(port); |
94 | } |
95 | |
96 | protected: |
97 | /** Creates a new object to put in the pool. */ |
98 | ConnectionPtr allocObject() override |
99 | { |
100 | return std::make_shared<Connection>( |
101 | host, port, |
102 | default_database, user, password, |
103 | client_name, compression, secure); |
104 | } |
105 | |
106 | private: |
107 | String host; |
108 | UInt16 port; |
109 | String default_database; |
110 | String user; |
111 | String password; |
112 | |
113 | String client_name; |
114 | Protocol::Compression compression; /// Whether to compress data when interacting with the server. |
115 | Protocol::Secure secure; /// Whether to encrypt data when interacting with the server. |
116 | |
117 | }; |
118 | |
119 | } |
120 | |