1 | #include <mysqlxx/PoolWithFailover.h> |
2 | |
3 | |
4 | /// Duplicate of code from StringUtils.h. Copied here for less dependencies. |
5 | static bool startsWith(const std::string & s, const char * prefix) |
6 | { |
7 | return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix)); |
8 | } |
9 | |
10 | |
11 | using namespace mysqlxx; |
12 | |
13 | PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg, |
14 | const std::string & config_name, const unsigned default_connections, |
15 | const unsigned max_connections, const size_t max_tries) |
16 | : max_tries(max_tries) |
17 | { |
18 | if (cfg.has(config_name + ".replica" )) |
19 | { |
20 | Poco::Util::AbstractConfiguration::Keys replica_keys; |
21 | cfg.keys(config_name, replica_keys); |
22 | for (const auto & replica_config_key : replica_keys) |
23 | { |
24 | /// There could be another elements in the same level in configuration file, like "password", "port"... |
25 | if (startsWith(replica_config_key, "replica" )) |
26 | { |
27 | std::string replica_name = config_name + "." + replica_config_key; |
28 | |
29 | int priority = cfg.getInt(replica_name + ".priority" , 0); |
30 | |
31 | replicas_by_priority[priority].emplace_back( |
32 | std::make_shared<Pool>(cfg, replica_name, default_connections, max_connections, config_name.c_str())); |
33 | } |
34 | } |
35 | } |
36 | else |
37 | { |
38 | replicas_by_priority[0].emplace_back( |
39 | std::make_shared<Pool>(cfg, config_name, default_connections, max_connections)); |
40 | } |
41 | } |
42 | |
43 | PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsigned default_connections, |
44 | const unsigned max_connections, const size_t max_tries) |
45 | : PoolWithFailover{ |
46 | Poco::Util::Application::instance().config(), config_name, |
47 | default_connections, max_connections, max_tries} |
48 | {} |
49 | |
50 | PoolWithFailover::PoolWithFailover(const PoolWithFailover & other) |
51 | : max_tries{other.max_tries} |
52 | { |
53 | for (const auto & priority_replicas : other.replicas_by_priority) |
54 | { |
55 | Replicas replicas; |
56 | replicas.reserve(priority_replicas.second.size()); |
57 | for (const auto & pool : priority_replicas.second) |
58 | replicas.emplace_back(std::make_shared<Pool>(*pool)); |
59 | replicas_by_priority.emplace(priority_replicas.first, std::move(replicas)); |
60 | } |
61 | } |
62 | |
63 | PoolWithFailover::Entry PoolWithFailover::Get() |
64 | { |
65 | Poco::Util::Application & app = Poco::Util::Application::instance(); |
66 | std::lock_guard<std::mutex> locker(mutex); |
67 | |
68 | /// If we cannot connect to some replica due to pool overflow, than we will wait and connect. |
69 | PoolPtr * full_pool = nullptr; |
70 | |
71 | for (size_t try_no = 0; try_no < max_tries; ++try_no) |
72 | { |
73 | full_pool = nullptr; |
74 | |
75 | for (auto & priority_replicas : replicas_by_priority) |
76 | { |
77 | Replicas & replicas = priority_replicas.second; |
78 | for (size_t i = 0, size = replicas.size(); i < size; ++i) |
79 | { |
80 | PoolPtr & pool = replicas[i]; |
81 | |
82 | try |
83 | { |
84 | Entry entry = pool->tryGet(); |
85 | |
86 | if (!entry.isNull()) |
87 | { |
88 | /// Move all traversed replicas to the end of queue. |
89 | /// (No need to move replicas with another priority) |
90 | std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end()); |
91 | |
92 | return entry; |
93 | } |
94 | } |
95 | catch (const Poco::Exception & e) |
96 | { |
97 | if (e.displayText().find("mysqlxx::Pool is full" ) != std::string::npos) /// NOTE: String comparison is trashy code. |
98 | { |
99 | full_pool = &pool; |
100 | } |
101 | |
102 | app.logger().warning("Connection to " + pool->getDescription() + " failed: " + e.displayText()); |
103 | continue; |
104 | } |
105 | |
106 | app.logger().warning("Connection to " + pool->getDescription() + " failed." ); |
107 | } |
108 | } |
109 | |
110 | app.logger().error("Connection to all replicas failed " + std::to_string(try_no + 1) + " times" ); |
111 | } |
112 | |
113 | if (full_pool) |
114 | { |
115 | app.logger().error("All connections failed, trying to wait on a full pool " + (*full_pool)->getDescription()); |
116 | return (*full_pool)->Get(); |
117 | } |
118 | |
119 | std::stringstream message; |
120 | message << "Connections to all replicas failed: " ; |
121 | for (auto it = replicas_by_priority.begin(); it != replicas_by_priority.end(); ++it) |
122 | for (auto jt = it->second.begin(); jt != it->second.end(); ++jt) |
123 | message << (it == replicas_by_priority.begin() && jt == it->second.begin() ? "" : ", " ) << (*jt)->getDescription(); |
124 | |
125 | throw Poco::Exception(message.str()); |
126 | } |
127 | |