1#include <mysqlxx/PoolWithFailover.h>
2
3
/// Returns true when `s` begins with the NUL-terminated string `prefix`
/// (an empty prefix matches every string).
/// Same logic as the helper in StringUtils.h; duplicated here to avoid the extra dependency.
static bool startsWith(const std::string & s, const char * prefix)
{
    const size_t prefix_length = strlen(prefix);

    /// A string shorter than the prefix can never start with it.
    if (s.size() < prefix_length)
        return false;

    return memcmp(s.data(), prefix, prefix_length) == 0;
}
9
10
11using namespace mysqlxx;
12
13PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg,
14 const std::string & config_name, const unsigned default_connections,
15 const unsigned max_connections, const size_t max_tries)
16 : max_tries(max_tries)
17{
18 if (cfg.has(config_name + ".replica"))
19 {
20 Poco::Util::AbstractConfiguration::Keys replica_keys;
21 cfg.keys(config_name, replica_keys);
22 for (const auto & replica_config_key : replica_keys)
23 {
24 /// There could be another elements in the same level in configuration file, like "password", "port"...
25 if (startsWith(replica_config_key, "replica"))
26 {
27 std::string replica_name = config_name + "." + replica_config_key;
28
29 int priority = cfg.getInt(replica_name + ".priority", 0);
30
31 replicas_by_priority[priority].emplace_back(
32 std::make_shared<Pool>(cfg, replica_name, default_connections, max_connections, config_name.c_str()));
33 }
34 }
35 }
36 else
37 {
38 replicas_by_priority[0].emplace_back(
39 std::make_shared<Pool>(cfg, config_name, default_connections, max_connections));
40 }
41}
42
43PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsigned default_connections,
44 const unsigned max_connections, const size_t max_tries)
45 : PoolWithFailover{
46 Poco::Util::Application::instance().config(), config_name,
47 default_connections, max_connections, max_tries}
48{}
49
50PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
51 : max_tries{other.max_tries}
52{
53 for (const auto & priority_replicas : other.replicas_by_priority)
54 {
55 Replicas replicas;
56 replicas.reserve(priority_replicas.second.size());
57 for (const auto & pool : priority_replicas.second)
58 replicas.emplace_back(std::make_shared<Pool>(*pool));
59 replicas_by_priority.emplace(priority_replicas.first, std::move(replicas));
60 }
61}
62
63PoolWithFailover::Entry PoolWithFailover::Get()
64{
65 Poco::Util::Application & app = Poco::Util::Application::instance();
66 std::lock_guard<std::mutex> locker(mutex);
67
68 /// If we cannot connect to some replica due to pool overflow, than we will wait and connect.
69 PoolPtr * full_pool = nullptr;
70
71 for (size_t try_no = 0; try_no < max_tries; ++try_no)
72 {
73 full_pool = nullptr;
74
75 for (auto & priority_replicas : replicas_by_priority)
76 {
77 Replicas & replicas = priority_replicas.second;
78 for (size_t i = 0, size = replicas.size(); i < size; ++i)
79 {
80 PoolPtr & pool = replicas[i];
81
82 try
83 {
84 Entry entry = pool->tryGet();
85
86 if (!entry.isNull())
87 {
88 /// Move all traversed replicas to the end of queue.
89 /// (No need to move replicas with another priority)
90 std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end());
91
92 return entry;
93 }
94 }
95 catch (const Poco::Exception & e)
96 {
97 if (e.displayText().find("mysqlxx::Pool is full") != std::string::npos) /// NOTE: String comparison is trashy code.
98 {
99 full_pool = &pool;
100 }
101
102 app.logger().warning("Connection to " + pool->getDescription() + " failed: " + e.displayText());
103 continue;
104 }
105
106 app.logger().warning("Connection to " + pool->getDescription() + " failed.");
107 }
108 }
109
110 app.logger().error("Connection to all replicas failed " + std::to_string(try_no + 1) + " times");
111 }
112
113 if (full_pool)
114 {
115 app.logger().error("All connections failed, trying to wait on a full pool " + (*full_pool)->getDescription());
116 return (*full_pool)->Get();
117 }
118
119 std::stringstream message;
120 message << "Connections to all replicas failed: ";
121 for (auto it = replicas_by_priority.begin(); it != replicas_by_priority.end(); ++it)
122 for (auto jt = it->second.begin(); jt != it->second.end(); ++jt)
123 message << (it == replicas_by_priority.begin() && jt == it->second.begin() ? "" : ", ") << (*jt)->getDescription();
124
125 throw Poco::Exception(message.str());
126}
127