| 1 | #include <mysqlxx/PoolWithFailover.h> |
| 2 | |
| 3 | |
/// Returns true when `s` begins with the NUL-terminated string `prefix`.
/// An empty prefix matches every string.
/// Same logic as the helper in StringUtils.h; duplicated here to keep dependencies minimal.
static bool startsWith(const std::string & s, const char * prefix)
{
    const size_t prefix_length = strlen(prefix);

    /// A string shorter than the prefix can never start with it.
    if (s.size() < prefix_length)
        return false;

    return s.compare(0, prefix_length, prefix, prefix_length) == 0;
}
| 9 | |
| 10 | |
| 11 | using namespace mysqlxx; |
| 12 | |
| 13 | PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg, |
| 14 | const std::string & config_name, const unsigned default_connections, |
| 15 | const unsigned max_connections, const size_t max_tries) |
| 16 | : max_tries(max_tries) |
| 17 | { |
| 18 | if (cfg.has(config_name + ".replica" )) |
| 19 | { |
| 20 | Poco::Util::AbstractConfiguration::Keys replica_keys; |
| 21 | cfg.keys(config_name, replica_keys); |
| 22 | for (const auto & replica_config_key : replica_keys) |
| 23 | { |
| 24 | /// There could be another elements in the same level in configuration file, like "password", "port"... |
| 25 | if (startsWith(replica_config_key, "replica" )) |
| 26 | { |
| 27 | std::string replica_name = config_name + "." + replica_config_key; |
| 28 | |
| 29 | int priority = cfg.getInt(replica_name + ".priority" , 0); |
| 30 | |
| 31 | replicas_by_priority[priority].emplace_back( |
| 32 | std::make_shared<Pool>(cfg, replica_name, default_connections, max_connections, config_name.c_str())); |
| 33 | } |
| 34 | } |
| 35 | } |
| 36 | else |
| 37 | { |
| 38 | replicas_by_priority[0].emplace_back( |
| 39 | std::make_shared<Pool>(cfg, config_name, default_connections, max_connections)); |
| 40 | } |
| 41 | } |
| 42 | |
| 43 | PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsigned default_connections, |
| 44 | const unsigned max_connections, const size_t max_tries) |
| 45 | : PoolWithFailover{ |
| 46 | Poco::Util::Application::instance().config(), config_name, |
| 47 | default_connections, max_connections, max_tries} |
| 48 | {} |
| 49 | |
| 50 | PoolWithFailover::PoolWithFailover(const PoolWithFailover & other) |
| 51 | : max_tries{other.max_tries} |
| 52 | { |
| 53 | for (const auto & priority_replicas : other.replicas_by_priority) |
| 54 | { |
| 55 | Replicas replicas; |
| 56 | replicas.reserve(priority_replicas.second.size()); |
| 57 | for (const auto & pool : priority_replicas.second) |
| 58 | replicas.emplace_back(std::make_shared<Pool>(*pool)); |
| 59 | replicas_by_priority.emplace(priority_replicas.first, std::move(replicas)); |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | PoolWithFailover::Entry PoolWithFailover::Get() |
| 64 | { |
| 65 | Poco::Util::Application & app = Poco::Util::Application::instance(); |
| 66 | std::lock_guard<std::mutex> locker(mutex); |
| 67 | |
| 68 | /// If we cannot connect to some replica due to pool overflow, than we will wait and connect. |
| 69 | PoolPtr * full_pool = nullptr; |
| 70 | |
| 71 | for (size_t try_no = 0; try_no < max_tries; ++try_no) |
| 72 | { |
| 73 | full_pool = nullptr; |
| 74 | |
| 75 | for (auto & priority_replicas : replicas_by_priority) |
| 76 | { |
| 77 | Replicas & replicas = priority_replicas.second; |
| 78 | for (size_t i = 0, size = replicas.size(); i < size; ++i) |
| 79 | { |
| 80 | PoolPtr & pool = replicas[i]; |
| 81 | |
| 82 | try |
| 83 | { |
| 84 | Entry entry = pool->tryGet(); |
| 85 | |
| 86 | if (!entry.isNull()) |
| 87 | { |
| 88 | /// Move all traversed replicas to the end of queue. |
| 89 | /// (No need to move replicas with another priority) |
| 90 | std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end()); |
| 91 | |
| 92 | return entry; |
| 93 | } |
| 94 | } |
| 95 | catch (const Poco::Exception & e) |
| 96 | { |
| 97 | if (e.displayText().find("mysqlxx::Pool is full" ) != std::string::npos) /// NOTE: String comparison is trashy code. |
| 98 | { |
| 99 | full_pool = &pool; |
| 100 | } |
| 101 | |
| 102 | app.logger().warning("Connection to " + pool->getDescription() + " failed: " + e.displayText()); |
| 103 | continue; |
| 104 | } |
| 105 | |
| 106 | app.logger().warning("Connection to " + pool->getDescription() + " failed." ); |
| 107 | } |
| 108 | } |
| 109 | |
| 110 | app.logger().error("Connection to all replicas failed " + std::to_string(try_no + 1) + " times" ); |
| 111 | } |
| 112 | |
| 113 | if (full_pool) |
| 114 | { |
| 115 | app.logger().error("All connections failed, trying to wait on a full pool " + (*full_pool)->getDescription()); |
| 116 | return (*full_pool)->Get(); |
| 117 | } |
| 118 | |
| 119 | std::stringstream message; |
| 120 | message << "Connections to all replicas failed: " ; |
| 121 | for (auto it = replicas_by_priority.begin(); it != replicas_by_priority.end(); ++it) |
| 122 | for (auto jt = it->second.begin(); jt != it->second.end(); ++jt) |
| 123 | message << (it == replicas_by_priority.begin() && jt == it->second.begin() ? "" : ", " ) << (*jt)->getDescription(); |
| 124 | |
| 125 | throw Poco::Exception(message.str()); |
| 126 | } |
| 127 | |