1/*
2 * Copyright 2015-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <algorithm>
20#include <atomic>
21#include <chrono>
22#include <thread>
23
24#include <folly/Likely.h>
25#include <folly/Optional.h>
26#include <folly/concurrency/CacheLocality.h>
27
28namespace folly {
29
30/**
31 * Thread-safe (atomic) token bucket implementation.
32 *
33 * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream
34 * of events with an average rate and some amount of burstiness. The canonical
35 * example is a packet switched network: the network can accept some number of
36 * bytes per second and the bytes come in finite packets (bursts). A token
37 * bucket stores up to a fixed number of tokens (the burst size). Some number
38 * of tokens are removed when an event occurs. The tokens are replenished at a
39 * fixed rate. Failure to allocate tokens implies resource is unavailable and
40 * caller needs to implement its own retry mechanism. For simple cases where
41 * caller is okay with a FIFO starvation-free scheduling behavior, there are
42 * also APIs to 'borrow' from the future effectively assigning a start time to
43 * the caller when it should proceed with using the resource. It is also
44 * possible to 'return' previously allocated tokens to make them available to
45 * other users. Returns in excess of burstSize are considered expired and
46 * will not be available to later callers.
47 *
48 * This implementation records the last time it was updated. This allows the
49 * token bucket to add tokens "just in time" when tokens are requested.
50 *
51 * The "dynamic" base variant allows the token generation rate and maximum
52 * burst size to change with every token consumption.
53 *
54 * @tparam Clock Clock type, must be steady i.e. monotonic.
55 */
56template <typename Clock = std::chrono::steady_clock>
57class BasicDynamicTokenBucket {
58 static_assert(Clock::is_steady, "clock must be steady");
59
60 public:
61 /**
62 * Constructor.
63 *
64 * @param zeroTime Initial time at which to consider the token bucket
65 * starting to fill. Defaults to 0, so by default token
66 * buckets are "full" after construction.
67 */
68 explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept
69 : zeroTime_(zeroTime) {}
70
71 /**
72 * Copy constructor.
73 *
74 * Thread-safe. (Copy constructors of derived classes may not be thread-safe
75 * however.)
76 */
77 BasicDynamicTokenBucket(const BasicDynamicTokenBucket& other) noexcept
78 : zeroTime_(other.zeroTime_.load()) {}
79
80 /**
81 * Copy-assignment operator.
82 *
83 * Warning: not thread safe for the object being assigned to (including
84 * self-assignment). Thread-safe for the other object.
85 */
86 BasicDynamicTokenBucket& operator=(
87 const BasicDynamicTokenBucket& other) noexcept {
88 zeroTime_ = other.zeroTime_.load();
89 return *this;
90 }
91
92 /**
93 * Re-initialize token bucket.
94 *
95 * Thread-safe.
96 *
97 * @param zeroTime Initial time at which to consider the token bucket
98 * starting to fill. Defaults to 0, so by default token
99 * bucket is reset to "full".
100 */
101 void reset(double zeroTime = 0) noexcept {
102 zeroTime_ = zeroTime;
103 }
104
105 /**
106 * Returns the current time in seconds since Epoch.
107 */
108 static double defaultClockNow() noexcept {
109 using dur = std::chrono::duration<double>;
110 auto const now = Clock::now().time_since_epoch();
111 return std::chrono::duration_cast<dur>(now).count();
112 }
113
114 /**
115 * Attempts to consume some number of tokens. Tokens are first added to the
116 * bucket based on the time elapsed since the last attempt to consume tokens.
117 * Note: Attempts to consume more tokens than the burst size will always
118 * fail.
119 *
120 * Thread-safe.
121 *
122 * @param toConsume The number of tokens to consume.
123 * @param rate Number of tokens to generate per second.
124 * @param burstSize Maximum burst size. Must be greater than 0.
125 * @param nowInSeconds Current time in seconds. Should be monotonically
126 * increasing from the nowInSeconds specified in
127 * this token bucket's constructor.
128 * @return True if the rate limit check passed, false otherwise.
129 */
130 bool consume(
131 double toConsume,
132 double rate,
133 double burstSize,
134 double nowInSeconds = defaultClockNow()) {
135 assert(rate > 0);
136 assert(burstSize > 0);
137
138 if (nowInSeconds <= zeroTime_.load()) {
139 return 0;
140 }
141
142 return consumeImpl(
143 rate, burstSize, nowInSeconds, [toConsume](double& tokens) {
144 if (tokens < toConsume) {
145 return false;
146 }
147 tokens -= toConsume;
148 return true;
149 });
150 }
151
152 /**
153 * Similar to consume, but always consumes some number of tokens. If the
154 * bucket contains enough tokens - consumes toConsume tokens. Otherwise the
155 * bucket is drained.
156 *
157 * Thread-safe.
158 *
159 * @param toConsume The number of tokens to consume.
160 * @param rate Number of tokens to generate per second.
161 * @param burstSize Maximum burst size. Must be greater than 0.
162 * @param nowInSeconds Current time in seconds. Should be monotonically
163 * increasing from the nowInSeconds specified in
164 * this token bucket's constructor.
165 * @return number of tokens that were consumed.
166 */
167 double consumeOrDrain(
168 double toConsume,
169 double rate,
170 double burstSize,
171 double nowInSeconds = defaultClockNow()) {
172 assert(rate > 0);
173 assert(burstSize > 0);
174
175 if (nowInSeconds <= zeroTime_.load()) {
176 return 0;
177 }
178
179 double consumed;
180 consumeImpl(
181 rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) {
182 if (tokens < toConsume) {
183 consumed = tokens;
184 tokens = 0.0;
185 } else {
186 consumed = toConsume;
187 tokens -= toConsume;
188 }
189 return true;
190 });
191 return consumed;
192 }
193
194 /**
195 * Return extra tokens back to the bucket. This will move the zeroTime_
196 * value back based on the rate.
197 *
198 * Thread-safe.
199 */
200 void returnTokens(double tokensToReturn, double rate) {
201 assert(rate > 0);
202 assert(tokensToReturn > 0);
203
204 returnTokensImpl(tokensToReturn, rate);
205 }
206
207 /**
208 * Like consumeOrDrain but the call will always satisfy the asked for count.
209 * It does so by borrowing tokens from the future (zeroTime_ will move
210 * forward) if the currently available count isn't sufficient.
211 *
212 * Returns a folly::Optional<double>. The optional wont be set if the request
213 * cannot be satisfied: only case is when it is larger than burstSize. The
214 * value of the optional is a double indicating the time in seconds that the
215 * caller needs to wait at which the reservation becomes valid. The caller
216 * could simply sleep for the returned duration to smooth out the allocation
217 * to match the rate limiter or do some other computation in the meantime. In
218 * any case, any regular consume or consumeOrDrain calls will fail to allocate
219 * any tokens until the future time is reached.
220 *
221 * Note: It is assumed the caller will not ask for a very large count nor use
222 * it immediately (if not waiting inline) as that would break the burst
223 * prevention the limiter is meant to be used for.
224 *
225 * Thread-safe.
226 */
227 Optional<double> consumeWithBorrowNonBlocking(
228 double toConsume,
229 double rate,
230 double burstSize,
231 double nowInSeconds = defaultClockNow()) {
232 assert(rate > 0);
233 assert(burstSize > 0);
234
235 if (burstSize < toConsume) {
236 return folly::none;
237 }
238
239 while (toConsume > 0) {
240 double consumed =
241 consumeOrDrain(toConsume, rate, burstSize, nowInSeconds);
242 if (consumed > 0) {
243 toConsume -= consumed;
244 } else {
245 double zeroTimeNew = returnTokensImpl(-toConsume, rate);
246 double napTime = std::max(0.0, zeroTimeNew - nowInSeconds);
247 return napTime;
248 }
249 }
250 return 0;
251 }
252
253 /**
254 * Convenience wrapper around non-blocking borrow to sleep inline until
255 * reservation is valid.
256 */
257 bool consumeWithBorrowAndWait(
258 double toConsume,
259 double rate,
260 double burstSize,
261 double nowInSeconds = defaultClockNow()) {
262 auto res =
263 consumeWithBorrowNonBlocking(toConsume, rate, burstSize, nowInSeconds);
264 if (res.value_or(0) > 0) {
265 int64_t napUSec = res.value() * 1000000;
266 std::this_thread::sleep_for(std::chrono::microseconds(napUSec));
267 }
268 return res.has_value();
269 }
270
271 /**
272 * Returns the number of tokens currently available.
273 *
274 * Thread-safe (but returned value may immediately be outdated).
275 */
276 double available(
277 double rate,
278 double burstSize,
279 double nowInSeconds = defaultClockNow()) const noexcept {
280 assert(rate > 0);
281 assert(burstSize > 0);
282
283 double zt = this->zeroTime_.load();
284 if (nowInSeconds <= zt) {
285 return 0;
286 }
287 return std::min((nowInSeconds - zt) * rate, burstSize);
288 }
289
290 private:
291 template <typename TCallback>
292 bool consumeImpl(
293 double rate,
294 double burstSize,
295 double nowInSeconds,
296 const TCallback& callback) {
297 auto zeroTimeOld = zeroTime_.load();
298 double zeroTimeNew;
299 do {
300 auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
301 if (!callback(tokens)) {
302 return false;
303 }
304 zeroTimeNew = nowInSeconds - tokens / rate;
305 } while (
306 UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew)));
307
308 return true;
309 }
310
311 /**
312 * Adjust zeroTime based on rate and tokenCount and return the new value of
313 * zeroTime_. Note: Token count can be negative to move the zeroTime_ value
314 * into the future.
315 */
316 double returnTokensImpl(double tokenCount, double rate) {
317 auto zeroTimeOld = zeroTime_.load();
318 double zeroTimeNew;
319 do {
320 zeroTimeNew = zeroTimeOld - tokenCount / rate;
321 } while (
322 UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew)));
323 return zeroTimeNew;
324 }
325
326 alignas(hardware_destructive_interference_size) std::atomic<double> zeroTime_;
327};
328
/**
 * Token bucket with a fixed token generation rate and a fixed maximum burst
 * size. Implemented as a wrapper that stores the rate and burst size and
 * forwards to a BasicDynamicTokenBucket member.
 */
333template <typename Clock = std::chrono::steady_clock>
334class BasicTokenBucket {
335 static_assert(Clock::is_steady, "clock must be steady");
336
337 private:
338 using Impl = BasicDynamicTokenBucket<Clock>;
339
340 public:
341 /**
342 * Construct a token bucket with a specific maximum rate and burst size.
343 *
344 * @param genRate Number of tokens to generate per second.
345 * @param burstSize Maximum burst size. Must be greater than 0.
346 * @param zeroTime Initial time at which to consider the token bucket
347 * starting to fill. Defaults to 0, so by default token
348 * bucket is "full" after construction.
349 */
350 BasicTokenBucket(
351 double genRate,
352 double burstSize,
353 double zeroTime = 0) noexcept
354 : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) {
355 assert(rate_ > 0);
356 assert(burstSize_ > 0);
357 }
358
359 /**
360 * Copy constructor.
361 *
362 * Warning: not thread safe!
363 */
364 BasicTokenBucket(const BasicTokenBucket& other) noexcept = default;
365
366 /**
367 * Copy-assignment operator.
368 *
369 * Warning: not thread safe!
370 */
371 BasicTokenBucket& operator=(const BasicTokenBucket& other) noexcept = default;
372
373 /**
374 * Returns the current time in seconds since Epoch.
375 */
376 static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) {
377 return Impl::defaultClockNow();
378 }
379
380 /**
381 * Change rate and burst size.
382 *
383 * Warning: not thread safe!
384 *
385 * @param genRate Number of tokens to generate per second.
386 * @param burstSize Maximum burst size. Must be greater than 0.
387 * @param nowInSeconds Current time in seconds. Should be monotonically
388 * increasing from the nowInSeconds specified in
389 * this token bucket's constructor.
390 */
391 void reset(
392 double genRate,
393 double burstSize,
394 double nowInSeconds = defaultClockNow()) noexcept {
395 assert(genRate > 0);
396 assert(burstSize > 0);
397 const double availTokens = available(nowInSeconds);
398 rate_ = genRate;
399 burstSize_ = burstSize;
400 setCapacity(availTokens, nowInSeconds);
401 }
402
403 /**
404 * Change number of tokens in bucket.
405 *
406 * Warning: not thread safe!
407 *
408 * @param tokens Desired number of tokens in bucket after the call.
409 * @param nowInSeconds Current time in seconds. Should be monotonically
410 * increasing from the nowInSeconds specified in
411 * this token bucket's constructor.
412 */
413 void setCapacity(double tokens, double nowInSeconds) noexcept {
414 tokenBucket_.reset(nowInSeconds - tokens / rate_);
415 }
416
417 /**
418 * Attempts to consume some number of tokens. Tokens are first added to the
419 * bucket based on the time elapsed since the last attempt to consume tokens.
420 * Note: Attempts to consume more tokens than the burst size will always
421 * fail.
422 *
423 * Thread-safe.
424 *
425 * @param toConsume The number of tokens to consume.
426 * @param nowInSeconds Current time in seconds. Should be monotonically
427 * increasing from the nowInSeconds specified in
428 * this token bucket's constructor.
429 * @return True if the rate limit check passed, false otherwise.
430 */
431 bool consume(double toConsume, double nowInSeconds = defaultClockNow()) {
432 return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds);
433 }
434
435 /**
436 * Similar to consume, but always consumes some number of tokens. If the
437 * bucket contains enough tokens - consumes toConsume tokens. Otherwise the
438 * bucket is drained.
439 *
440 * Thread-safe.
441 *
442 * @param toConsume The number of tokens to consume.
443 * @param nowInSeconds Current time in seconds. Should be monotonically
444 * increasing from the nowInSeconds specified in
445 * this token bucket's constructor.
446 * @return number of tokens that were consumed.
447 */
448 double consumeOrDrain(
449 double toConsume,
450 double nowInSeconds = defaultClockNow()) {
451 return tokenBucket_.consumeOrDrain(
452 toConsume, rate_, burstSize_, nowInSeconds);
453 }
454
455 /**
456 * Returns extra token back to the bucket.
457 */
458 void returnTokens(double tokensToReturn) {
459 return tokenBucket_.returnTokens(tokensToReturn, rate_);
460 }
461
462 /**
463 * Reserve tokens and return time to wait for in order for the reservation to
464 * be compatible with the bucket configuration.
465 */
466 Optional<double> consumeWithBorrowNonBlocking(
467 double toConsume,
468 double nowInSeconds = defaultClockNow()) {
469 return tokenBucket_.consumeWithBorrowNonBlocking(
470 toConsume, rate_, burstSize_, nowInSeconds);
471 }
472
473 /**
474 * Reserve tokens. Blocks if need be until reservation is satisfied.
475 */
476 bool consumeWithBorrowAndWait(
477 double toConsume,
478 double nowInSeconds = defaultClockNow()) {
479 return tokenBucket_.consumeWithBorrowAndWait(
480 toConsume, rate_, burstSize_, nowInSeconds);
481 }
482
483 /**
484 * Returns the number of tokens currently available.
485 *
486 * Thread-safe (but returned value may immediately be outdated).
487 */
488 double available(double nowInSeconds = defaultClockNow()) const {
489 return tokenBucket_.available(rate_, burstSize_, nowInSeconds);
490 }
491
492 /**
493 * Returns the number of tokens generated per second.
494 *
495 * Thread-safe (but returned value may immediately be outdated).
496 */
497 double rate() const noexcept {
498 return rate_;
499 }
500
501 /**
502 * Returns the maximum burst size.
503 *
504 * Thread-safe (but returned value may immediately be outdated).
505 */
506 double burst() const noexcept {
507 return burstSize_;
508 }
509
510 private:
511 Impl tokenBucket_;
512 double rate_;
513 double burstSize_;
514};
515
// Fixed-rate token bucket using the default clock (std::chrono::steady_clock).
516using TokenBucket = BasicTokenBucket<>;
// Dynamic-rate token bucket using the default clock (std::chrono::steady_clock).
517using DynamicTokenBucket = BasicDynamicTokenBucket<>;
518
519} // namespace folly
520