1#if __has_include(<mysql.h>)
2#include <mysql.h>
3#include <mysqld_error.h>
4#else
5#include <mysql/mysql.h>
6#include <mysql/mysqld_error.h>
7#endif
8
9#include <mysqlxx/Pool.h>
10
11#include <common/sleep.h>
12
13#include <Poco/Util/Application.h>
14#include <Poco/Util/LayeredConfiguration.h>
15
16
17namespace mysqlxx
18{
19
20void Pool::Entry::incrementRefCount()
21{
22 if (!data)
23 return;
24 ++data->ref_count;
25 mysql_thread_init();
26}
27
28void Pool::Entry::decrementRefCount()
29{
30 if (!data)
31 return;
32 --data->ref_count;
33 mysql_thread_end();
34}
35
36
37Pool::Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & config_name,
38 unsigned default_connections_, unsigned max_connections_,
39 const char * parent_config_name_)
40 : default_connections(default_connections_), max_connections(max_connections_)
41{
42 server = cfg.getString(config_name + ".host");
43
44 if (parent_config_name_)
45 {
46 const std::string parent_config_name(parent_config_name_);
47 db = cfg.getString(config_name + ".db", cfg.getString(parent_config_name + ".db", ""));
48 user = cfg.has(config_name + ".user")
49 ? cfg.getString(config_name + ".user")
50 : cfg.getString(parent_config_name + ".user");
51 password = cfg.has(config_name + ".password")
52 ? cfg.getString(config_name + ".password")
53 : cfg.getString(parent_config_name + ".password");
54
55 if (!cfg.has(config_name + ".port") && !cfg.has(config_name + ".socket")
56 && !cfg.has(parent_config_name + ".port") && !cfg.has(parent_config_name + ".socket"))
57 throw Poco::Exception("mysqlxx::Pool configuration: expected port or socket");
58
59 port = cfg.has(config_name + ".port")
60 ? cfg.getInt(config_name + ".port")
61 : cfg.getInt(parent_config_name + ".port", 0);
62 socket = cfg.has(config_name + ".socket")
63 ? cfg.getString(config_name + ".socket")
64 : cfg.getString(parent_config_name + ".socket", "");
65 ssl_ca = cfg.has(config_name + ".ssl_ca")
66 ? cfg.getString(config_name + ".ssl_ca")
67 : cfg.getString(parent_config_name + ".ssl_ca", "");
68 ssl_cert = cfg.has(config_name + ".ssl_cert")
69 ? cfg.getString(config_name + ".ssl_cert")
70 : cfg.getString(parent_config_name + ".ssl_cert", "");
71 ssl_key = cfg.has(config_name + ".ssl_key")
72 ? cfg.getString(config_name + ".ssl_key")
73 : cfg.getString(parent_config_name + ".ssl_key", "");
74
75 enable_local_infile = cfg.getBool(config_name + ".enable_local_infile",
76 cfg.getBool(parent_config_name + ".enable_local_infile", MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE));
77 }
78 else
79 {
80 db = cfg.getString(config_name + ".db", "");
81 user = cfg.getString(config_name + ".user");
82 password = cfg.getString(config_name + ".password");
83
84 if (!cfg.has(config_name + ".port") && !cfg.has(config_name + ".socket"))
85 throw Poco::Exception("mysqlxx::Pool configuration: expected port or socket");
86
87 port = cfg.getInt(config_name + ".port", 0);
88 socket = cfg.getString(config_name + ".socket", "");
89 ssl_ca = cfg.getString(config_name + ".ssl_ca", "");
90 ssl_cert = cfg.getString(config_name + ".ssl_cert", "");
91 ssl_key = cfg.getString(config_name + ".ssl_key", "");
92
93 enable_local_infile = cfg.getBool(
94 config_name + ".enable_local_infile", MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE);
95 }
96
97 connect_timeout = cfg.getInt(config_name + ".connect_timeout",
98 cfg.getInt("mysql_connect_timeout",
99 MYSQLXX_DEFAULT_TIMEOUT));
100
101 rw_timeout =
102 cfg.getInt(config_name + ".rw_timeout",
103 cfg.getInt("mysql_rw_timeout",
104 MYSQLXX_DEFAULT_RW_TIMEOUT));
105}
106
107
108Pool::~Pool()
109{
110 std::lock_guard<std::mutex> lock(mutex);
111
112 for (auto & connection : connections)
113 delete static_cast<Connection *>(connection);
114}
115
116
117Pool::Entry Pool::Get()
118{
119 std::unique_lock<std::mutex> lock(mutex);
120
121 initialize();
122 for (;;)
123 {
124 for (auto & connection : connections)
125 {
126 if (connection->ref_count == 0)
127 return Entry(connection, this);
128 }
129
130 if (connections.size() < static_cast<size_t>(max_connections))
131 {
132 Connection * conn = allocConnection();
133 if (conn)
134 return Entry(conn, this);
135 }
136
137 lock.unlock();
138 sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
139 lock.lock();
140 }
141}
142
143
144Pool::Entry Pool::tryGet()
145{
146 std::lock_guard<std::mutex> lock(mutex);
147
148 initialize();
149
150 /// Searching for connection which was established but wasn't used.
151 for (auto & connection : connections)
152 {
153 if (connection->ref_count == 0)
154 {
155 Entry res(connection, this);
156 return res.tryForceConnected() ? res : Entry();
157 }
158 }
159
160 /// Throws if pool is overflowed.
161 if (connections.size() >= max_connections)
162 throw Poco::Exception("mysqlxx::Pool is full");
163
164 /// Allocates new connection.
165 Connection * conn = allocConnection(true);
166 if (conn)
167 return Entry(conn, this);
168
169 return Entry();
170}
171
172
173void Pool::Entry::disconnect()
174{
175 if (data)
176 {
177 decrementRefCount();
178 data->conn.disconnect();
179 }
180}
181
182
183void Pool::Entry::forceConnected() const
184{
185 if (data == nullptr)
186 throw Poco::RuntimeException("Tried to access NULL database connection.");
187
188 Poco::Util::Application & app = Poco::Util::Application::instance();
189 if (data->conn.ping())
190 return;
191
192 bool first = true;
193 do
194 {
195 if (first)
196 first = false;
197 else
198 sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
199
200 app.logger().information("MYSQL: Reconnecting to " + pool->description);
201 data->conn.connect(
202 pool->db.c_str(),
203 pool->server.c_str(),
204 pool->user.c_str(),
205 pool->password.c_str(),
206 pool->port,
207 pool->socket.c_str(),
208 pool->ssl_ca.c_str(),
209 pool->ssl_cert.c_str(),
210 pool->ssl_key.c_str(),
211 pool->connect_timeout,
212 pool->rw_timeout,
213 pool->enable_local_infile);
214 }
215 while (!data->conn.ping());
216}
217
218
219void Pool::initialize()
220{
221 if (!initialized)
222 {
223 description = db + "@" + server + ":" + std::to_string(port) + " as user " + user;
224
225 for (unsigned i = 0; i < default_connections; ++i)
226 allocConnection();
227
228 initialized = true;
229 }
230}
231
232
233Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
234{
235 Poco::Util::Application & app = Poco::Util::Application::instance();
236
237 std::unique_ptr<Connection> conn(new Connection);
238
239 try
240 {
241 app.logger().information("MYSQL: Connecting to " + description);
242
243 conn->conn.connect(
244 db.c_str(),
245 server.c_str(),
246 user.c_str(),
247 password.c_str(),
248 port,
249 socket.c_str(),
250 ssl_ca.c_str(),
251 ssl_cert.c_str(),
252 ssl_key.c_str(),
253 connect_timeout,
254 rw_timeout,
255 enable_local_infile);
256 }
257 catch (mysqlxx::ConnectionFailed & e)
258 {
259 if ((!was_successful && !dont_throw_if_failed_first_time)
260 || e.errnum() == ER_ACCESS_DENIED_ERROR
261 || e.errnum() == ER_DBACCESS_DENIED_ERROR
262 || e.errnum() == ER_BAD_DB_ERROR)
263 {
264 app.logger().error(e.what());
265 throw;
266 }
267 else
268 {
269 app.logger().error(e.what());
270 return nullptr;
271 }
272 }
273
274 was_successful = true;
275 auto * connection = conn.release();
276 connections.push_back(connection);
277 return connection;
278}
279
280}
281