1#include <DataStreams/ExecutionSpeedLimits.h>
2
3#include <Common/ProfileEvents.h>
4#include <Common/CurrentThread.h>
5#include <IO/WriteHelpers.h>
6#include <common/sleep.h>
7
8namespace ProfileEvents
9{
10 extern const Event ThrottlerSleepMicroseconds;
11}
12
13
14namespace DB
15{
16
17namespace ErrorCodes
18{
19 extern const int TOO_SLOW;
20}
21
22static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_in_seconds, UInt64 total_elapsed_microseconds)
23{
24 /// How much time to wait for the average speed to become `max_speed_in_seconds`.
25 UInt64 desired_microseconds = total_progress_size * 1000000 / max_speed_in_seconds;
26
27 if (desired_microseconds > total_elapsed_microseconds)
28 {
29 UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;
30
31 /// Never sleep more than one second (it should be enough to limit speed for a reasonable amount, and otherwise it's too easy to make query hang).
32 sleep_microseconds = std::min(UInt64(1000000), sleep_microseconds);
33
34 sleepForMicroseconds(sleep_microseconds);
35
36 ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds);
37 }
38}
39
40void ExecutionSpeedLimits::throttle(
41 size_t read_rows, size_t read_bytes,
42 size_t total_rows_to_read, UInt64 total_elapsed_microseconds)
43{
44 if ((min_execution_rps != 0 || max_execution_rps != 0
45 || min_execution_bps != 0 || max_execution_bps != 0
46 || (total_rows_to_read != 0 && timeout_before_checking_execution_speed != 0)) &&
47 (static_cast<Int64>(total_elapsed_microseconds) > timeout_before_checking_execution_speed.totalMicroseconds()))
48 {
49 /// Do not count sleeps in throttlers
50 UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds];
51
52 double elapsed_seconds = 0;
53 if (throttler_sleep_microseconds > total_elapsed_microseconds)
54 elapsed_seconds = static_cast<double>(total_elapsed_microseconds - throttler_sleep_microseconds) / 1000000.0;
55
56 if (elapsed_seconds > 0)
57 {
58 auto rows_per_second = read_rows / elapsed_seconds;
59 if (min_execution_rps && rows_per_second < min_execution_rps)
60 throw Exception("Query is executing too slow: " + toString(read_rows / elapsed_seconds)
61 + " rows/sec., minimum: " + toString(min_execution_rps),
62 ErrorCodes::TOO_SLOW);
63
64 auto bytes_per_second = read_bytes / elapsed_seconds;
65 if (min_execution_bps && bytes_per_second < min_execution_bps)
66 throw Exception("Query is executing too slow: " + toString(read_bytes / elapsed_seconds)
67 + " bytes/sec., minimum: " + toString(min_execution_bps),
68 ErrorCodes::TOO_SLOW);
69
70 /// If the predicted execution time is longer than `max_execution_time`.
71 if (max_execution_time != 0 && total_rows_to_read && read_rows)
72 {
73 double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows_to_read) / read_rows);
74
75 if (estimated_execution_time_seconds > max_execution_time.totalSeconds())
76 throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"
77 + " is too long. Maximum: " + toString(max_execution_time.totalSeconds())
78 + ". Estimated rows to process: " + toString(total_rows_to_read),
79 ErrorCodes::TOO_SLOW);
80 }
81
82 if (max_execution_rps && rows_per_second >= max_execution_rps)
83 limitProgressingSpeed(read_rows, max_execution_rps, total_elapsed_microseconds);
84
85 if (max_execution_bps && bytes_per_second >= max_execution_bps)
86 limitProgressingSpeed(read_bytes, max_execution_bps, total_elapsed_microseconds);
87 }
88 }
89}
90
91}
92