| 1 | #include <DataStreams/ExecutionSpeedLimits.h> |
| 2 | |
| 3 | #include <Common/ProfileEvents.h> |
| 4 | #include <Common/CurrentThread.h> |
| 5 | #include <IO/WriteHelpers.h> |
| 6 | #include <common/sleep.h> |
| 7 | |
| 8 | namespace ProfileEvents |
| 9 | { |
| 10 | extern const Event ThrottlerSleepMicroseconds; |
| 11 | } |
| 12 | |
| 13 | |
| 14 | namespace DB |
| 15 | { |
| 16 | |
| 17 | namespace ErrorCodes |
| 18 | { |
| 19 | extern const int TOO_SLOW; |
| 20 | } |
| 21 | |
| 22 | static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_in_seconds, UInt64 total_elapsed_microseconds) |
| 23 | { |
| 24 | /// How much time to wait for the average speed to become `max_speed_in_seconds`. |
| 25 | UInt64 desired_microseconds = total_progress_size * 1000000 / max_speed_in_seconds; |
| 26 | |
| 27 | if (desired_microseconds > total_elapsed_microseconds) |
| 28 | { |
| 29 | UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds; |
| 30 | |
| 31 | /// Never sleep more than one second (it should be enough to limit speed for a reasonable amount, and otherwise it's too easy to make query hang). |
| 32 | sleep_microseconds = std::min(UInt64(1000000), sleep_microseconds); |
| 33 | |
| 34 | sleepForMicroseconds(sleep_microseconds); |
| 35 | |
| 36 | ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds); |
| 37 | } |
| 38 | } |
| 39 | |
| 40 | void ExecutionSpeedLimits::throttle( |
| 41 | size_t read_rows, size_t read_bytes, |
| 42 | size_t total_rows_to_read, UInt64 total_elapsed_microseconds) |
| 43 | { |
| 44 | if ((min_execution_rps != 0 || max_execution_rps != 0 |
| 45 | || min_execution_bps != 0 || max_execution_bps != 0 |
| 46 | || (total_rows_to_read != 0 && timeout_before_checking_execution_speed != 0)) && |
| 47 | (static_cast<Int64>(total_elapsed_microseconds) > timeout_before_checking_execution_speed.totalMicroseconds())) |
| 48 | { |
| 49 | /// Do not count sleeps in throttlers |
| 50 | UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds]; |
| 51 | |
| 52 | double elapsed_seconds = 0; |
| 53 | if (throttler_sleep_microseconds > total_elapsed_microseconds) |
| 54 | elapsed_seconds = static_cast<double>(total_elapsed_microseconds - throttler_sleep_microseconds) / 1000000.0; |
| 55 | |
| 56 | if (elapsed_seconds > 0) |
| 57 | { |
| 58 | auto rows_per_second = read_rows / elapsed_seconds; |
| 59 | if (min_execution_rps && rows_per_second < min_execution_rps) |
| 60 | throw Exception("Query is executing too slow: " + toString(read_rows / elapsed_seconds) |
| 61 | + " rows/sec., minimum: " + toString(min_execution_rps), |
| 62 | ErrorCodes::TOO_SLOW); |
| 63 | |
| 64 | auto bytes_per_second = read_bytes / elapsed_seconds; |
| 65 | if (min_execution_bps && bytes_per_second < min_execution_bps) |
| 66 | throw Exception("Query is executing too slow: " + toString(read_bytes / elapsed_seconds) |
| 67 | + " bytes/sec., minimum: " + toString(min_execution_bps), |
| 68 | ErrorCodes::TOO_SLOW); |
| 69 | |
| 70 | /// If the predicted execution time is longer than `max_execution_time`. |
| 71 | if (max_execution_time != 0 && total_rows_to_read && read_rows) |
| 72 | { |
| 73 | double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows_to_read) / read_rows); |
| 74 | |
| 75 | if (estimated_execution_time_seconds > max_execution_time.totalSeconds()) |
| 76 | throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)" |
| 77 | + " is too long. Maximum: " + toString(max_execution_time.totalSeconds()) |
| 78 | + ". Estimated rows to process: " + toString(total_rows_to_read), |
| 79 | ErrorCodes::TOO_SLOW); |
| 80 | } |
| 81 | |
| 82 | if (max_execution_rps && rows_per_second >= max_execution_rps) |
| 83 | limitProgressingSpeed(read_rows, max_execution_rps, total_elapsed_microseconds); |
| 84 | |
| 85 | if (max_execution_bps && bytes_per_second >= max_execution_bps) |
| 86 | limitProgressingSpeed(read_bytes, max_execution_bps, total_elapsed_microseconds); |
| 87 | } |
| 88 | } |
| 89 | } |
| 90 | |
| 91 | } |
| 92 | |