1 | #include <Processors/Transforms/LimitsCheckingTransform.h> |
2 | #include <Access/QuotaContext.h> |
3 | |
4 | namespace DB |
5 | { |
6 | |
7 | namespace ErrorCodes |
8 | { |
9 | extern const int TOO_MANY_ROWS; |
10 | extern const int TOO_MANY_BYTES; |
11 | extern const int TOO_MANY_ROWS_OR_BYTES; |
12 | extern const int TIMEOUT_EXCEEDED; |
13 | extern const int TOO_SLOW; |
14 | extern const int LOGICAL_ERROR; |
15 | extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE; |
16 | extern const int TOO_DEEP_PIPELINE; |
17 | } |
18 | |
19 | |
20 | static bool handleOverflowMode(OverflowMode mode, const String & message, int code) |
21 | { |
22 | switch (mode) |
23 | { |
24 | case OverflowMode::THROW: |
25 | throw Exception(message, code); |
26 | case OverflowMode::BREAK: |
27 | return false; |
28 | default: |
29 | throw Exception("Logical error: unknown overflow mode" , ErrorCodes::LOGICAL_ERROR); |
30 | } |
31 | } |
32 | |
33 | |
34 | void ProcessorProfileInfo::update(const Chunk & block) |
35 | { |
36 | ++blocks; |
37 | rows += block.getNumRows(); |
38 | bytes += block.bytes(); |
39 | } |
40 | |
41 | LimitsCheckingTransform::LimitsCheckingTransform(const Block & , LocalLimits limits_) |
42 | : ISimpleTransform(header_, header_, false) |
43 | , limits(std::move(limits_)) |
44 | { |
45 | } |
46 | |
47 | //LimitsCheckingTransform::LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem) |
48 | // : ISimpleTransform(header, header, false) |
49 | // , limits(std::move(limits)) |
50 | // , process_list_elem(process_list_elem) |
51 | //{ |
52 | //} |
53 | |
54 | void LimitsCheckingTransform::transform(Chunk & chunk) |
55 | { |
56 | if (!info.started) |
57 | { |
58 | info.total_stopwatch.start(); |
59 | info.started = true; |
60 | } |
61 | |
62 | if (!checkTimeLimit()) |
63 | { |
64 | stopReading(); |
65 | return; |
66 | } |
67 | |
68 | if (chunk) |
69 | { |
70 | info.update(chunk); |
71 | |
72 | if (limits.mode == LimitsMode::LIMITS_CURRENT && |
73 | !limits.size_limits.check(info.rows, info.bytes, "result" , ErrorCodes::TOO_MANY_ROWS_OR_BYTES)) |
74 | stopReading(); |
75 | |
76 | if (quota) |
77 | checkQuota(chunk); |
78 | } |
79 | } |
80 | |
81 | bool LimitsCheckingTransform::checkTimeLimit() |
82 | { |
83 | if (limits.speed_limits.max_execution_time != 0 |
84 | && info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000) |
85 | return handleOverflowMode(limits.timeout_overflow_mode, |
86 | "Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds()) |
87 | + " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0), |
88 | ErrorCodes::TIMEOUT_EXCEEDED); |
89 | |
90 | return true; |
91 | } |
92 | |
93 | void LimitsCheckingTransform::checkQuota(Chunk & chunk) |
94 | { |
95 | switch (limits.mode) |
96 | { |
97 | case LimitsMode::LIMITS_TOTAL: |
98 | /// Checked in `progress` method. |
99 | break; |
100 | |
101 | case LimitsMode::LIMITS_CURRENT: |
102 | { |
103 | UInt64 total_elapsed = info.total_stopwatch.elapsedNanoseconds(); |
104 | quota->used({Quota::RESULT_ROWS, chunk.getNumRows()}, {Quota::RESULT_BYTES, chunk.bytes()}, {Quota::EXECUTION_TIME, total_elapsed - prev_elapsed}); |
105 | prev_elapsed = total_elapsed; |
106 | break; |
107 | } |
108 | } |
109 | } |
110 | |
111 | } |
112 | |