| 1 | #include <Access/QuotaContext.h> |
| 2 | #include <Access/QuotaContextFactory.h> |
| 3 | #include <Access/AccessControlManager.h> |
| 4 | #include <Common/Exception.h> |
| 5 | #include <Common/thread_local_rng.h> |
| 6 | #include <ext/range.h> |
| 7 | #include <boost/range/adaptor/map.hpp> |
| 8 | #include <boost/range/algorithm/copy.hpp> |
| 9 | #include <boost/range/algorithm/lower_bound.hpp> |
| 10 | #include <boost/range/algorithm/stable_sort.hpp> |
| 11 | #include <boost/range/algorithm_ext/erase.hpp> |
| 12 | |
| 13 | |
| 14 | namespace DB |
| 15 | { |
namespace ErrorCodes
{
    /// Raised by QuotaInfo::calculateKey() when the quota's key type is CLIENT_KEY
    /// but the context carries an empty client key.
    extern const int QUOTA_REQUIRES_CLIENT_KEY;
}
| 20 | |
| 21 | |
| 22 | namespace |
| 23 | { |
| 24 | std::chrono::system_clock::duration randomDuration(std::chrono::seconds max) |
| 25 | { |
| 26 | auto count = std::chrono::duration_cast<std::chrono::system_clock::duration>(max).count(); |
| 27 | std::uniform_int_distribution<Int64> distribution{0, count - 1}; |
| 28 | return std::chrono::system_clock::duration(distribution(thread_local_rng)); |
| 29 | } |
| 30 | } |
| 31 | |
| 32 | |
| 33 | void QuotaContextFactory::QuotaInfo::setQuota(const QuotaPtr & quota_, const UUID & quota_id_) |
| 34 | { |
| 35 | quota = quota_; |
| 36 | quota_id = quota_id_; |
| 37 | |
| 38 | boost::range::copy(quota->roles, std::inserter(roles, roles.end())); |
| 39 | all_roles = quota->all_roles; |
| 40 | boost::range::copy(quota->except_roles, std::inserter(except_roles, except_roles.end())); |
| 41 | |
| 42 | rebuildAllIntervals(); |
| 43 | } |
| 44 | |
| 45 | |
| 46 | bool QuotaContextFactory::QuotaInfo::canUseWithContext(const QuotaContext & context) const |
| 47 | { |
| 48 | if (roles.count(context.user_name)) |
| 49 | return true; |
| 50 | |
| 51 | if (all_roles && !except_roles.count(context.user_name)) |
| 52 | return true; |
| 53 | |
| 54 | return false; |
| 55 | } |
| 56 | |
| 57 | |
| 58 | String QuotaContextFactory::QuotaInfo::calculateKey(const QuotaContext & context) const |
| 59 | { |
| 60 | using KeyType = Quota::KeyType; |
| 61 | switch (quota->key_type) |
| 62 | { |
| 63 | case KeyType::NONE: |
| 64 | return "" ; |
| 65 | case KeyType::USER_NAME: |
| 66 | return context.user_name; |
| 67 | case KeyType::IP_ADDRESS: |
| 68 | return context.address.toString(); |
| 69 | case KeyType::CLIENT_KEY: |
| 70 | { |
| 71 | if (!context.client_key.empty()) |
| 72 | return context.client_key; |
| 73 | throw Exception( |
| 74 | "Quota " + quota->getName() + " (for user " + context.user_name + ") requires a client supplied key." , |
| 75 | ErrorCodes::QUOTA_REQUIRES_CLIENT_KEY); |
| 76 | } |
| 77 | case KeyType::CLIENT_KEY_OR_USER_NAME: |
| 78 | { |
| 79 | if (!context.client_key.empty()) |
| 80 | return context.client_key; |
| 81 | return context.user_name; |
| 82 | } |
| 83 | case KeyType::CLIENT_KEY_OR_IP_ADDRESS: |
| 84 | { |
| 85 | if (!context.client_key.empty()) |
| 86 | return context.client_key; |
| 87 | return context.address.toString(); |
| 88 | } |
| 89 | } |
| 90 | __builtin_unreachable(); |
| 91 | } |
| 92 | |
| 93 | |
| 94 | std::shared_ptr<const QuotaContext::Intervals> QuotaContextFactory::QuotaInfo::getOrBuildIntervals(const String & key) |
| 95 | { |
| 96 | auto it = key_to_intervals.find(key); |
| 97 | if (it != key_to_intervals.end()) |
| 98 | return it->second; |
| 99 | return rebuildIntervals(key); |
| 100 | } |
| 101 | |
| 102 | |
| 103 | void QuotaContextFactory::QuotaInfo::rebuildAllIntervals() |
| 104 | { |
| 105 | for (const String & key : key_to_intervals | boost::adaptors::map_keys) |
| 106 | rebuildIntervals(key); |
| 107 | } |
| 108 | |
| 109 | |
| 110 | std::shared_ptr<const QuotaContext::Intervals> QuotaContextFactory::QuotaInfo::rebuildIntervals(const String & key) |
| 111 | { |
| 112 | auto new_intervals = std::make_shared<Intervals>(); |
| 113 | new_intervals->quota_name = quota->getName(); |
| 114 | new_intervals->quota_id = quota_id; |
| 115 | new_intervals->quota_key = key; |
| 116 | auto & intervals = new_intervals->intervals; |
| 117 | intervals.reserve(quota->all_limits.size()); |
| 118 | constexpr size_t MAX_RESOURCE_TYPE = Quota::MAX_RESOURCE_TYPE; |
| 119 | for (const auto & limits : quota->all_limits) |
| 120 | { |
| 121 | intervals.emplace_back(); |
| 122 | auto & interval = intervals.back(); |
| 123 | interval.duration = limits.duration; |
| 124 | std::chrono::system_clock::time_point end_of_interval{}; |
| 125 | interval.randomize_interval = limits.randomize_interval; |
| 126 | if (limits.randomize_interval) |
| 127 | end_of_interval += randomDuration(limits.duration); |
| 128 | interval.end_of_interval = end_of_interval.time_since_epoch(); |
| 129 | for (auto resource_type : ext::range(MAX_RESOURCE_TYPE)) |
| 130 | { |
| 131 | interval.max[resource_type] = limits.max[resource_type]; |
| 132 | interval.used[resource_type] = 0; |
| 133 | } |
| 134 | } |
| 135 | |
| 136 | /// Order intervals by durations from largest to smallest. |
| 137 | /// To report first about largest interval on what quota was exceeded. |
| 138 | struct GreaterByDuration |
| 139 | { |
| 140 | bool operator()(const Interval & lhs, const Interval & rhs) const { return lhs.duration > rhs.duration; } |
| 141 | }; |
| 142 | boost::range::stable_sort(intervals, GreaterByDuration{}); |
| 143 | |
| 144 | auto it = key_to_intervals.find(key); |
| 145 | if (it == key_to_intervals.end()) |
| 146 | { |
| 147 | /// Just put new intervals into the map. |
| 148 | key_to_intervals.try_emplace(key, new_intervals); |
| 149 | } |
| 150 | else |
| 151 | { |
| 152 | /// We need to keep usage information from the old intervals. |
| 153 | const auto & old_intervals = it->second->intervals; |
| 154 | for (auto & new_interval : new_intervals->intervals) |
| 155 | { |
| 156 | /// Check if an interval with the same duration is already in use. |
| 157 | auto lower_bound = boost::range::lower_bound(old_intervals, new_interval, GreaterByDuration{}); |
| 158 | if ((lower_bound == old_intervals.end()) || (lower_bound->duration != new_interval.duration)) |
| 159 | continue; |
| 160 | |
| 161 | /// Found an interval with the same duration, we need to copy its usage information to `result`. |
| 162 | auto & current_interval = *lower_bound; |
| 163 | for (auto resource_type : ext::range(MAX_RESOURCE_TYPE)) |
| 164 | { |
| 165 | new_interval.used[resource_type].store(current_interval.used[resource_type].load()); |
| 166 | new_interval.end_of_interval.store(current_interval.end_of_interval.load()); |
| 167 | } |
| 168 | } |
| 169 | it->second = new_intervals; |
| 170 | } |
| 171 | |
| 172 | return new_intervals; |
| 173 | } |
| 174 | |
| 175 | |
| 176 | QuotaContextFactory::QuotaContextFactory(const AccessControlManager & access_control_manager_) |
| 177 | : access_control_manager(access_control_manager_) |
| 178 | { |
| 179 | } |
| 180 | |
| 181 | |
| 182 | QuotaContextFactory::~QuotaContextFactory() |
| 183 | { |
| 184 | } |
| 185 | |
| 186 | |
| 187 | std::shared_ptr<QuotaContext> QuotaContextFactory::createContext(const String & user_name, const Poco::Net::IPAddress & address, const String & client_key) |
| 188 | { |
| 189 | std::lock_guard lock{mutex}; |
| 190 | ensureAllQuotasRead(); |
| 191 | auto context = ext::shared_ptr_helper<QuotaContext>::create(user_name, address, client_key); |
| 192 | contexts.push_back(context); |
| 193 | chooseQuotaForContext(context); |
| 194 | return context; |
| 195 | } |
| 196 | |
| 197 | |
| 198 | void QuotaContextFactory::ensureAllQuotasRead() |
| 199 | { |
| 200 | /// `mutex` is already locked. |
| 201 | if (all_quotas_read) |
| 202 | return; |
| 203 | all_quotas_read = true; |
| 204 | |
| 205 | subscription = access_control_manager.subscribeForChanges<Quota>( |
| 206 | [&](const UUID & id, const AccessEntityPtr & entity) |
| 207 | { |
| 208 | if (entity) |
| 209 | quotaAddedOrChanged(id, typeid_cast<QuotaPtr>(entity)); |
| 210 | else |
| 211 | quotaRemoved(id); |
| 212 | }); |
| 213 | |
| 214 | for (const UUID & quota_id : access_control_manager.findAll<Quota>()) |
| 215 | { |
| 216 | auto quota = access_control_manager.tryRead<Quota>(quota_id); |
| 217 | if (quota) |
| 218 | all_quotas.emplace(quota_id, QuotaInfo(quota, quota_id)); |
| 219 | } |
| 220 | } |
| 221 | |
| 222 | |
| 223 | void QuotaContextFactory::quotaAddedOrChanged(const UUID & quota_id, const std::shared_ptr<const Quota> & new_quota) |
| 224 | { |
| 225 | std::lock_guard lock{mutex}; |
| 226 | auto it = all_quotas.find(quota_id); |
| 227 | if (it == all_quotas.end()) |
| 228 | { |
| 229 | it = all_quotas.emplace(quota_id, QuotaInfo(new_quota, quota_id)).first; |
| 230 | } |
| 231 | else |
| 232 | { |
| 233 | if (it->second.quota == new_quota) |
| 234 | return; |
| 235 | } |
| 236 | |
| 237 | auto & info = it->second; |
| 238 | info.setQuota(new_quota, quota_id); |
| 239 | chooseQuotaForAllContexts(); |
| 240 | } |
| 241 | |
| 242 | |
| 243 | void QuotaContextFactory::quotaRemoved(const UUID & quota_id) |
| 244 | { |
| 245 | std::lock_guard lock{mutex}; |
| 246 | all_quotas.erase(quota_id); |
| 247 | chooseQuotaForAllContexts(); |
| 248 | } |
| 249 | |
| 250 | |
| 251 | void QuotaContextFactory::chooseQuotaForAllContexts() |
| 252 | { |
| 253 | /// `mutex` is already locked. |
| 254 | boost::range::remove_erase_if( |
| 255 | contexts, |
| 256 | [&](const std::weak_ptr<QuotaContext> & weak) |
| 257 | { |
| 258 | auto context = weak.lock(); |
| 259 | if (!context) |
| 260 | return true; // remove from the `contexts` list. |
| 261 | chooseQuotaForContext(context); |
| 262 | return false; // keep in the `contexts` list. |
| 263 | }); |
| 264 | } |
| 265 | |
| 266 | void QuotaContextFactory::chooseQuotaForContext(const std::shared_ptr<QuotaContext> & context) |
| 267 | { |
| 268 | /// `mutex` is already locked. |
| 269 | std::shared_ptr<const Intervals> intervals; |
| 270 | for (auto & info : all_quotas | boost::adaptors::map_values) |
| 271 | { |
| 272 | if (info.canUseWithContext(*context)) |
| 273 | { |
| 274 | String key = info.calculateKey(*context); |
| 275 | intervals = info.getOrBuildIntervals(key); |
| 276 | break; |
| 277 | } |
| 278 | } |
| 279 | |
| 280 | if (!intervals) |
| 281 | intervals = std::make_shared<Intervals>(); /// No quota == no limits. |
| 282 | |
| 283 | std::atomic_store(&context->atomic_intervals, intervals); |
| 284 | } |
| 285 | |
| 286 | |
| 287 | std::vector<QuotaUsageInfo> QuotaContextFactory::getUsageInfo() const |
| 288 | { |
| 289 | std::lock_guard lock{mutex}; |
| 290 | std::vector<QuotaUsageInfo> all_infos; |
| 291 | auto current_time = std::chrono::system_clock::now(); |
| 292 | for (const auto & info : all_quotas | boost::adaptors::map_values) |
| 293 | { |
| 294 | for (const auto & intervals : info.key_to_intervals | boost::adaptors::map_values) |
| 295 | all_infos.push_back(intervals->getUsageInfo(current_time)); |
| 296 | } |
| 297 | return all_infos; |
| 298 | } |
| 299 | } |
| 300 | |