1#include <Access/QuotaContext.h>
2#include <Access/QuotaContextFactory.h>
3#include <Access/AccessControlManager.h>
4#include <Common/Exception.h>
5#include <Common/thread_local_rng.h>
6#include <ext/range.h>
7#include <boost/range/adaptor/map.hpp>
8#include <boost/range/algorithm/copy.hpp>
9#include <boost/range/algorithm/lower_bound.hpp>
10#include <boost/range/algorithm/stable_sort.hpp>
11#include <boost/range/algorithm_ext/erase.hpp>
12
13
14namespace DB
15{
namespace ErrorCodes
{
    /// Defined elsewhere; used by calculateKey() when a quota is keyed by a client key that was not supplied.
    extern const int QUOTA_REQUIRES_CLIENT_KEY;
}
20
21
22namespace
23{
24 std::chrono::system_clock::duration randomDuration(std::chrono::seconds max)
25 {
26 auto count = std::chrono::duration_cast<std::chrono::system_clock::duration>(max).count();
27 std::uniform_int_distribution<Int64> distribution{0, count - 1};
28 return std::chrono::system_clock::duration(distribution(thread_local_rng));
29 }
30}
31
32
33void QuotaContextFactory::QuotaInfo::setQuota(const QuotaPtr & quota_, const UUID & quota_id_)
34{
35 quota = quota_;
36 quota_id = quota_id_;
37
38 boost::range::copy(quota->roles, std::inserter(roles, roles.end()));
39 all_roles = quota->all_roles;
40 boost::range::copy(quota->except_roles, std::inserter(except_roles, except_roles.end()));
41
42 rebuildAllIntervals();
43}
44
45
46bool QuotaContextFactory::QuotaInfo::canUseWithContext(const QuotaContext & context) const
47{
48 if (roles.count(context.user_name))
49 return true;
50
51 if (all_roles && !except_roles.count(context.user_name))
52 return true;
53
54 return false;
55}
56
57
58String QuotaContextFactory::QuotaInfo::calculateKey(const QuotaContext & context) const
59{
60 using KeyType = Quota::KeyType;
61 switch (quota->key_type)
62 {
63 case KeyType::NONE:
64 return "";
65 case KeyType::USER_NAME:
66 return context.user_name;
67 case KeyType::IP_ADDRESS:
68 return context.address.toString();
69 case KeyType::CLIENT_KEY:
70 {
71 if (!context.client_key.empty())
72 return context.client_key;
73 throw Exception(
74 "Quota " + quota->getName() + " (for user " + context.user_name + ") requires a client supplied key.",
75 ErrorCodes::QUOTA_REQUIRES_CLIENT_KEY);
76 }
77 case KeyType::CLIENT_KEY_OR_USER_NAME:
78 {
79 if (!context.client_key.empty())
80 return context.client_key;
81 return context.user_name;
82 }
83 case KeyType::CLIENT_KEY_OR_IP_ADDRESS:
84 {
85 if (!context.client_key.empty())
86 return context.client_key;
87 return context.address.toString();
88 }
89 }
90 __builtin_unreachable();
91}
92
93
94std::shared_ptr<const QuotaContext::Intervals> QuotaContextFactory::QuotaInfo::getOrBuildIntervals(const String & key)
95{
96 auto it = key_to_intervals.find(key);
97 if (it != key_to_intervals.end())
98 return it->second;
99 return rebuildIntervals(key);
100}
101
102
103void QuotaContextFactory::QuotaInfo::rebuildAllIntervals()
104{
105 for (const String & key : key_to_intervals | boost::adaptors::map_keys)
106 rebuildIntervals(key);
107}
108
109
110std::shared_ptr<const QuotaContext::Intervals> QuotaContextFactory::QuotaInfo::rebuildIntervals(const String & key)
111{
112 auto new_intervals = std::make_shared<Intervals>();
113 new_intervals->quota_name = quota->getName();
114 new_intervals->quota_id = quota_id;
115 new_intervals->quota_key = key;
116 auto & intervals = new_intervals->intervals;
117 intervals.reserve(quota->all_limits.size());
118 constexpr size_t MAX_RESOURCE_TYPE = Quota::MAX_RESOURCE_TYPE;
119 for (const auto & limits : quota->all_limits)
120 {
121 intervals.emplace_back();
122 auto & interval = intervals.back();
123 interval.duration = limits.duration;
124 std::chrono::system_clock::time_point end_of_interval{};
125 interval.randomize_interval = limits.randomize_interval;
126 if (limits.randomize_interval)
127 end_of_interval += randomDuration(limits.duration);
128 interval.end_of_interval = end_of_interval.time_since_epoch();
129 for (auto resource_type : ext::range(MAX_RESOURCE_TYPE))
130 {
131 interval.max[resource_type] = limits.max[resource_type];
132 interval.used[resource_type] = 0;
133 }
134 }
135
136 /// Order intervals by durations from largest to smallest.
137 /// To report first about largest interval on what quota was exceeded.
138 struct GreaterByDuration
139 {
140 bool operator()(const Interval & lhs, const Interval & rhs) const { return lhs.duration > rhs.duration; }
141 };
142 boost::range::stable_sort(intervals, GreaterByDuration{});
143
144 auto it = key_to_intervals.find(key);
145 if (it == key_to_intervals.end())
146 {
147 /// Just put new intervals into the map.
148 key_to_intervals.try_emplace(key, new_intervals);
149 }
150 else
151 {
152 /// We need to keep usage information from the old intervals.
153 const auto & old_intervals = it->second->intervals;
154 for (auto & new_interval : new_intervals->intervals)
155 {
156 /// Check if an interval with the same duration is already in use.
157 auto lower_bound = boost::range::lower_bound(old_intervals, new_interval, GreaterByDuration{});
158 if ((lower_bound == old_intervals.end()) || (lower_bound->duration != new_interval.duration))
159 continue;
160
161 /// Found an interval with the same duration, we need to copy its usage information to `result`.
162 auto & current_interval = *lower_bound;
163 for (auto resource_type : ext::range(MAX_RESOURCE_TYPE))
164 {
165 new_interval.used[resource_type].store(current_interval.used[resource_type].load());
166 new_interval.end_of_interval.store(current_interval.end_of_interval.load());
167 }
168 }
169 it->second = new_intervals;
170 }
171
172 return new_intervals;
173}
174
175
176QuotaContextFactory::QuotaContextFactory(const AccessControlManager & access_control_manager_)
177 : access_control_manager(access_control_manager_)
178{
179}
180
181
182QuotaContextFactory::~QuotaContextFactory()
183{
184}
185
186
187std::shared_ptr<QuotaContext> QuotaContextFactory::createContext(const String & user_name, const Poco::Net::IPAddress & address, const String & client_key)
188{
189 std::lock_guard lock{mutex};
190 ensureAllQuotasRead();
191 auto context = ext::shared_ptr_helper<QuotaContext>::create(user_name, address, client_key);
192 contexts.push_back(context);
193 chooseQuotaForContext(context);
194 return context;
195}
196
197
198void QuotaContextFactory::ensureAllQuotasRead()
199{
200 /// `mutex` is already locked.
201 if (all_quotas_read)
202 return;
203 all_quotas_read = true;
204
205 subscription = access_control_manager.subscribeForChanges<Quota>(
206 [&](const UUID & id, const AccessEntityPtr & entity)
207 {
208 if (entity)
209 quotaAddedOrChanged(id, typeid_cast<QuotaPtr>(entity));
210 else
211 quotaRemoved(id);
212 });
213
214 for (const UUID & quota_id : access_control_manager.findAll<Quota>())
215 {
216 auto quota = access_control_manager.tryRead<Quota>(quota_id);
217 if (quota)
218 all_quotas.emplace(quota_id, QuotaInfo(quota, quota_id));
219 }
220}
221
222
223void QuotaContextFactory::quotaAddedOrChanged(const UUID & quota_id, const std::shared_ptr<const Quota> & new_quota)
224{
225 std::lock_guard lock{mutex};
226 auto it = all_quotas.find(quota_id);
227 if (it == all_quotas.end())
228 {
229 it = all_quotas.emplace(quota_id, QuotaInfo(new_quota, quota_id)).first;
230 }
231 else
232 {
233 if (it->second.quota == new_quota)
234 return;
235 }
236
237 auto & info = it->second;
238 info.setQuota(new_quota, quota_id);
239 chooseQuotaForAllContexts();
240}
241
242
243void QuotaContextFactory::quotaRemoved(const UUID & quota_id)
244{
245 std::lock_guard lock{mutex};
246 all_quotas.erase(quota_id);
247 chooseQuotaForAllContexts();
248}
249
250
251void QuotaContextFactory::chooseQuotaForAllContexts()
252{
253 /// `mutex` is already locked.
254 boost::range::remove_erase_if(
255 contexts,
256 [&](const std::weak_ptr<QuotaContext> & weak)
257 {
258 auto context = weak.lock();
259 if (!context)
260 return true; // remove from the `contexts` list.
261 chooseQuotaForContext(context);
262 return false; // keep in the `contexts` list.
263 });
264}
265
266void QuotaContextFactory::chooseQuotaForContext(const std::shared_ptr<QuotaContext> & context)
267{
268 /// `mutex` is already locked.
269 std::shared_ptr<const Intervals> intervals;
270 for (auto & info : all_quotas | boost::adaptors::map_values)
271 {
272 if (info.canUseWithContext(*context))
273 {
274 String key = info.calculateKey(*context);
275 intervals = info.getOrBuildIntervals(key);
276 break;
277 }
278 }
279
280 if (!intervals)
281 intervals = std::make_shared<Intervals>(); /// No quota == no limits.
282
283 std::atomic_store(&context->atomic_intervals, intervals);
284}
285
286
287std::vector<QuotaUsageInfo> QuotaContextFactory::getUsageInfo() const
288{
289 std::lock_guard lock{mutex};
290 std::vector<QuotaUsageInfo> all_infos;
291 auto current_time = std::chrono::system_clock::now();
292 for (const auto & info : all_quotas | boost::adaptors::map_values)
293 {
294 for (const auto & intervals : info.key_to_intervals | boost::adaptors::map_values)
295 all_infos.push_back(intervals->getUsageInfo(current_time));
296 }
297 return all_infos;
298}
299}
300