| 1 | /* |
| 2 | * Copyright 2015-present Facebook, Inc. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #pragma once |
| 18 | |
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

#include <folly/Likely.h>
#include <folly/Optional.h>
#include <folly/concurrency/CacheLocality.h>
| 27 | |
| 28 | namespace folly { |
| 29 | |
| 30 | /** |
| 31 | * Thread-safe (atomic) token bucket implementation. |
| 32 | * |
| 33 | * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream |
| 34 | * of events with an average rate and some amount of burstiness. The canonical |
| 35 | * example is a packet switched network: the network can accept some number of |
| 36 | * bytes per second and the bytes come in finite packets (bursts). A token |
| 37 | * bucket stores up to a fixed number of tokens (the burst size). Some number |
| 38 | * of tokens are removed when an event occurs. The tokens are replenished at a |
| 39 | * fixed rate. Failure to allocate tokens implies resource is unavailable and |
| 40 | * caller needs to implement its own retry mechanism. For simple cases where |
| 41 | * caller is okay with a FIFO starvation-free scheduling behavior, there are |
| 42 | * also APIs to 'borrow' from the future effectively assigning a start time to |
| 43 | * the caller when it should proceed with using the resource. It is also |
| 44 | * possible to 'return' previously allocated tokens to make them available to |
| 45 | * other users. Returns in excess of burstSize are considered expired and |
| 46 | * will not be available to later callers. |
| 47 | * |
| 48 | * This implementation records the last time it was updated. This allows the |
| 49 | * token bucket to add tokens "just in time" when tokens are requested. |
| 50 | * |
| 51 | * The "dynamic" base variant allows the token generation rate and maximum |
| 52 | * burst size to change with every token consumption. |
| 53 | * |
| 54 | * @tparam Clock Clock type, must be steady i.e. monotonic. |
| 55 | */ |
| 56 | template <typename Clock = std::chrono::steady_clock> |
| 57 | class BasicDynamicTokenBucket { |
| 58 | static_assert(Clock::is_steady, "clock must be steady" ); |
| 59 | |
| 60 | public: |
| 61 | /** |
| 62 | * Constructor. |
| 63 | * |
| 64 | * @param zeroTime Initial time at which to consider the token bucket |
| 65 | * starting to fill. Defaults to 0, so by default token |
| 66 | * buckets are "full" after construction. |
| 67 | */ |
| 68 | explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept |
| 69 | : zeroTime_(zeroTime) {} |
| 70 | |
| 71 | /** |
| 72 | * Copy constructor. |
| 73 | * |
| 74 | * Thread-safe. (Copy constructors of derived classes may not be thread-safe |
| 75 | * however.) |
| 76 | */ |
| 77 | BasicDynamicTokenBucket(const BasicDynamicTokenBucket& other) noexcept |
| 78 | : zeroTime_(other.zeroTime_.load()) {} |
| 79 | |
| 80 | /** |
| 81 | * Copy-assignment operator. |
| 82 | * |
| 83 | * Warning: not thread safe for the object being assigned to (including |
| 84 | * self-assignment). Thread-safe for the other object. |
| 85 | */ |
| 86 | BasicDynamicTokenBucket& operator=( |
| 87 | const BasicDynamicTokenBucket& other) noexcept { |
| 88 | zeroTime_ = other.zeroTime_.load(); |
| 89 | return *this; |
| 90 | } |
| 91 | |
| 92 | /** |
| 93 | * Re-initialize token bucket. |
| 94 | * |
| 95 | * Thread-safe. |
| 96 | * |
| 97 | * @param zeroTime Initial time at which to consider the token bucket |
| 98 | * starting to fill. Defaults to 0, so by default token |
| 99 | * bucket is reset to "full". |
| 100 | */ |
| 101 | void reset(double zeroTime = 0) noexcept { |
| 102 | zeroTime_ = zeroTime; |
| 103 | } |
| 104 | |
| 105 | /** |
| 106 | * Returns the current time in seconds since Epoch. |
| 107 | */ |
| 108 | static double defaultClockNow() noexcept { |
| 109 | using dur = std::chrono::duration<double>; |
| 110 | auto const now = Clock::now().time_since_epoch(); |
| 111 | return std::chrono::duration_cast<dur>(now).count(); |
| 112 | } |
| 113 | |
| 114 | /** |
| 115 | * Attempts to consume some number of tokens. Tokens are first added to the |
| 116 | * bucket based on the time elapsed since the last attempt to consume tokens. |
| 117 | * Note: Attempts to consume more tokens than the burst size will always |
| 118 | * fail. |
| 119 | * |
| 120 | * Thread-safe. |
| 121 | * |
| 122 | * @param toConsume The number of tokens to consume. |
| 123 | * @param rate Number of tokens to generate per second. |
| 124 | * @param burstSize Maximum burst size. Must be greater than 0. |
| 125 | * @param nowInSeconds Current time in seconds. Should be monotonically |
| 126 | * increasing from the nowInSeconds specified in |
| 127 | * this token bucket's constructor. |
| 128 | * @return True if the rate limit check passed, false otherwise. |
| 129 | */ |
| 130 | bool consume( |
| 131 | double toConsume, |
| 132 | double rate, |
| 133 | double burstSize, |
| 134 | double nowInSeconds = defaultClockNow()) { |
| 135 | assert(rate > 0); |
| 136 | assert(burstSize > 0); |
| 137 | |
| 138 | if (nowInSeconds <= zeroTime_.load()) { |
| 139 | return 0; |
| 140 | } |
| 141 | |
| 142 | return consumeImpl( |
| 143 | rate, burstSize, nowInSeconds, [toConsume](double& tokens) { |
| 144 | if (tokens < toConsume) { |
| 145 | return false; |
| 146 | } |
| 147 | tokens -= toConsume; |
| 148 | return true; |
| 149 | }); |
| 150 | } |
| 151 | |
| 152 | /** |
| 153 | * Similar to consume, but always consumes some number of tokens. If the |
| 154 | * bucket contains enough tokens - consumes toConsume tokens. Otherwise the |
| 155 | * bucket is drained. |
| 156 | * |
| 157 | * Thread-safe. |
| 158 | * |
| 159 | * @param toConsume The number of tokens to consume. |
| 160 | * @param rate Number of tokens to generate per second. |
| 161 | * @param burstSize Maximum burst size. Must be greater than 0. |
| 162 | * @param nowInSeconds Current time in seconds. Should be monotonically |
| 163 | * increasing from the nowInSeconds specified in |
| 164 | * this token bucket's constructor. |
| 165 | * @return number of tokens that were consumed. |
| 166 | */ |
| 167 | double consumeOrDrain( |
| 168 | double toConsume, |
| 169 | double rate, |
| 170 | double burstSize, |
| 171 | double nowInSeconds = defaultClockNow()) { |
| 172 | assert(rate > 0); |
| 173 | assert(burstSize > 0); |
| 174 | |
| 175 | if (nowInSeconds <= zeroTime_.load()) { |
| 176 | return 0; |
| 177 | } |
| 178 | |
| 179 | double consumed; |
| 180 | consumeImpl( |
| 181 | rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) { |
| 182 | if (tokens < toConsume) { |
| 183 | consumed = tokens; |
| 184 | tokens = 0.0; |
| 185 | } else { |
| 186 | consumed = toConsume; |
| 187 | tokens -= toConsume; |
| 188 | } |
| 189 | return true; |
| 190 | }); |
| 191 | return consumed; |
| 192 | } |
| 193 | |
| 194 | /** |
| 195 | * Return extra tokens back to the bucket. This will move the zeroTime_ |
| 196 | * value back based on the rate. |
| 197 | * |
| 198 | * Thread-safe. |
| 199 | */ |
| 200 | void returnTokens(double tokensToReturn, double rate) { |
| 201 | assert(rate > 0); |
| 202 | assert(tokensToReturn > 0); |
| 203 | |
| 204 | returnTokensImpl(tokensToReturn, rate); |
| 205 | } |
| 206 | |
| 207 | /** |
| 208 | * Like consumeOrDrain but the call will always satisfy the asked for count. |
| 209 | * It does so by borrowing tokens from the future (zeroTime_ will move |
| 210 | * forward) if the currently available count isn't sufficient. |
| 211 | * |
| 212 | * Returns a folly::Optional<double>. The optional wont be set if the request |
| 213 | * cannot be satisfied: only case is when it is larger than burstSize. The |
| 214 | * value of the optional is a double indicating the time in seconds that the |
| 215 | * caller needs to wait at which the reservation becomes valid. The caller |
| 216 | * could simply sleep for the returned duration to smooth out the allocation |
| 217 | * to match the rate limiter or do some other computation in the meantime. In |
| 218 | * any case, any regular consume or consumeOrDrain calls will fail to allocate |
| 219 | * any tokens until the future time is reached. |
| 220 | * |
| 221 | * Note: It is assumed the caller will not ask for a very large count nor use |
| 222 | * it immediately (if not waiting inline) as that would break the burst |
| 223 | * prevention the limiter is meant to be used for. |
| 224 | * |
| 225 | * Thread-safe. |
| 226 | */ |
| 227 | Optional<double> consumeWithBorrowNonBlocking( |
| 228 | double toConsume, |
| 229 | double rate, |
| 230 | double burstSize, |
| 231 | double nowInSeconds = defaultClockNow()) { |
| 232 | assert(rate > 0); |
| 233 | assert(burstSize > 0); |
| 234 | |
| 235 | if (burstSize < toConsume) { |
| 236 | return folly::none; |
| 237 | } |
| 238 | |
| 239 | while (toConsume > 0) { |
| 240 | double consumed = |
| 241 | consumeOrDrain(toConsume, rate, burstSize, nowInSeconds); |
| 242 | if (consumed > 0) { |
| 243 | toConsume -= consumed; |
| 244 | } else { |
| 245 | double zeroTimeNew = returnTokensImpl(-toConsume, rate); |
| 246 | double napTime = std::max(0.0, zeroTimeNew - nowInSeconds); |
| 247 | return napTime; |
| 248 | } |
| 249 | } |
| 250 | return 0; |
| 251 | } |
| 252 | |
| 253 | /** |
| 254 | * Convenience wrapper around non-blocking borrow to sleep inline until |
| 255 | * reservation is valid. |
| 256 | */ |
| 257 | bool consumeWithBorrowAndWait( |
| 258 | double toConsume, |
| 259 | double rate, |
| 260 | double burstSize, |
| 261 | double nowInSeconds = defaultClockNow()) { |
| 262 | auto res = |
| 263 | consumeWithBorrowNonBlocking(toConsume, rate, burstSize, nowInSeconds); |
| 264 | if (res.value_or(0) > 0) { |
| 265 | int64_t napUSec = res.value() * 1000000; |
| 266 | std::this_thread::sleep_for(std::chrono::microseconds(napUSec)); |
| 267 | } |
| 268 | return res.has_value(); |
| 269 | } |
| 270 | |
| 271 | /** |
| 272 | * Returns the number of tokens currently available. |
| 273 | * |
| 274 | * Thread-safe (but returned value may immediately be outdated). |
| 275 | */ |
| 276 | double available( |
| 277 | double rate, |
| 278 | double burstSize, |
| 279 | double nowInSeconds = defaultClockNow()) const noexcept { |
| 280 | assert(rate > 0); |
| 281 | assert(burstSize > 0); |
| 282 | |
| 283 | double zt = this->zeroTime_.load(); |
| 284 | if (nowInSeconds <= zt) { |
| 285 | return 0; |
| 286 | } |
| 287 | return std::min((nowInSeconds - zt) * rate, burstSize); |
| 288 | } |
| 289 | |
| 290 | private: |
| 291 | template <typename TCallback> |
| 292 | bool consumeImpl( |
| 293 | double rate, |
| 294 | double burstSize, |
| 295 | double nowInSeconds, |
| 296 | const TCallback& callback) { |
| 297 | auto zeroTimeOld = zeroTime_.load(); |
| 298 | double zeroTimeNew; |
| 299 | do { |
| 300 | auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize); |
| 301 | if (!callback(tokens)) { |
| 302 | return false; |
| 303 | } |
| 304 | zeroTimeNew = nowInSeconds - tokens / rate; |
| 305 | } while ( |
| 306 | UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); |
| 307 | |
| 308 | return true; |
| 309 | } |
| 310 | |
| 311 | /** |
| 312 | * Adjust zeroTime based on rate and tokenCount and return the new value of |
| 313 | * zeroTime_. Note: Token count can be negative to move the zeroTime_ value |
| 314 | * into the future. |
| 315 | */ |
| 316 | double returnTokensImpl(double tokenCount, double rate) { |
| 317 | auto zeroTimeOld = zeroTime_.load(); |
| 318 | double zeroTimeNew; |
| 319 | do { |
| 320 | zeroTimeNew = zeroTimeOld - tokenCount / rate; |
| 321 | } while ( |
| 322 | UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); |
| 323 | return zeroTimeNew; |
| 324 | } |
| 325 | |
| 326 | alignas(hardware_destructive_interference_size) std::atomic<double> zeroTime_; |
| 327 | }; |
| 328 | |
| 329 | /** |
| 330 | * Specialization of BasicDynamicTokenBucket with a fixed token |
| 331 | * generation rate and a fixed maximum burst size. |
| 332 | */ |
| 333 | template <typename Clock = std::chrono::steady_clock> |
| 334 | class BasicTokenBucket { |
| 335 | static_assert(Clock::is_steady, "clock must be steady" ); |
| 336 | |
| 337 | private: |
| 338 | using Impl = BasicDynamicTokenBucket<Clock>; |
| 339 | |
| 340 | public: |
| 341 | /** |
| 342 | * Construct a token bucket with a specific maximum rate and burst size. |
| 343 | * |
| 344 | * @param genRate Number of tokens to generate per second. |
| 345 | * @param burstSize Maximum burst size. Must be greater than 0. |
| 346 | * @param zeroTime Initial time at which to consider the token bucket |
| 347 | * starting to fill. Defaults to 0, so by default token |
| 348 | * bucket is "full" after construction. |
| 349 | */ |
| 350 | BasicTokenBucket( |
| 351 | double genRate, |
| 352 | double burstSize, |
| 353 | double zeroTime = 0) noexcept |
| 354 | : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) { |
| 355 | assert(rate_ > 0); |
| 356 | assert(burstSize_ > 0); |
| 357 | } |
| 358 | |
| 359 | /** |
| 360 | * Copy constructor. |
| 361 | * |
| 362 | * Warning: not thread safe! |
| 363 | */ |
| 364 | BasicTokenBucket(const BasicTokenBucket& other) noexcept = default; |
| 365 | |
| 366 | /** |
| 367 | * Copy-assignment operator. |
| 368 | * |
| 369 | * Warning: not thread safe! |
| 370 | */ |
| 371 | BasicTokenBucket& operator=(const BasicTokenBucket& other) noexcept = default; |
| 372 | |
| 373 | /** |
| 374 | * Returns the current time in seconds since Epoch. |
| 375 | */ |
| 376 | static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) { |
| 377 | return Impl::defaultClockNow(); |
| 378 | } |
| 379 | |
| 380 | /** |
| 381 | * Change rate and burst size. |
| 382 | * |
| 383 | * Warning: not thread safe! |
| 384 | * |
| 385 | * @param genRate Number of tokens to generate per second. |
| 386 | * @param burstSize Maximum burst size. Must be greater than 0. |
| 387 | * @param nowInSeconds Current time in seconds. Should be monotonically |
| 388 | * increasing from the nowInSeconds specified in |
| 389 | * this token bucket's constructor. |
| 390 | */ |
| 391 | void reset( |
| 392 | double genRate, |
| 393 | double burstSize, |
| 394 | double nowInSeconds = defaultClockNow()) noexcept { |
| 395 | assert(genRate > 0); |
| 396 | assert(burstSize > 0); |
| 397 | const double availTokens = available(nowInSeconds); |
| 398 | rate_ = genRate; |
| 399 | burstSize_ = burstSize; |
| 400 | setCapacity(availTokens, nowInSeconds); |
| 401 | } |
| 402 | |
| 403 | /** |
| 404 | * Change number of tokens in bucket. |
| 405 | * |
| 406 | * Warning: not thread safe! |
| 407 | * |
| 408 | * @param tokens Desired number of tokens in bucket after the call. |
| 409 | * @param nowInSeconds Current time in seconds. Should be monotonically |
| 410 | * increasing from the nowInSeconds specified in |
| 411 | * this token bucket's constructor. |
| 412 | */ |
| 413 | void setCapacity(double tokens, double nowInSeconds) noexcept { |
| 414 | tokenBucket_.reset(nowInSeconds - tokens / rate_); |
| 415 | } |
| 416 | |
| 417 | /** |
| 418 | * Attempts to consume some number of tokens. Tokens are first added to the |
| 419 | * bucket based on the time elapsed since the last attempt to consume tokens. |
| 420 | * Note: Attempts to consume more tokens than the burst size will always |
| 421 | * fail. |
| 422 | * |
| 423 | * Thread-safe. |
| 424 | * |
| 425 | * @param toConsume The number of tokens to consume. |
| 426 | * @param nowInSeconds Current time in seconds. Should be monotonically |
| 427 | * increasing from the nowInSeconds specified in |
| 428 | * this token bucket's constructor. |
| 429 | * @return True if the rate limit check passed, false otherwise. |
| 430 | */ |
| 431 | bool consume(double toConsume, double nowInSeconds = defaultClockNow()) { |
| 432 | return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds); |
| 433 | } |
| 434 | |
| 435 | /** |
| 436 | * Similar to consume, but always consumes some number of tokens. If the |
| 437 | * bucket contains enough tokens - consumes toConsume tokens. Otherwise the |
| 438 | * bucket is drained. |
| 439 | * |
| 440 | * Thread-safe. |
| 441 | * |
| 442 | * @param toConsume The number of tokens to consume. |
| 443 | * @param nowInSeconds Current time in seconds. Should be monotonically |
| 444 | * increasing from the nowInSeconds specified in |
| 445 | * this token bucket's constructor. |
| 446 | * @return number of tokens that were consumed. |
| 447 | */ |
| 448 | double consumeOrDrain( |
| 449 | double toConsume, |
| 450 | double nowInSeconds = defaultClockNow()) { |
| 451 | return tokenBucket_.consumeOrDrain( |
| 452 | toConsume, rate_, burstSize_, nowInSeconds); |
| 453 | } |
| 454 | |
| 455 | /** |
| 456 | * Returns extra token back to the bucket. |
| 457 | */ |
| 458 | void returnTokens(double tokensToReturn) { |
| 459 | return tokenBucket_.returnTokens(tokensToReturn, rate_); |
| 460 | } |
| 461 | |
| 462 | /** |
| 463 | * Reserve tokens and return time to wait for in order for the reservation to |
| 464 | * be compatible with the bucket configuration. |
| 465 | */ |
| 466 | Optional<double> consumeWithBorrowNonBlocking( |
| 467 | double toConsume, |
| 468 | double nowInSeconds = defaultClockNow()) { |
| 469 | return tokenBucket_.consumeWithBorrowNonBlocking( |
| 470 | toConsume, rate_, burstSize_, nowInSeconds); |
| 471 | } |
| 472 | |
| 473 | /** |
| 474 | * Reserve tokens. Blocks if need be until reservation is satisfied. |
| 475 | */ |
| 476 | bool consumeWithBorrowAndWait( |
| 477 | double toConsume, |
| 478 | double nowInSeconds = defaultClockNow()) { |
| 479 | return tokenBucket_.consumeWithBorrowAndWait( |
| 480 | toConsume, rate_, burstSize_, nowInSeconds); |
| 481 | } |
| 482 | |
| 483 | /** |
| 484 | * Returns the number of tokens currently available. |
| 485 | * |
| 486 | * Thread-safe (but returned value may immediately be outdated). |
| 487 | */ |
| 488 | double available(double nowInSeconds = defaultClockNow()) const { |
| 489 | return tokenBucket_.available(rate_, burstSize_, nowInSeconds); |
| 490 | } |
| 491 | |
| 492 | /** |
| 493 | * Returns the number of tokens generated per second. |
| 494 | * |
| 495 | * Thread-safe (but returned value may immediately be outdated). |
| 496 | */ |
| 497 | double rate() const noexcept { |
| 498 | return rate_; |
| 499 | } |
| 500 | |
| 501 | /** |
| 502 | * Returns the maximum burst size. |
| 503 | * |
| 504 | * Thread-safe (but returned value may immediately be outdated). |
| 505 | */ |
| 506 | double burst() const noexcept { |
| 507 | return burstSize_; |
| 508 | } |
| 509 | |
| 510 | private: |
| 511 | Impl tokenBucket_; |
| 512 | double rate_; |
| 513 | double burstSize_; |
| 514 | }; |
| 515 | |
/**
 * Fixed-rate token bucket using the default clock
 * (std::chrono::steady_clock).
 */
using TokenBucket = BasicTokenBucket<>;
/**
 * Token bucket whose rate and burst size are supplied on every call, using
 * the default clock (std::chrono::steady_clock).
 */
using DynamicTokenBucket = BasicDynamicTokenBucket<>;
| 518 | |
| 519 | } // namespace folly |
| 520 | |