| 1 | #pragma once |
| 2 | |
| 3 | #include <Common/PoolBase.h> |
| 4 | |
| 5 | #include <Client/Connection.h> |
| 6 | #include <IO/ConnectionTimeouts.h> |
| 7 | |
| 8 | namespace DB |
| 9 | { |
| 10 | |
/** Interface for connection pools.
  *
  * Usage (taking the ordinary `ConnectionPool` as an example):
  *
  *     ConnectionPool pool(...);
  *
  *     void thread()
  *     {
  *         /// `timeouts` is a ConnectionTimeouts object; get() has no default for it.
  *         auto connection = pool.get(timeouts);
  *         connection->sendQuery("SELECT 'Hello, world!' AS world");
  *     }
  */
| 22 | |
| 23 | class IConnectionPool : private boost::noncopyable |
| 24 | { |
| 25 | public: |
| 26 | using Entry = PoolBase<Connection>::Entry; |
| 27 | |
| 28 | public: |
| 29 | virtual ~IConnectionPool() {} |
| 30 | |
| 31 | /// Selects the connection to work. |
| 32 | /// If force_connected is false, the client must manually ensure that returned connection is good. |
| 33 | virtual Entry get(const ConnectionTimeouts & timeouts, |
| 34 | const Settings * settings = nullptr, |
| 35 | bool force_connected = true) = 0; |
| 36 | }; |
| 37 | |
| 38 | using ConnectionPoolPtr = std::shared_ptr<IConnectionPool>; |
| 39 | using ConnectionPoolPtrs = std::vector<ConnectionPoolPtr>; |
| 40 | |
| 41 | /** A common connection pool, without fault tolerance. |
| 42 | */ |
| 43 | class ConnectionPool : public IConnectionPool, private PoolBase<Connection> |
| 44 | { |
| 45 | public: |
| 46 | using Entry = IConnectionPool::Entry; |
| 47 | using Base = PoolBase<Connection>; |
| 48 | |
| 49 | ConnectionPool(unsigned max_connections_, |
| 50 | const String & host_, |
| 51 | UInt16 port_, |
| 52 | const String & default_database_, |
| 53 | const String & user_, |
| 54 | const String & password_, |
| 55 | const String & client_name_ = "client" , |
| 56 | Protocol::Compression compression_ = Protocol::Compression::Enable, |
| 57 | Protocol::Secure secure_ = Protocol::Secure::Disable) |
| 58 | : Base(max_connections_, |
| 59 | &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")" )), |
| 60 | host(host_), |
| 61 | port(port_), |
| 62 | default_database(default_database_), |
| 63 | user(user_), |
| 64 | password(password_), |
| 65 | client_name(client_name_), |
| 66 | compression(compression_), |
| 67 | secure{secure_} |
| 68 | { |
| 69 | } |
| 70 | |
| 71 | Entry get(const ConnectionTimeouts & timeouts, |
| 72 | const Settings * settings = nullptr, |
| 73 | bool force_connected = true) override |
| 74 | { |
| 75 | Entry entry; |
| 76 | if (settings) |
| 77 | entry = Base::get(settings->connection_pool_max_wait_ms.totalMilliseconds()); |
| 78 | else |
| 79 | entry = Base::get(-1); |
| 80 | |
| 81 | if (force_connected) |
| 82 | entry->forceConnected(timeouts); |
| 83 | |
| 84 | return entry; |
| 85 | } |
| 86 | |
| 87 | const std::string & getHost() const |
| 88 | { |
| 89 | return host; |
| 90 | } |
| 91 | std::string getDescription() const |
| 92 | { |
| 93 | return host + ":" + toString(port); |
| 94 | } |
| 95 | |
| 96 | protected: |
| 97 | /** Creates a new object to put in the pool. */ |
| 98 | ConnectionPtr allocObject() override |
| 99 | { |
| 100 | return std::make_shared<Connection>( |
| 101 | host, port, |
| 102 | default_database, user, password, |
| 103 | client_name, compression, secure); |
| 104 | } |
| 105 | |
| 106 | private: |
| 107 | String host; |
| 108 | UInt16 port; |
| 109 | String default_database; |
| 110 | String user; |
| 111 | String password; |
| 112 | |
| 113 | String client_name; |
| 114 | Protocol::Compression compression; /// Whether to compress data when interacting with the server. |
| 115 | Protocol::Secure secure; /// Whether to encrypt data when interacting with the server. |
| 116 | |
| 117 | }; |
| 118 | |
| 119 | } |
| 120 | |