| 1 | #include <Access/QuotaContext.h> | 
|---|
| 2 | #include <Access/QuotaContextFactory.h> | 
|---|
| 3 | #include <Access/AccessControlManager.h> | 
|---|
| 4 | #include <Common/Exception.h> | 
|---|
| 5 | #include <Common/thread_local_rng.h> | 
|---|
| 6 | #include <ext/range.h> | 
|---|
| 7 | #include <boost/range/adaptor/map.hpp> | 
|---|
| 8 | #include <boost/range/algorithm/copy.hpp> | 
|---|
| 9 | #include <boost/range/algorithm/lower_bound.hpp> | 
|---|
| 10 | #include <boost/range/algorithm/stable_sort.hpp> | 
|---|
| 11 | #include <boost/range/algorithm_ext/erase.hpp> | 
|---|
| 12 |  | 
|---|
| 13 |  | 
|---|
| 14 | namespace DB | 
|---|
| 15 | { | 
|---|
| 16 | namespace ErrorCodes | 
|---|
| 17 | { | 
|---|
| 18 | extern const int QUOTA_REQUIRES_CLIENT_KEY; | 
|---|
| 19 | } | 
|---|
| 20 |  | 
|---|
| 21 |  | 
|---|
| 22 | namespace | 
|---|
| 23 | { | 
|---|
| 24 | std::chrono::system_clock::duration randomDuration(std::chrono::seconds max) | 
|---|
| 25 | { | 
|---|
| 26 | auto count = std::chrono::duration_cast<std::chrono::system_clock::duration>(max).count(); | 
|---|
| 27 | std::uniform_int_distribution<Int64> distribution{0, count - 1}; | 
|---|
| 28 | return std::chrono::system_clock::duration(distribution(thread_local_rng)); | 
|---|
| 29 | } | 
|---|
| 30 | } | 
|---|
| 31 |  | 
|---|
| 32 |  | 
|---|
| 33 | void QuotaContextFactory::QuotaInfo::setQuota(const QuotaPtr & quota_, const UUID & quota_id_) | 
|---|
| 34 | { | 
|---|
| 35 | quota = quota_; | 
|---|
| 36 | quota_id = quota_id_; | 
|---|
| 37 |  | 
|---|
| 38 | boost::range::copy(quota->roles, std::inserter(roles, roles.end())); | 
|---|
| 39 | all_roles = quota->all_roles; | 
|---|
| 40 | boost::range::copy(quota->except_roles, std::inserter(except_roles, except_roles.end())); | 
|---|
| 41 |  | 
|---|
| 42 | rebuildAllIntervals(); | 
|---|
| 43 | } | 
|---|
| 44 |  | 
|---|
| 45 |  | 
|---|
| 46 | bool QuotaContextFactory::QuotaInfo::canUseWithContext(const QuotaContext & context) const | 
|---|
| 47 | { | 
|---|
| 48 | if (roles.count(context.user_name)) | 
|---|
| 49 | return true; | 
|---|
| 50 |  | 
|---|
| 51 | if (all_roles && !except_roles.count(context.user_name)) | 
|---|
| 52 | return true; | 
|---|
| 53 |  | 
|---|
| 54 | return false; | 
|---|
| 55 | } | 
|---|
| 56 |  | 
|---|
| 57 |  | 
|---|
| 58 | String QuotaContextFactory::QuotaInfo::calculateKey(const QuotaContext & context) const | 
|---|
| 59 | { | 
|---|
| 60 | using KeyType = Quota::KeyType; | 
|---|
| 61 | switch (quota->key_type) | 
|---|
| 62 | { | 
|---|
| 63 | case KeyType::NONE: | 
|---|
| 64 | return ""; | 
|---|
| 65 | case KeyType::USER_NAME: | 
|---|
| 66 | return context.user_name; | 
|---|
| 67 | case KeyType::IP_ADDRESS: | 
|---|
| 68 | return context.address.toString(); | 
|---|
| 69 | case KeyType::CLIENT_KEY: | 
|---|
| 70 | { | 
|---|
| 71 | if (!context.client_key.empty()) | 
|---|
| 72 | return context.client_key; | 
|---|
| 73 | throw Exception( | 
|---|
| 74 | "Quota "+ quota->getName() + " (for user "+ context.user_name + ") requires a client supplied key.", | 
|---|
| 75 | ErrorCodes::QUOTA_REQUIRES_CLIENT_KEY); | 
|---|
| 76 | } | 
|---|
| 77 | case KeyType::CLIENT_KEY_OR_USER_NAME: | 
|---|
| 78 | { | 
|---|
| 79 | if (!context.client_key.empty()) | 
|---|
| 80 | return context.client_key; | 
|---|
| 81 | return context.user_name; | 
|---|
| 82 | } | 
|---|
| 83 | case KeyType::CLIENT_KEY_OR_IP_ADDRESS: | 
|---|
| 84 | { | 
|---|
| 85 | if (!context.client_key.empty()) | 
|---|
| 86 | return context.client_key; | 
|---|
| 87 | return context.address.toString(); | 
|---|
| 88 | } | 
|---|
| 89 | } | 
|---|
| 90 | __builtin_unreachable(); | 
|---|
| 91 | } | 
|---|
| 92 |  | 
|---|
| 93 |  | 
|---|
| 94 | std::shared_ptr<const QuotaContext::Intervals> QuotaContextFactory::QuotaInfo::getOrBuildIntervals(const String & key) | 
|---|
| 95 | { | 
|---|
| 96 | auto it = key_to_intervals.find(key); | 
|---|
| 97 | if (it != key_to_intervals.end()) | 
|---|
| 98 | return it->second; | 
|---|
| 99 | return rebuildIntervals(key); | 
|---|
| 100 | } | 
|---|
| 101 |  | 
|---|
| 102 |  | 
|---|
| 103 | void QuotaContextFactory::QuotaInfo::rebuildAllIntervals() | 
|---|
| 104 | { | 
|---|
| 105 | for (const String & key : key_to_intervals | boost::adaptors::map_keys) | 
|---|
| 106 | rebuildIntervals(key); | 
|---|
| 107 | } | 
|---|
| 108 |  | 
|---|
| 109 |  | 
|---|
| 110 | std::shared_ptr<const QuotaContext::Intervals> QuotaContextFactory::QuotaInfo::rebuildIntervals(const String & key) | 
|---|
| 111 | { | 
|---|
| 112 | auto new_intervals = std::make_shared<Intervals>(); | 
|---|
| 113 | new_intervals->quota_name = quota->getName(); | 
|---|
| 114 | new_intervals->quota_id = quota_id; | 
|---|
| 115 | new_intervals->quota_key = key; | 
|---|
| 116 | auto & intervals = new_intervals->intervals; | 
|---|
| 117 | intervals.reserve(quota->all_limits.size()); | 
|---|
| 118 | constexpr size_t MAX_RESOURCE_TYPE = Quota::MAX_RESOURCE_TYPE; | 
|---|
| 119 | for (const auto & limits : quota->all_limits) | 
|---|
| 120 | { | 
|---|
| 121 | intervals.emplace_back(); | 
|---|
| 122 | auto & interval = intervals.back(); | 
|---|
| 123 | interval.duration = limits.duration; | 
|---|
| 124 | std::chrono::system_clock::time_point end_of_interval{}; | 
|---|
| 125 | interval.randomize_interval = limits.randomize_interval; | 
|---|
| 126 | if (limits.randomize_interval) | 
|---|
| 127 | end_of_interval += randomDuration(limits.duration); | 
|---|
| 128 | interval.end_of_interval = end_of_interval.time_since_epoch(); | 
|---|
| 129 | for (auto resource_type : ext::range(MAX_RESOURCE_TYPE)) | 
|---|
| 130 | { | 
|---|
| 131 | interval.max[resource_type] = limits.max[resource_type]; | 
|---|
| 132 | interval.used[resource_type] = 0; | 
|---|
| 133 | } | 
|---|
| 134 | } | 
|---|
| 135 |  | 
|---|
| 136 | /// Order intervals by durations from largest to smallest. | 
|---|
| 137 | /// To report first about largest interval on what quota was exceeded. | 
|---|
| 138 | struct GreaterByDuration | 
|---|
| 139 | { | 
|---|
| 140 | bool operator()(const Interval & lhs, const Interval & rhs) const { return lhs.duration > rhs.duration; } | 
|---|
| 141 | }; | 
|---|
| 142 | boost::range::stable_sort(intervals, GreaterByDuration{}); | 
|---|
| 143 |  | 
|---|
| 144 | auto it = key_to_intervals.find(key); | 
|---|
| 145 | if (it == key_to_intervals.end()) | 
|---|
| 146 | { | 
|---|
| 147 | /// Just put new intervals into the map. | 
|---|
| 148 | key_to_intervals.try_emplace(key, new_intervals); | 
|---|
| 149 | } | 
|---|
| 150 | else | 
|---|
| 151 | { | 
|---|
| 152 | /// We need to keep usage information from the old intervals. | 
|---|
| 153 | const auto & old_intervals = it->second->intervals; | 
|---|
| 154 | for (auto & new_interval : new_intervals->intervals) | 
|---|
| 155 | { | 
|---|
| 156 | /// Check if an interval with the same duration is already in use. | 
|---|
| 157 | auto lower_bound = boost::range::lower_bound(old_intervals, new_interval, GreaterByDuration{}); | 
|---|
| 158 | if ((lower_bound == old_intervals.end()) || (lower_bound->duration != new_interval.duration)) | 
|---|
| 159 | continue; | 
|---|
| 160 |  | 
|---|
| 161 | /// Found an interval with the same duration, we need to copy its usage information to `result`. | 
|---|
| 162 | auto & current_interval = *lower_bound; | 
|---|
| 163 | for (auto resource_type : ext::range(MAX_RESOURCE_TYPE)) | 
|---|
| 164 | { | 
|---|
| 165 | new_interval.used[resource_type].store(current_interval.used[resource_type].load()); | 
|---|
| 166 | new_interval.end_of_interval.store(current_interval.end_of_interval.load()); | 
|---|
| 167 | } | 
|---|
| 168 | } | 
|---|
| 169 | it->second = new_intervals; | 
|---|
| 170 | } | 
|---|
| 171 |  | 
|---|
| 172 | return new_intervals; | 
|---|
| 173 | } | 
|---|
| 174 |  | 
|---|
| 175 |  | 
|---|
| 176 | QuotaContextFactory::QuotaContextFactory(const AccessControlManager & access_control_manager_) | 
|---|
| 177 | : access_control_manager(access_control_manager_) | 
|---|
| 178 | { | 
|---|
| 179 | } | 
|---|
| 180 |  | 
|---|
| 181 |  | 
|---|
| 182 | QuotaContextFactory::~QuotaContextFactory() | 
|---|
| 183 | { | 
|---|
| 184 | } | 
|---|
| 185 |  | 
|---|
| 186 |  | 
|---|
| 187 | std::shared_ptr<QuotaContext> QuotaContextFactory::createContext(const String & user_name, const Poco::Net::IPAddress & address, const String & client_key) | 
|---|
| 188 | { | 
|---|
| 189 | std::lock_guard lock{mutex}; | 
|---|
| 190 | ensureAllQuotasRead(); | 
|---|
| 191 | auto context = ext::shared_ptr_helper<QuotaContext>::create(user_name, address, client_key); | 
|---|
| 192 | contexts.push_back(context); | 
|---|
| 193 | chooseQuotaForContext(context); | 
|---|
| 194 | return context; | 
|---|
| 195 | } | 
|---|
| 196 |  | 
|---|
| 197 |  | 
|---|
| 198 | void QuotaContextFactory::ensureAllQuotasRead() | 
|---|
| 199 | { | 
|---|
| 200 | /// `mutex` is already locked. | 
|---|
| 201 | if (all_quotas_read) | 
|---|
| 202 | return; | 
|---|
| 203 | all_quotas_read = true; | 
|---|
| 204 |  | 
|---|
| 205 | subscription = access_control_manager.subscribeForChanges<Quota>( | 
|---|
| 206 | [&](const UUID & id, const AccessEntityPtr & entity) | 
|---|
| 207 | { | 
|---|
| 208 | if (entity) | 
|---|
| 209 | quotaAddedOrChanged(id, typeid_cast<QuotaPtr>(entity)); | 
|---|
| 210 | else | 
|---|
| 211 | quotaRemoved(id); | 
|---|
| 212 | }); | 
|---|
| 213 |  | 
|---|
| 214 | for (const UUID & quota_id : access_control_manager.findAll<Quota>()) | 
|---|
| 215 | { | 
|---|
| 216 | auto quota = access_control_manager.tryRead<Quota>(quota_id); | 
|---|
| 217 | if (quota) | 
|---|
| 218 | all_quotas.emplace(quota_id, QuotaInfo(quota, quota_id)); | 
|---|
| 219 | } | 
|---|
| 220 | } | 
|---|
| 221 |  | 
|---|
| 222 |  | 
|---|
| 223 | void QuotaContextFactory::quotaAddedOrChanged(const UUID & quota_id, const std::shared_ptr<const Quota> & new_quota) | 
|---|
| 224 | { | 
|---|
| 225 | std::lock_guard lock{mutex}; | 
|---|
| 226 | auto it = all_quotas.find(quota_id); | 
|---|
| 227 | if (it == all_quotas.end()) | 
|---|
| 228 | { | 
|---|
| 229 | it = all_quotas.emplace(quota_id, QuotaInfo(new_quota, quota_id)).first; | 
|---|
| 230 | } | 
|---|
| 231 | else | 
|---|
| 232 | { | 
|---|
| 233 | if (it->second.quota == new_quota) | 
|---|
| 234 | return; | 
|---|
| 235 | } | 
|---|
| 236 |  | 
|---|
| 237 | auto & info = it->second; | 
|---|
| 238 | info.setQuota(new_quota, quota_id); | 
|---|
| 239 | chooseQuotaForAllContexts(); | 
|---|
| 240 | } | 
|---|
| 241 |  | 
|---|
| 242 |  | 
|---|
| 243 | void QuotaContextFactory::quotaRemoved(const UUID & quota_id) | 
|---|
| 244 | { | 
|---|
| 245 | std::lock_guard lock{mutex}; | 
|---|
| 246 | all_quotas.erase(quota_id); | 
|---|
| 247 | chooseQuotaForAllContexts(); | 
|---|
| 248 | } | 
|---|
| 249 |  | 
|---|
| 250 |  | 
|---|
| 251 | void QuotaContextFactory::chooseQuotaForAllContexts() | 
|---|
| 252 | { | 
|---|
| 253 | /// `mutex` is already locked. | 
|---|
| 254 | boost::range::remove_erase_if( | 
|---|
| 255 | contexts, | 
|---|
| 256 | [&](const std::weak_ptr<QuotaContext> & weak) | 
|---|
| 257 | { | 
|---|
| 258 | auto context = weak.lock(); | 
|---|
| 259 | if (!context) | 
|---|
| 260 | return true; // remove from the `contexts` list. | 
|---|
| 261 | chooseQuotaForContext(context); | 
|---|
| 262 | return false; // keep in the `contexts` list. | 
|---|
| 263 | }); | 
|---|
| 264 | } | 
|---|
| 265 |  | 
|---|
| 266 | void QuotaContextFactory::chooseQuotaForContext(const std::shared_ptr<QuotaContext> & context) | 
|---|
| 267 | { | 
|---|
| 268 | /// `mutex` is already locked. | 
|---|
| 269 | std::shared_ptr<const Intervals> intervals; | 
|---|
| 270 | for (auto & info : all_quotas | boost::adaptors::map_values) | 
|---|
| 271 | { | 
|---|
| 272 | if (info.canUseWithContext(*context)) | 
|---|
| 273 | { | 
|---|
| 274 | String key = info.calculateKey(*context); | 
|---|
| 275 | intervals = info.getOrBuildIntervals(key); | 
|---|
| 276 | break; | 
|---|
| 277 | } | 
|---|
| 278 | } | 
|---|
| 279 |  | 
|---|
| 280 | if (!intervals) | 
|---|
| 281 | intervals = std::make_shared<Intervals>(); /// No quota == no limits. | 
|---|
| 282 |  | 
|---|
| 283 | std::atomic_store(&context->atomic_intervals, intervals); | 
|---|
| 284 | } | 
|---|
| 285 |  | 
|---|
| 286 |  | 
|---|
| 287 | std::vector<QuotaUsageInfo> QuotaContextFactory::getUsageInfo() const | 
|---|
| 288 | { | 
|---|
| 289 | std::lock_guard lock{mutex}; | 
|---|
| 290 | std::vector<QuotaUsageInfo> all_infos; | 
|---|
| 291 | auto current_time = std::chrono::system_clock::now(); | 
|---|
| 292 | for (const auto & info : all_quotas | boost::adaptors::map_values) | 
|---|
| 293 | { | 
|---|
| 294 | for (const auto & intervals : info.key_to_intervals | boost::adaptors::map_values) | 
|---|
| 295 | all_infos.push_back(intervals->getUsageInfo(current_time)); | 
|---|
| 296 | } | 
|---|
| 297 | return all_infos; | 
|---|
| 298 | } | 
|---|
| 299 | } | 
|---|
| 300 |  | 
|---|