1 | #include <DataStreams/ExecutionSpeedLimits.h> |
2 | |
3 | #include <Common/ProfileEvents.h> |
4 | #include <Common/CurrentThread.h> |
5 | #include <IO/WriteHelpers.h> |
6 | #include <common/sleep.h> |
7 | |
8 | namespace ProfileEvents |
9 | { |
10 | extern const Event ThrottlerSleepMicroseconds; |
11 | } |
12 | |
13 | |
14 | namespace DB |
15 | { |
16 | |
17 | namespace ErrorCodes |
18 | { |
19 | extern const int TOO_SLOW; |
20 | } |
21 | |
22 | static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_in_seconds, UInt64 total_elapsed_microseconds) |
23 | { |
24 | /// How much time to wait for the average speed to become `max_speed_in_seconds`. |
25 | UInt64 desired_microseconds = total_progress_size * 1000000 / max_speed_in_seconds; |
26 | |
27 | if (desired_microseconds > total_elapsed_microseconds) |
28 | { |
29 | UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds; |
30 | |
31 | /// Never sleep more than one second (it should be enough to limit speed for a reasonable amount, and otherwise it's too easy to make query hang). |
32 | sleep_microseconds = std::min(UInt64(1000000), sleep_microseconds); |
33 | |
34 | sleepForMicroseconds(sleep_microseconds); |
35 | |
36 | ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds); |
37 | } |
38 | } |
39 | |
40 | void ExecutionSpeedLimits::throttle( |
41 | size_t read_rows, size_t read_bytes, |
42 | size_t total_rows_to_read, UInt64 total_elapsed_microseconds) |
43 | { |
44 | if ((min_execution_rps != 0 || max_execution_rps != 0 |
45 | || min_execution_bps != 0 || max_execution_bps != 0 |
46 | || (total_rows_to_read != 0 && timeout_before_checking_execution_speed != 0)) && |
47 | (static_cast<Int64>(total_elapsed_microseconds) > timeout_before_checking_execution_speed.totalMicroseconds())) |
48 | { |
49 | /// Do not count sleeps in throttlers |
50 | UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds]; |
51 | |
52 | double elapsed_seconds = 0; |
53 | if (throttler_sleep_microseconds > total_elapsed_microseconds) |
54 | elapsed_seconds = static_cast<double>(total_elapsed_microseconds - throttler_sleep_microseconds) / 1000000.0; |
55 | |
56 | if (elapsed_seconds > 0) |
57 | { |
58 | auto rows_per_second = read_rows / elapsed_seconds; |
59 | if (min_execution_rps && rows_per_second < min_execution_rps) |
60 | throw Exception("Query is executing too slow: " + toString(read_rows / elapsed_seconds) |
61 | + " rows/sec., minimum: " + toString(min_execution_rps), |
62 | ErrorCodes::TOO_SLOW); |
63 | |
64 | auto bytes_per_second = read_bytes / elapsed_seconds; |
65 | if (min_execution_bps && bytes_per_second < min_execution_bps) |
66 | throw Exception("Query is executing too slow: " + toString(read_bytes / elapsed_seconds) |
67 | + " bytes/sec., minimum: " + toString(min_execution_bps), |
68 | ErrorCodes::TOO_SLOW); |
69 | |
70 | /// If the predicted execution time is longer than `max_execution_time`. |
71 | if (max_execution_time != 0 && total_rows_to_read && read_rows) |
72 | { |
73 | double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows_to_read) / read_rows); |
74 | |
75 | if (estimated_execution_time_seconds > max_execution_time.totalSeconds()) |
76 | throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)" |
77 | + " is too long. Maximum: " + toString(max_execution_time.totalSeconds()) |
78 | + ". Estimated rows to process: " + toString(total_rows_to_read), |
79 | ErrorCodes::TOO_SLOW); |
80 | } |
81 | |
82 | if (max_execution_rps && rows_per_second >= max_execution_rps) |
83 | limitProgressingSpeed(read_rows, max_execution_rps, total_elapsed_microseconds); |
84 | |
85 | if (max_execution_bps && bytes_per_second >= max_execution_bps) |
86 | limitProgressingSpeed(read_bytes, max_execution_bps, total_elapsed_microseconds); |
87 | } |
88 | } |
89 | } |
90 | |
91 | } |
92 | |