| 1 | #include <Processors/Transforms/LimitsCheckingTransform.h> |
| 2 | #include <Access/QuotaContext.h> |
| 3 | |
| 4 | namespace DB |
| 5 | { |
| 6 | |
| 7 | namespace ErrorCodes |
| 8 | { |
| 9 | extern const int TOO_MANY_ROWS; |
| 10 | extern const int TOO_MANY_BYTES; |
| 11 | extern const int TOO_MANY_ROWS_OR_BYTES; |
| 12 | extern const int TIMEOUT_EXCEEDED; |
| 13 | extern const int TOO_SLOW; |
| 14 | extern const int LOGICAL_ERROR; |
| 15 | extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE; |
| 16 | extern const int TOO_DEEP_PIPELINE; |
| 17 | } |
| 18 | |
| 19 | |
| 20 | static bool handleOverflowMode(OverflowMode mode, const String & message, int code) |
| 21 | { |
| 22 | switch (mode) |
| 23 | { |
| 24 | case OverflowMode::THROW: |
| 25 | throw Exception(message, code); |
| 26 | case OverflowMode::BREAK: |
| 27 | return false; |
| 28 | default: |
| 29 | throw Exception("Logical error: unknown overflow mode" , ErrorCodes::LOGICAL_ERROR); |
| 30 | } |
| 31 | } |
| 32 | |
| 33 | |
| 34 | void ProcessorProfileInfo::update(const Chunk & block) |
| 35 | { |
| 36 | ++blocks; |
| 37 | rows += block.getNumRows(); |
| 38 | bytes += block.bytes(); |
| 39 | } |
| 40 | |
| 41 | LimitsCheckingTransform::LimitsCheckingTransform(const Block & , LocalLimits limits_) |
| 42 | : ISimpleTransform(header_, header_, false) |
| 43 | , limits(std::move(limits_)) |
| 44 | { |
| 45 | } |
| 46 | |
| 47 | //LimitsCheckingTransform::LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem) |
| 48 | // : ISimpleTransform(header, header, false) |
| 49 | // , limits(std::move(limits)) |
| 50 | // , process_list_elem(process_list_elem) |
| 51 | //{ |
| 52 | //} |
| 53 | |
| 54 | void LimitsCheckingTransform::transform(Chunk & chunk) |
| 55 | { |
| 56 | if (!info.started) |
| 57 | { |
| 58 | info.total_stopwatch.start(); |
| 59 | info.started = true; |
| 60 | } |
| 61 | |
| 62 | if (!checkTimeLimit()) |
| 63 | { |
| 64 | stopReading(); |
| 65 | return; |
| 66 | } |
| 67 | |
| 68 | if (chunk) |
| 69 | { |
| 70 | info.update(chunk); |
| 71 | |
| 72 | if (limits.mode == LimitsMode::LIMITS_CURRENT && |
| 73 | !limits.size_limits.check(info.rows, info.bytes, "result" , ErrorCodes::TOO_MANY_ROWS_OR_BYTES)) |
| 74 | stopReading(); |
| 75 | |
| 76 | if (quota) |
| 77 | checkQuota(chunk); |
| 78 | } |
| 79 | } |
| 80 | |
| 81 | bool LimitsCheckingTransform::checkTimeLimit() |
| 82 | { |
| 83 | if (limits.speed_limits.max_execution_time != 0 |
| 84 | && info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000) |
| 85 | return handleOverflowMode(limits.timeout_overflow_mode, |
| 86 | "Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds()) |
| 87 | + " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0), |
| 88 | ErrorCodes::TIMEOUT_EXCEEDED); |
| 89 | |
| 90 | return true; |
| 91 | } |
| 92 | |
| 93 | void LimitsCheckingTransform::checkQuota(Chunk & chunk) |
| 94 | { |
| 95 | switch (limits.mode) |
| 96 | { |
| 97 | case LimitsMode::LIMITS_TOTAL: |
| 98 | /// Checked in `progress` method. |
| 99 | break; |
| 100 | |
| 101 | case LimitsMode::LIMITS_CURRENT: |
| 102 | { |
| 103 | UInt64 total_elapsed = info.total_stopwatch.elapsedNanoseconds(); |
| 104 | quota->used({Quota::RESULT_ROWS, chunk.getNumRows()}, {Quota::RESULT_BYTES, chunk.bytes()}, {Quota::EXECUTION_TIME, total_elapsed - prev_elapsed}); |
| 105 | prev_elapsed = total_elapsed; |
| 106 | break; |
| 107 | } |
| 108 | } |
| 109 | } |
| 110 | |
| 111 | } |
| 112 | |