1 | #if __has_include(<mysql.h>) |
2 | #include <mysql.h> |
3 | #include <mysqld_error.h> |
4 | #else |
5 | #include <mysql/mysql.h> |
6 | #include <mysql/mysqld_error.h> |
7 | #endif |
8 | |
9 | #include <mysqlxx/Pool.h> |
10 | |
11 | #include <common/sleep.h> |
12 | |
13 | #include <Poco/Util/Application.h> |
14 | #include <Poco/Util/LayeredConfiguration.h> |
15 | |
16 | |
17 | namespace mysqlxx |
18 | { |
19 | |
20 | void Pool::Entry::incrementRefCount() |
21 | { |
22 | if (!data) |
23 | return; |
24 | ++data->ref_count; |
25 | mysql_thread_init(); |
26 | } |
27 | |
28 | void Pool::Entry::decrementRefCount() |
29 | { |
30 | if (!data) |
31 | return; |
32 | --data->ref_count; |
33 | mysql_thread_end(); |
34 | } |
35 | |
36 | |
37 | Pool::Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & config_name, |
38 | unsigned default_connections_, unsigned max_connections_, |
39 | const char * parent_config_name_) |
40 | : default_connections(default_connections_), max_connections(max_connections_) |
41 | { |
42 | server = cfg.getString(config_name + ".host" ); |
43 | |
44 | if (parent_config_name_) |
45 | { |
46 | const std::string parent_config_name(parent_config_name_); |
47 | db = cfg.getString(config_name + ".db" , cfg.getString(parent_config_name + ".db" , "" )); |
48 | user = cfg.has(config_name + ".user" ) |
49 | ? cfg.getString(config_name + ".user" ) |
50 | : cfg.getString(parent_config_name + ".user" ); |
51 | password = cfg.has(config_name + ".password" ) |
52 | ? cfg.getString(config_name + ".password" ) |
53 | : cfg.getString(parent_config_name + ".password" ); |
54 | |
55 | if (!cfg.has(config_name + ".port" ) && !cfg.has(config_name + ".socket" ) |
56 | && !cfg.has(parent_config_name + ".port" ) && !cfg.has(parent_config_name + ".socket" )) |
57 | throw Poco::Exception("mysqlxx::Pool configuration: expected port or socket" ); |
58 | |
59 | port = cfg.has(config_name + ".port" ) |
60 | ? cfg.getInt(config_name + ".port" ) |
61 | : cfg.getInt(parent_config_name + ".port" , 0); |
62 | socket = cfg.has(config_name + ".socket" ) |
63 | ? cfg.getString(config_name + ".socket" ) |
64 | : cfg.getString(parent_config_name + ".socket" , "" ); |
65 | ssl_ca = cfg.has(config_name + ".ssl_ca" ) |
66 | ? cfg.getString(config_name + ".ssl_ca" ) |
67 | : cfg.getString(parent_config_name + ".ssl_ca" , "" ); |
68 | ssl_cert = cfg.has(config_name + ".ssl_cert" ) |
69 | ? cfg.getString(config_name + ".ssl_cert" ) |
70 | : cfg.getString(parent_config_name + ".ssl_cert" , "" ); |
71 | ssl_key = cfg.has(config_name + ".ssl_key" ) |
72 | ? cfg.getString(config_name + ".ssl_key" ) |
73 | : cfg.getString(parent_config_name + ".ssl_key" , "" ); |
74 | |
75 | enable_local_infile = cfg.getBool(config_name + ".enable_local_infile" , |
76 | cfg.getBool(parent_config_name + ".enable_local_infile" , MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE)); |
77 | } |
78 | else |
79 | { |
80 | db = cfg.getString(config_name + ".db" , "" ); |
81 | user = cfg.getString(config_name + ".user" ); |
82 | password = cfg.getString(config_name + ".password" ); |
83 | |
84 | if (!cfg.has(config_name + ".port" ) && !cfg.has(config_name + ".socket" )) |
85 | throw Poco::Exception("mysqlxx::Pool configuration: expected port or socket" ); |
86 | |
87 | port = cfg.getInt(config_name + ".port" , 0); |
88 | socket = cfg.getString(config_name + ".socket" , "" ); |
89 | ssl_ca = cfg.getString(config_name + ".ssl_ca" , "" ); |
90 | ssl_cert = cfg.getString(config_name + ".ssl_cert" , "" ); |
91 | ssl_key = cfg.getString(config_name + ".ssl_key" , "" ); |
92 | |
93 | enable_local_infile = cfg.getBool( |
94 | config_name + ".enable_local_infile" , MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE); |
95 | } |
96 | |
97 | connect_timeout = cfg.getInt(config_name + ".connect_timeout" , |
98 | cfg.getInt("mysql_connect_timeout" , |
99 | MYSQLXX_DEFAULT_TIMEOUT)); |
100 | |
101 | rw_timeout = |
102 | cfg.getInt(config_name + ".rw_timeout" , |
103 | cfg.getInt("mysql_rw_timeout" , |
104 | MYSQLXX_DEFAULT_RW_TIMEOUT)); |
105 | } |
106 | |
107 | |
108 | Pool::~Pool() |
109 | { |
110 | std::lock_guard<std::mutex> lock(mutex); |
111 | |
112 | for (auto & connection : connections) |
113 | delete static_cast<Connection *>(connection); |
114 | } |
115 | |
116 | |
117 | Pool::Entry Pool::Get() |
118 | { |
119 | std::unique_lock<std::mutex> lock(mutex); |
120 | |
121 | initialize(); |
122 | for (;;) |
123 | { |
124 | for (auto & connection : connections) |
125 | { |
126 | if (connection->ref_count == 0) |
127 | return Entry(connection, this); |
128 | } |
129 | |
130 | if (connections.size() < static_cast<size_t>(max_connections)) |
131 | { |
132 | Connection * conn = allocConnection(); |
133 | if (conn) |
134 | return Entry(conn, this); |
135 | } |
136 | |
137 | lock.unlock(); |
138 | sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); |
139 | lock.lock(); |
140 | } |
141 | } |
142 | |
143 | |
144 | Pool::Entry Pool::tryGet() |
145 | { |
146 | std::lock_guard<std::mutex> lock(mutex); |
147 | |
148 | initialize(); |
149 | |
150 | /// Searching for connection which was established but wasn't used. |
151 | for (auto & connection : connections) |
152 | { |
153 | if (connection->ref_count == 0) |
154 | { |
155 | Entry res(connection, this); |
156 | return res.tryForceConnected() ? res : Entry(); |
157 | } |
158 | } |
159 | |
160 | /// Throws if pool is overflowed. |
161 | if (connections.size() >= max_connections) |
162 | throw Poco::Exception("mysqlxx::Pool is full" ); |
163 | |
164 | /// Allocates new connection. |
165 | Connection * conn = allocConnection(true); |
166 | if (conn) |
167 | return Entry(conn, this); |
168 | |
169 | return Entry(); |
170 | } |
171 | |
172 | |
173 | void Pool::Entry::disconnect() |
174 | { |
175 | if (data) |
176 | { |
177 | decrementRefCount(); |
178 | data->conn.disconnect(); |
179 | } |
180 | } |
181 | |
182 | |
183 | void Pool::Entry::forceConnected() const |
184 | { |
185 | if (data == nullptr) |
186 | throw Poco::RuntimeException("Tried to access NULL database connection." ); |
187 | |
188 | Poco::Util::Application & app = Poco::Util::Application::instance(); |
189 | if (data->conn.ping()) |
190 | return; |
191 | |
192 | bool first = true; |
193 | do |
194 | { |
195 | if (first) |
196 | first = false; |
197 | else |
198 | sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); |
199 | |
200 | app.logger().information("MYSQL: Reconnecting to " + pool->description); |
201 | data->conn.connect( |
202 | pool->db.c_str(), |
203 | pool->server.c_str(), |
204 | pool->user.c_str(), |
205 | pool->password.c_str(), |
206 | pool->port, |
207 | pool->socket.c_str(), |
208 | pool->ssl_ca.c_str(), |
209 | pool->ssl_cert.c_str(), |
210 | pool->ssl_key.c_str(), |
211 | pool->connect_timeout, |
212 | pool->rw_timeout, |
213 | pool->enable_local_infile); |
214 | } |
215 | while (!data->conn.ping()); |
216 | } |
217 | |
218 | |
219 | void Pool::initialize() |
220 | { |
221 | if (!initialized) |
222 | { |
223 | description = db + "@" + server + ":" + std::to_string(port) + " as user " + user; |
224 | |
225 | for (unsigned i = 0; i < default_connections; ++i) |
226 | allocConnection(); |
227 | |
228 | initialized = true; |
229 | } |
230 | } |
231 | |
232 | |
233 | Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time) |
234 | { |
235 | Poco::Util::Application & app = Poco::Util::Application::instance(); |
236 | |
237 | std::unique_ptr<Connection> conn(new Connection); |
238 | |
239 | try |
240 | { |
241 | app.logger().information("MYSQL: Connecting to " + description); |
242 | |
243 | conn->conn.connect( |
244 | db.c_str(), |
245 | server.c_str(), |
246 | user.c_str(), |
247 | password.c_str(), |
248 | port, |
249 | socket.c_str(), |
250 | ssl_ca.c_str(), |
251 | ssl_cert.c_str(), |
252 | ssl_key.c_str(), |
253 | connect_timeout, |
254 | rw_timeout, |
255 | enable_local_infile); |
256 | } |
257 | catch (mysqlxx::ConnectionFailed & e) |
258 | { |
259 | if ((!was_successful && !dont_throw_if_failed_first_time) |
260 | || e.errnum() == ER_ACCESS_DENIED_ERROR |
261 | || e.errnum() == ER_DBACCESS_DENIED_ERROR |
262 | || e.errnum() == ER_BAD_DB_ERROR) |
263 | { |
264 | app.logger().error(e.what()); |
265 | throw; |
266 | } |
267 | else |
268 | { |
269 | app.logger().error(e.what()); |
270 | return nullptr; |
271 | } |
272 | } |
273 | |
274 | was_successful = true; |
275 | auto * connection = conn.release(); |
276 | connections.push_back(connection); |
277 | return connection; |
278 | } |
279 | |
280 | } |
281 | |