1#pragma once
2
3#include <Common/PoolBase.h>
4
5#include <Client/Connection.h>
6#include <IO/ConnectionTimeouts.h>
7
8namespace DB
9{
10
/** Interface for connection pools.
  *
  * Usage (with the ordinary `ConnectionPool` as an example):
  *
  *    ConnectionPool pool(...);
  *
  *    void thread()
  *    {
  *        /// `timeouts` is a ConnectionTimeouts object; get() has no default for it.
  *        auto connection = pool.get(timeouts);
  *        connection->sendQuery("SELECT 'Hello, world!' AS world");
  *    }
  */
22
23class IConnectionPool : private boost::noncopyable
24{
25public:
26 using Entry = PoolBase<Connection>::Entry;
27
28public:
29 virtual ~IConnectionPool() {}
30
31 /// Selects the connection to work.
32 /// If force_connected is false, the client must manually ensure that returned connection is good.
33 virtual Entry get(const ConnectionTimeouts & timeouts,
34 const Settings * settings = nullptr,
35 bool force_connected = true) = 0;
36};
37
38using ConnectionPoolPtr = std::shared_ptr<IConnectionPool>;
39using ConnectionPoolPtrs = std::vector<ConnectionPoolPtr>;
40
41/** A common connection pool, without fault tolerance.
42 */
43class ConnectionPool : public IConnectionPool, private PoolBase<Connection>
44{
45public:
46 using Entry = IConnectionPool::Entry;
47 using Base = PoolBase<Connection>;
48
49 ConnectionPool(unsigned max_connections_,
50 const String & host_,
51 UInt16 port_,
52 const String & default_database_,
53 const String & user_,
54 const String & password_,
55 const String & client_name_ = "client",
56 Protocol::Compression compression_ = Protocol::Compression::Enable,
57 Protocol::Secure secure_ = Protocol::Secure::Disable)
58 : Base(max_connections_,
59 &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
60 host(host_),
61 port(port_),
62 default_database(default_database_),
63 user(user_),
64 password(password_),
65 client_name(client_name_),
66 compression(compression_),
67 secure{secure_}
68 {
69 }
70
71 Entry get(const ConnectionTimeouts & timeouts,
72 const Settings * settings = nullptr,
73 bool force_connected = true) override
74 {
75 Entry entry;
76 if (settings)
77 entry = Base::get(settings->connection_pool_max_wait_ms.totalMilliseconds());
78 else
79 entry = Base::get(-1);
80
81 if (force_connected)
82 entry->forceConnected(timeouts);
83
84 return entry;
85 }
86
87 const std::string & getHost() const
88 {
89 return host;
90 }
91 std::string getDescription() const
92 {
93 return host + ":" + toString(port);
94 }
95
96protected:
97 /** Creates a new object to put in the pool. */
98 ConnectionPtr allocObject() override
99 {
100 return std::make_shared<Connection>(
101 host, port,
102 default_database, user, password,
103 client_name, compression, secure);
104 }
105
106private:
107 String host;
108 UInt16 port;
109 String default_database;
110 String user;
111 String password;
112
113 String client_name;
114 Protocol::Compression compression; /// Whether to compress data when interacting with the server.
115 Protocol::Secure secure; /// Whether to encrypt data when interacting with the server.
116
117};
118
119}
120