1#pragma once
2
3#include <time.h> /// nanosleep
4#include <mutex>
5#include <memory>
6#include <Common/Stopwatch.h>
7#include <Common/Exception.h>
8#include <Common/ProfileEvents.h>
9#include <common/sleep.h>
10#include <IO/WriteHelpers.h>
11#include <port/clock.h>
12
13
14namespace ProfileEvents
15{
16 extern const Event ThrottlerSleepMicroseconds;
17}
18
19
20namespace DB
21{
22
23namespace ErrorCodes
24{
25 extern const int LIMIT_EXCEEDED;
26}
27
28
29/** Allows you to limit the speed of something (in entities per second) using sleep.
30 * Specifics of work:
31 * - only the average speed is considered, from the moment of the first call of `add` function;
32 * if there were periods with low speed, then during some time after them, the speed will be higher;
33 *
34 * Also allows you to set a limit on the maximum number of entities. If exceeded, an exception will be thrown.
35 */
36class Throttler
37{
38public:
39 Throttler(size_t max_speed_, const std::shared_ptr<Throttler> & parent_ = nullptr)
40 : max_speed(max_speed_), limit_exceeded_exception_message(""), parent(parent_) {}
41
42 Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_,
43 const std::shared_ptr<Throttler> & parent_ = nullptr)
44 : max_speed(max_speed_), limit(limit_), limit_exceeded_exception_message(limit_exceeded_exception_message_), parent(parent_) {}
45
46 void add(const size_t amount)
47 {
48 size_t new_count;
49 UInt64 elapsed_ns = 0;
50
51 {
52 std::lock_guard lock(mutex);
53
54 if (max_speed)
55 {
56 if (0 == count)
57 {
58 watch.start();
59 elapsed_ns = 0;
60 }
61 else
62 elapsed_ns = watch.elapsed();
63 }
64
65 count += amount;
66 new_count = count;
67 }
68
69 if (limit && new_count > limit)
70 throw Exception(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);
71
72 if (max_speed)
73 {
74 /// How much time to wait for the average speed to become `max_speed`.
75 UInt64 desired_ns = new_count * 1000000000 / max_speed;
76
77 if (desired_ns > elapsed_ns)
78 {
79 UInt64 sleep_ns = desired_ns - elapsed_ns;
80 sleepForNanoseconds(sleep_ns);
81
82 ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_ns / 1000UL);
83 }
84 }
85
86 if (parent)
87 parent->add(amount);
88 }
89
90 /// Not thread safe
91 void setParent(const std::shared_ptr<Throttler> & parent_)
92 {
93 parent = parent_;
94 }
95
96 void reset()
97 {
98 std::lock_guard lock(mutex);
99
100 count = 0;
101 watch.reset();
102 }
103
104private:
105 size_t count = 0;
106 const size_t max_speed = 0;
107 const UInt64 limit = 0; /// 0 - not limited.
108 const char * limit_exceeded_exception_message = nullptr;
109 Stopwatch watch {CLOCK_MONOTONIC_COARSE};
110 std::mutex mutex;
111
112 /// Used to implement a hierarchy of throttlers
113 std::shared_ptr<Throttler> parent;
114};
115
116
117using ThrottlerPtr = std::shared_ptr<Throttler>;
118
119}
120