1 | #include <Access/QuotaContext.h> |
2 | #include <Access/QuotaContextFactory.h> |
3 | #include <Access/AccessControlManager.h> |
4 | #include <Common/Exception.h> |
5 | #include <Common/thread_local_rng.h> |
6 | #include <ext/range.h> |
7 | #include <boost/range/adaptor/map.hpp> |
8 | #include <boost/range/algorithm/copy.hpp> |
9 | #include <boost/range/algorithm/lower_bound.hpp> |
10 | #include <boost/range/algorithm/stable_sort.hpp> |
11 | #include <boost/range/algorithm_ext/erase.hpp> |
12 | |
13 | |
14 | namespace DB |
15 | { |
16 | namespace ErrorCodes |
17 | { |
18 | extern const int QUOTA_REQUIRES_CLIENT_KEY; |
19 | } |
20 | |
21 | |
22 | namespace |
23 | { |
24 | std::chrono::system_clock::duration randomDuration(std::chrono::seconds max) |
25 | { |
26 | auto count = std::chrono::duration_cast<std::chrono::system_clock::duration>(max).count(); |
27 | std::uniform_int_distribution<Int64> distribution{0, count - 1}; |
28 | return std::chrono::system_clock::duration(distribution(thread_local_rng)); |
29 | } |
30 | } |
31 | |
32 | |
33 | void QuotaContextFactory::QuotaInfo::setQuota(const QuotaPtr & quota_, const UUID & quota_id_) |
34 | { |
35 | quota = quota_; |
36 | quota_id = quota_id_; |
37 | |
38 | boost::range::copy(quota->roles, std::inserter(roles, roles.end())); |
39 | all_roles = quota->all_roles; |
40 | boost::range::copy(quota->except_roles, std::inserter(except_roles, except_roles.end())); |
41 | |
42 | rebuildAllIntervals(); |
43 | } |
44 | |
45 | |
46 | bool QuotaContextFactory::QuotaInfo::canUseWithContext(const QuotaContext & context) const |
47 | { |
48 | if (roles.count(context.user_name)) |
49 | return true; |
50 | |
51 | if (all_roles && !except_roles.count(context.user_name)) |
52 | return true; |
53 | |
54 | return false; |
55 | } |
56 | |
57 | |
58 | String QuotaContextFactory::QuotaInfo::calculateKey(const QuotaContext & context) const |
59 | { |
60 | using KeyType = Quota::KeyType; |
61 | switch (quota->key_type) |
62 | { |
63 | case KeyType::NONE: |
64 | return "" ; |
65 | case KeyType::USER_NAME: |
66 | return context.user_name; |
67 | case KeyType::IP_ADDRESS: |
68 | return context.address.toString(); |
69 | case KeyType::CLIENT_KEY: |
70 | { |
71 | if (!context.client_key.empty()) |
72 | return context.client_key; |
73 | throw Exception( |
74 | "Quota " + quota->getName() + " (for user " + context.user_name + ") requires a client supplied key." , |
75 | ErrorCodes::QUOTA_REQUIRES_CLIENT_KEY); |
76 | } |
77 | case KeyType::CLIENT_KEY_OR_USER_NAME: |
78 | { |
79 | if (!context.client_key.empty()) |
80 | return context.client_key; |
81 | return context.user_name; |
82 | } |
83 | case KeyType::CLIENT_KEY_OR_IP_ADDRESS: |
84 | { |
85 | if (!context.client_key.empty()) |
86 | return context.client_key; |
87 | return context.address.toString(); |
88 | } |
89 | } |
90 | __builtin_unreachable(); |
91 | } |
92 | |
93 | |
94 | std::shared_ptr<const QuotaContext::Intervals> QuotaContextFactory::QuotaInfo::getOrBuildIntervals(const String & key) |
95 | { |
96 | auto it = key_to_intervals.find(key); |
97 | if (it != key_to_intervals.end()) |
98 | return it->second; |
99 | return rebuildIntervals(key); |
100 | } |
101 | |
102 | |
103 | void QuotaContextFactory::QuotaInfo::rebuildAllIntervals() |
104 | { |
105 | for (const String & key : key_to_intervals | boost::adaptors::map_keys) |
106 | rebuildIntervals(key); |
107 | } |
108 | |
109 | |
110 | std::shared_ptr<const QuotaContext::Intervals> QuotaContextFactory::QuotaInfo::rebuildIntervals(const String & key) |
111 | { |
112 | auto new_intervals = std::make_shared<Intervals>(); |
113 | new_intervals->quota_name = quota->getName(); |
114 | new_intervals->quota_id = quota_id; |
115 | new_intervals->quota_key = key; |
116 | auto & intervals = new_intervals->intervals; |
117 | intervals.reserve(quota->all_limits.size()); |
118 | constexpr size_t MAX_RESOURCE_TYPE = Quota::MAX_RESOURCE_TYPE; |
119 | for (const auto & limits : quota->all_limits) |
120 | { |
121 | intervals.emplace_back(); |
122 | auto & interval = intervals.back(); |
123 | interval.duration = limits.duration; |
124 | std::chrono::system_clock::time_point end_of_interval{}; |
125 | interval.randomize_interval = limits.randomize_interval; |
126 | if (limits.randomize_interval) |
127 | end_of_interval += randomDuration(limits.duration); |
128 | interval.end_of_interval = end_of_interval.time_since_epoch(); |
129 | for (auto resource_type : ext::range(MAX_RESOURCE_TYPE)) |
130 | { |
131 | interval.max[resource_type] = limits.max[resource_type]; |
132 | interval.used[resource_type] = 0; |
133 | } |
134 | } |
135 | |
136 | /// Order intervals by durations from largest to smallest. |
137 | /// To report first about largest interval on what quota was exceeded. |
138 | struct GreaterByDuration |
139 | { |
140 | bool operator()(const Interval & lhs, const Interval & rhs) const { return lhs.duration > rhs.duration; } |
141 | }; |
142 | boost::range::stable_sort(intervals, GreaterByDuration{}); |
143 | |
144 | auto it = key_to_intervals.find(key); |
145 | if (it == key_to_intervals.end()) |
146 | { |
147 | /// Just put new intervals into the map. |
148 | key_to_intervals.try_emplace(key, new_intervals); |
149 | } |
150 | else |
151 | { |
152 | /// We need to keep usage information from the old intervals. |
153 | const auto & old_intervals = it->second->intervals; |
154 | for (auto & new_interval : new_intervals->intervals) |
155 | { |
156 | /// Check if an interval with the same duration is already in use. |
157 | auto lower_bound = boost::range::lower_bound(old_intervals, new_interval, GreaterByDuration{}); |
158 | if ((lower_bound == old_intervals.end()) || (lower_bound->duration != new_interval.duration)) |
159 | continue; |
160 | |
161 | /// Found an interval with the same duration, we need to copy its usage information to `result`. |
162 | auto & current_interval = *lower_bound; |
163 | for (auto resource_type : ext::range(MAX_RESOURCE_TYPE)) |
164 | { |
165 | new_interval.used[resource_type].store(current_interval.used[resource_type].load()); |
166 | new_interval.end_of_interval.store(current_interval.end_of_interval.load()); |
167 | } |
168 | } |
169 | it->second = new_intervals; |
170 | } |
171 | |
172 | return new_intervals; |
173 | } |
174 | |
175 | |
176 | QuotaContextFactory::QuotaContextFactory(const AccessControlManager & access_control_manager_) |
177 | : access_control_manager(access_control_manager_) |
178 | { |
179 | } |
180 | |
181 | |
182 | QuotaContextFactory::~QuotaContextFactory() |
183 | { |
184 | } |
185 | |
186 | |
187 | std::shared_ptr<QuotaContext> QuotaContextFactory::createContext(const String & user_name, const Poco::Net::IPAddress & address, const String & client_key) |
188 | { |
189 | std::lock_guard lock{mutex}; |
190 | ensureAllQuotasRead(); |
191 | auto context = ext::shared_ptr_helper<QuotaContext>::create(user_name, address, client_key); |
192 | contexts.push_back(context); |
193 | chooseQuotaForContext(context); |
194 | return context; |
195 | } |
196 | |
197 | |
198 | void QuotaContextFactory::ensureAllQuotasRead() |
199 | { |
200 | /// `mutex` is already locked. |
201 | if (all_quotas_read) |
202 | return; |
203 | all_quotas_read = true; |
204 | |
205 | subscription = access_control_manager.subscribeForChanges<Quota>( |
206 | [&](const UUID & id, const AccessEntityPtr & entity) |
207 | { |
208 | if (entity) |
209 | quotaAddedOrChanged(id, typeid_cast<QuotaPtr>(entity)); |
210 | else |
211 | quotaRemoved(id); |
212 | }); |
213 | |
214 | for (const UUID & quota_id : access_control_manager.findAll<Quota>()) |
215 | { |
216 | auto quota = access_control_manager.tryRead<Quota>(quota_id); |
217 | if (quota) |
218 | all_quotas.emplace(quota_id, QuotaInfo(quota, quota_id)); |
219 | } |
220 | } |
221 | |
222 | |
223 | void QuotaContextFactory::quotaAddedOrChanged(const UUID & quota_id, const std::shared_ptr<const Quota> & new_quota) |
224 | { |
225 | std::lock_guard lock{mutex}; |
226 | auto it = all_quotas.find(quota_id); |
227 | if (it == all_quotas.end()) |
228 | { |
229 | it = all_quotas.emplace(quota_id, QuotaInfo(new_quota, quota_id)).first; |
230 | } |
231 | else |
232 | { |
233 | if (it->second.quota == new_quota) |
234 | return; |
235 | } |
236 | |
237 | auto & info = it->second; |
238 | info.setQuota(new_quota, quota_id); |
239 | chooseQuotaForAllContexts(); |
240 | } |
241 | |
242 | |
243 | void QuotaContextFactory::quotaRemoved(const UUID & quota_id) |
244 | { |
245 | std::lock_guard lock{mutex}; |
246 | all_quotas.erase(quota_id); |
247 | chooseQuotaForAllContexts(); |
248 | } |
249 | |
250 | |
251 | void QuotaContextFactory::chooseQuotaForAllContexts() |
252 | { |
253 | /// `mutex` is already locked. |
254 | boost::range::remove_erase_if( |
255 | contexts, |
256 | [&](const std::weak_ptr<QuotaContext> & weak) |
257 | { |
258 | auto context = weak.lock(); |
259 | if (!context) |
260 | return true; // remove from the `contexts` list. |
261 | chooseQuotaForContext(context); |
262 | return false; // keep in the `contexts` list. |
263 | }); |
264 | } |
265 | |
266 | void QuotaContextFactory::chooseQuotaForContext(const std::shared_ptr<QuotaContext> & context) |
267 | { |
268 | /// `mutex` is already locked. |
269 | std::shared_ptr<const Intervals> intervals; |
270 | for (auto & info : all_quotas | boost::adaptors::map_values) |
271 | { |
272 | if (info.canUseWithContext(*context)) |
273 | { |
274 | String key = info.calculateKey(*context); |
275 | intervals = info.getOrBuildIntervals(key); |
276 | break; |
277 | } |
278 | } |
279 | |
280 | if (!intervals) |
281 | intervals = std::make_shared<Intervals>(); /// No quota == no limits. |
282 | |
283 | std::atomic_store(&context->atomic_intervals, intervals); |
284 | } |
285 | |
286 | |
287 | std::vector<QuotaUsageInfo> QuotaContextFactory::getUsageInfo() const |
288 | { |
289 | std::lock_guard lock{mutex}; |
290 | std::vector<QuotaUsageInfo> all_infos; |
291 | auto current_time = std::chrono::system_clock::now(); |
292 | for (const auto & info : all_quotas | boost::adaptors::map_values) |
293 | { |
294 | for (const auto & intervals : info.key_to_intervals | boost::adaptors::map_values) |
295 | all_infos.push_back(intervals->getUsageInfo(current_time)); |
296 | } |
297 | return all_infos; |
298 | } |
299 | } |
300 | |