1 | /* |
2 | * Copyright 2015-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

#include <folly/Likely.h>
#include <folly/Optional.h>
#include <folly/concurrency/CacheLocality.h>
27 | |
28 | namespace folly { |
29 | |
30 | /** |
31 | * Thread-safe (atomic) token bucket implementation. |
32 | * |
33 | * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream |
34 | * of events with an average rate and some amount of burstiness. The canonical |
35 | * example is a packet switched network: the network can accept some number of |
36 | * bytes per second and the bytes come in finite packets (bursts). A token |
37 | * bucket stores up to a fixed number of tokens (the burst size). Some number |
38 | * of tokens are removed when an event occurs. The tokens are replenished at a |
39 | * fixed rate. Failure to allocate tokens implies resource is unavailable and |
40 | * caller needs to implement its own retry mechanism. For simple cases where |
41 | * caller is okay with a FIFO starvation-free scheduling behavior, there are |
42 | * also APIs to 'borrow' from the future effectively assigning a start time to |
43 | * the caller when it should proceed with using the resource. It is also |
44 | * possible to 'return' previously allocated tokens to make them available to |
45 | * other users. Returns in excess of burstSize are considered expired and |
46 | * will not be available to later callers. |
47 | * |
48 | * This implementation records the last time it was updated. This allows the |
49 | * token bucket to add tokens "just in time" when tokens are requested. |
50 | * |
51 | * The "dynamic" base variant allows the token generation rate and maximum |
52 | * burst size to change with every token consumption. |
53 | * |
54 | * @tparam Clock Clock type, must be steady i.e. monotonic. |
55 | */ |
56 | template <typename Clock = std::chrono::steady_clock> |
57 | class BasicDynamicTokenBucket { |
58 | static_assert(Clock::is_steady, "clock must be steady" ); |
59 | |
60 | public: |
61 | /** |
62 | * Constructor. |
63 | * |
64 | * @param zeroTime Initial time at which to consider the token bucket |
65 | * starting to fill. Defaults to 0, so by default token |
66 | * buckets are "full" after construction. |
67 | */ |
68 | explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept |
69 | : zeroTime_(zeroTime) {} |
70 | |
71 | /** |
72 | * Copy constructor. |
73 | * |
74 | * Thread-safe. (Copy constructors of derived classes may not be thread-safe |
75 | * however.) |
76 | */ |
77 | BasicDynamicTokenBucket(const BasicDynamicTokenBucket& other) noexcept |
78 | : zeroTime_(other.zeroTime_.load()) {} |
79 | |
80 | /** |
81 | * Copy-assignment operator. |
82 | * |
83 | * Warning: not thread safe for the object being assigned to (including |
84 | * self-assignment). Thread-safe for the other object. |
85 | */ |
86 | BasicDynamicTokenBucket& operator=( |
87 | const BasicDynamicTokenBucket& other) noexcept { |
88 | zeroTime_ = other.zeroTime_.load(); |
89 | return *this; |
90 | } |
91 | |
92 | /** |
93 | * Re-initialize token bucket. |
94 | * |
95 | * Thread-safe. |
96 | * |
97 | * @param zeroTime Initial time at which to consider the token bucket |
98 | * starting to fill. Defaults to 0, so by default token |
99 | * bucket is reset to "full". |
100 | */ |
101 | void reset(double zeroTime = 0) noexcept { |
102 | zeroTime_ = zeroTime; |
103 | } |
104 | |
105 | /** |
106 | * Returns the current time in seconds since Epoch. |
107 | */ |
108 | static double defaultClockNow() noexcept { |
109 | using dur = std::chrono::duration<double>; |
110 | auto const now = Clock::now().time_since_epoch(); |
111 | return std::chrono::duration_cast<dur>(now).count(); |
112 | } |
113 | |
114 | /** |
115 | * Attempts to consume some number of tokens. Tokens are first added to the |
116 | * bucket based on the time elapsed since the last attempt to consume tokens. |
117 | * Note: Attempts to consume more tokens than the burst size will always |
118 | * fail. |
119 | * |
120 | * Thread-safe. |
121 | * |
122 | * @param toConsume The number of tokens to consume. |
123 | * @param rate Number of tokens to generate per second. |
124 | * @param burstSize Maximum burst size. Must be greater than 0. |
125 | * @param nowInSeconds Current time in seconds. Should be monotonically |
126 | * increasing from the nowInSeconds specified in |
127 | * this token bucket's constructor. |
128 | * @return True if the rate limit check passed, false otherwise. |
129 | */ |
130 | bool consume( |
131 | double toConsume, |
132 | double rate, |
133 | double burstSize, |
134 | double nowInSeconds = defaultClockNow()) { |
135 | assert(rate > 0); |
136 | assert(burstSize > 0); |
137 | |
138 | if (nowInSeconds <= zeroTime_.load()) { |
139 | return 0; |
140 | } |
141 | |
142 | return consumeImpl( |
143 | rate, burstSize, nowInSeconds, [toConsume](double& tokens) { |
144 | if (tokens < toConsume) { |
145 | return false; |
146 | } |
147 | tokens -= toConsume; |
148 | return true; |
149 | }); |
150 | } |
151 | |
152 | /** |
153 | * Similar to consume, but always consumes some number of tokens. If the |
154 | * bucket contains enough tokens - consumes toConsume tokens. Otherwise the |
155 | * bucket is drained. |
156 | * |
157 | * Thread-safe. |
158 | * |
159 | * @param toConsume The number of tokens to consume. |
160 | * @param rate Number of tokens to generate per second. |
161 | * @param burstSize Maximum burst size. Must be greater than 0. |
162 | * @param nowInSeconds Current time in seconds. Should be monotonically |
163 | * increasing from the nowInSeconds specified in |
164 | * this token bucket's constructor. |
165 | * @return number of tokens that were consumed. |
166 | */ |
167 | double consumeOrDrain( |
168 | double toConsume, |
169 | double rate, |
170 | double burstSize, |
171 | double nowInSeconds = defaultClockNow()) { |
172 | assert(rate > 0); |
173 | assert(burstSize > 0); |
174 | |
175 | if (nowInSeconds <= zeroTime_.load()) { |
176 | return 0; |
177 | } |
178 | |
179 | double consumed; |
180 | consumeImpl( |
181 | rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) { |
182 | if (tokens < toConsume) { |
183 | consumed = tokens; |
184 | tokens = 0.0; |
185 | } else { |
186 | consumed = toConsume; |
187 | tokens -= toConsume; |
188 | } |
189 | return true; |
190 | }); |
191 | return consumed; |
192 | } |
193 | |
194 | /** |
195 | * Return extra tokens back to the bucket. This will move the zeroTime_ |
196 | * value back based on the rate. |
197 | * |
198 | * Thread-safe. |
199 | */ |
200 | void returnTokens(double tokensToReturn, double rate) { |
201 | assert(rate > 0); |
202 | assert(tokensToReturn > 0); |
203 | |
204 | returnTokensImpl(tokensToReturn, rate); |
205 | } |
206 | |
207 | /** |
208 | * Like consumeOrDrain but the call will always satisfy the asked for count. |
209 | * It does so by borrowing tokens from the future (zeroTime_ will move |
210 | * forward) if the currently available count isn't sufficient. |
211 | * |
212 | * Returns a folly::Optional<double>. The optional wont be set if the request |
213 | * cannot be satisfied: only case is when it is larger than burstSize. The |
214 | * value of the optional is a double indicating the time in seconds that the |
215 | * caller needs to wait at which the reservation becomes valid. The caller |
216 | * could simply sleep for the returned duration to smooth out the allocation |
217 | * to match the rate limiter or do some other computation in the meantime. In |
218 | * any case, any regular consume or consumeOrDrain calls will fail to allocate |
219 | * any tokens until the future time is reached. |
220 | * |
221 | * Note: It is assumed the caller will not ask for a very large count nor use |
222 | * it immediately (if not waiting inline) as that would break the burst |
223 | * prevention the limiter is meant to be used for. |
224 | * |
225 | * Thread-safe. |
226 | */ |
227 | Optional<double> consumeWithBorrowNonBlocking( |
228 | double toConsume, |
229 | double rate, |
230 | double burstSize, |
231 | double nowInSeconds = defaultClockNow()) { |
232 | assert(rate > 0); |
233 | assert(burstSize > 0); |
234 | |
235 | if (burstSize < toConsume) { |
236 | return folly::none; |
237 | } |
238 | |
239 | while (toConsume > 0) { |
240 | double consumed = |
241 | consumeOrDrain(toConsume, rate, burstSize, nowInSeconds); |
242 | if (consumed > 0) { |
243 | toConsume -= consumed; |
244 | } else { |
245 | double zeroTimeNew = returnTokensImpl(-toConsume, rate); |
246 | double napTime = std::max(0.0, zeroTimeNew - nowInSeconds); |
247 | return napTime; |
248 | } |
249 | } |
250 | return 0; |
251 | } |
252 | |
253 | /** |
254 | * Convenience wrapper around non-blocking borrow to sleep inline until |
255 | * reservation is valid. |
256 | */ |
257 | bool consumeWithBorrowAndWait( |
258 | double toConsume, |
259 | double rate, |
260 | double burstSize, |
261 | double nowInSeconds = defaultClockNow()) { |
262 | auto res = |
263 | consumeWithBorrowNonBlocking(toConsume, rate, burstSize, nowInSeconds); |
264 | if (res.value_or(0) > 0) { |
265 | int64_t napUSec = res.value() * 1000000; |
266 | std::this_thread::sleep_for(std::chrono::microseconds(napUSec)); |
267 | } |
268 | return res.has_value(); |
269 | } |
270 | |
271 | /** |
272 | * Returns the number of tokens currently available. |
273 | * |
274 | * Thread-safe (but returned value may immediately be outdated). |
275 | */ |
276 | double available( |
277 | double rate, |
278 | double burstSize, |
279 | double nowInSeconds = defaultClockNow()) const noexcept { |
280 | assert(rate > 0); |
281 | assert(burstSize > 0); |
282 | |
283 | double zt = this->zeroTime_.load(); |
284 | if (nowInSeconds <= zt) { |
285 | return 0; |
286 | } |
287 | return std::min((nowInSeconds - zt) * rate, burstSize); |
288 | } |
289 | |
290 | private: |
291 | template <typename TCallback> |
292 | bool consumeImpl( |
293 | double rate, |
294 | double burstSize, |
295 | double nowInSeconds, |
296 | const TCallback& callback) { |
297 | auto zeroTimeOld = zeroTime_.load(); |
298 | double zeroTimeNew; |
299 | do { |
300 | auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize); |
301 | if (!callback(tokens)) { |
302 | return false; |
303 | } |
304 | zeroTimeNew = nowInSeconds - tokens / rate; |
305 | } while ( |
306 | UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); |
307 | |
308 | return true; |
309 | } |
310 | |
311 | /** |
312 | * Adjust zeroTime based on rate and tokenCount and return the new value of |
313 | * zeroTime_. Note: Token count can be negative to move the zeroTime_ value |
314 | * into the future. |
315 | */ |
316 | double returnTokensImpl(double tokenCount, double rate) { |
317 | auto zeroTimeOld = zeroTime_.load(); |
318 | double zeroTimeNew; |
319 | do { |
320 | zeroTimeNew = zeroTimeOld - tokenCount / rate; |
321 | } while ( |
322 | UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); |
323 | return zeroTimeNew; |
324 | } |
325 | |
326 | alignas(hardware_destructive_interference_size) std::atomic<double> zeroTime_; |
327 | }; |
328 | |
329 | /** |
330 | * Specialization of BasicDynamicTokenBucket with a fixed token |
331 | * generation rate and a fixed maximum burst size. |
332 | */ |
333 | template <typename Clock = std::chrono::steady_clock> |
334 | class BasicTokenBucket { |
335 | static_assert(Clock::is_steady, "clock must be steady" ); |
336 | |
337 | private: |
338 | using Impl = BasicDynamicTokenBucket<Clock>; |
339 | |
340 | public: |
341 | /** |
342 | * Construct a token bucket with a specific maximum rate and burst size. |
343 | * |
344 | * @param genRate Number of tokens to generate per second. |
345 | * @param burstSize Maximum burst size. Must be greater than 0. |
346 | * @param zeroTime Initial time at which to consider the token bucket |
347 | * starting to fill. Defaults to 0, so by default token |
348 | * bucket is "full" after construction. |
349 | */ |
350 | BasicTokenBucket( |
351 | double genRate, |
352 | double burstSize, |
353 | double zeroTime = 0) noexcept |
354 | : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) { |
355 | assert(rate_ > 0); |
356 | assert(burstSize_ > 0); |
357 | } |
358 | |
359 | /** |
360 | * Copy constructor. |
361 | * |
362 | * Warning: not thread safe! |
363 | */ |
364 | BasicTokenBucket(const BasicTokenBucket& other) noexcept = default; |
365 | |
366 | /** |
367 | * Copy-assignment operator. |
368 | * |
369 | * Warning: not thread safe! |
370 | */ |
371 | BasicTokenBucket& operator=(const BasicTokenBucket& other) noexcept = default; |
372 | |
373 | /** |
374 | * Returns the current time in seconds since Epoch. |
375 | */ |
376 | static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) { |
377 | return Impl::defaultClockNow(); |
378 | } |
379 | |
380 | /** |
381 | * Change rate and burst size. |
382 | * |
383 | * Warning: not thread safe! |
384 | * |
385 | * @param genRate Number of tokens to generate per second. |
386 | * @param burstSize Maximum burst size. Must be greater than 0. |
387 | * @param nowInSeconds Current time in seconds. Should be monotonically |
388 | * increasing from the nowInSeconds specified in |
389 | * this token bucket's constructor. |
390 | */ |
391 | void reset( |
392 | double genRate, |
393 | double burstSize, |
394 | double nowInSeconds = defaultClockNow()) noexcept { |
395 | assert(genRate > 0); |
396 | assert(burstSize > 0); |
397 | const double availTokens = available(nowInSeconds); |
398 | rate_ = genRate; |
399 | burstSize_ = burstSize; |
400 | setCapacity(availTokens, nowInSeconds); |
401 | } |
402 | |
403 | /** |
404 | * Change number of tokens in bucket. |
405 | * |
406 | * Warning: not thread safe! |
407 | * |
408 | * @param tokens Desired number of tokens in bucket after the call. |
409 | * @param nowInSeconds Current time in seconds. Should be monotonically |
410 | * increasing from the nowInSeconds specified in |
411 | * this token bucket's constructor. |
412 | */ |
413 | void setCapacity(double tokens, double nowInSeconds) noexcept { |
414 | tokenBucket_.reset(nowInSeconds - tokens / rate_); |
415 | } |
416 | |
417 | /** |
418 | * Attempts to consume some number of tokens. Tokens are first added to the |
419 | * bucket based on the time elapsed since the last attempt to consume tokens. |
420 | * Note: Attempts to consume more tokens than the burst size will always |
421 | * fail. |
422 | * |
423 | * Thread-safe. |
424 | * |
425 | * @param toConsume The number of tokens to consume. |
426 | * @param nowInSeconds Current time in seconds. Should be monotonically |
427 | * increasing from the nowInSeconds specified in |
428 | * this token bucket's constructor. |
429 | * @return True if the rate limit check passed, false otherwise. |
430 | */ |
431 | bool consume(double toConsume, double nowInSeconds = defaultClockNow()) { |
432 | return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds); |
433 | } |
434 | |
435 | /** |
436 | * Similar to consume, but always consumes some number of tokens. If the |
437 | * bucket contains enough tokens - consumes toConsume tokens. Otherwise the |
438 | * bucket is drained. |
439 | * |
440 | * Thread-safe. |
441 | * |
442 | * @param toConsume The number of tokens to consume. |
443 | * @param nowInSeconds Current time in seconds. Should be monotonically |
444 | * increasing from the nowInSeconds specified in |
445 | * this token bucket's constructor. |
446 | * @return number of tokens that were consumed. |
447 | */ |
448 | double consumeOrDrain( |
449 | double toConsume, |
450 | double nowInSeconds = defaultClockNow()) { |
451 | return tokenBucket_.consumeOrDrain( |
452 | toConsume, rate_, burstSize_, nowInSeconds); |
453 | } |
454 | |
455 | /** |
456 | * Returns extra token back to the bucket. |
457 | */ |
458 | void returnTokens(double tokensToReturn) { |
459 | return tokenBucket_.returnTokens(tokensToReturn, rate_); |
460 | } |
461 | |
462 | /** |
463 | * Reserve tokens and return time to wait for in order for the reservation to |
464 | * be compatible with the bucket configuration. |
465 | */ |
466 | Optional<double> consumeWithBorrowNonBlocking( |
467 | double toConsume, |
468 | double nowInSeconds = defaultClockNow()) { |
469 | return tokenBucket_.consumeWithBorrowNonBlocking( |
470 | toConsume, rate_, burstSize_, nowInSeconds); |
471 | } |
472 | |
473 | /** |
474 | * Reserve tokens. Blocks if need be until reservation is satisfied. |
475 | */ |
476 | bool consumeWithBorrowAndWait( |
477 | double toConsume, |
478 | double nowInSeconds = defaultClockNow()) { |
479 | return tokenBucket_.consumeWithBorrowAndWait( |
480 | toConsume, rate_, burstSize_, nowInSeconds); |
481 | } |
482 | |
483 | /** |
484 | * Returns the number of tokens currently available. |
485 | * |
486 | * Thread-safe (but returned value may immediately be outdated). |
487 | */ |
488 | double available(double nowInSeconds = defaultClockNow()) const { |
489 | return tokenBucket_.available(rate_, burstSize_, nowInSeconds); |
490 | } |
491 | |
492 | /** |
493 | * Returns the number of tokens generated per second. |
494 | * |
495 | * Thread-safe (but returned value may immediately be outdated). |
496 | */ |
497 | double rate() const noexcept { |
498 | return rate_; |
499 | } |
500 | |
501 | /** |
502 | * Returns the maximum burst size. |
503 | * |
504 | * Thread-safe (but returned value may immediately be outdated). |
505 | */ |
506 | double burst() const noexcept { |
507 | return burstSize_; |
508 | } |
509 | |
510 | private: |
511 | Impl tokenBucket_; |
512 | double rate_; |
513 | double burstSize_; |
514 | }; |
515 | |
516 | using TokenBucket = BasicTokenBucket<>; |
517 | using DynamicTokenBucket = BasicDynamicTokenBucket<>; |
518 | |
519 | } // namespace folly |
520 | |