1#include <Processors/Transforms/LimitsCheckingTransform.h>
2#include <Access/QuotaContext.h>
3
4namespace DB
5{
6
7namespace ErrorCodes
8{
9 extern const int TOO_MANY_ROWS;
10 extern const int TOO_MANY_BYTES;
11 extern const int TOO_MANY_ROWS_OR_BYTES;
12 extern const int TIMEOUT_EXCEEDED;
13 extern const int TOO_SLOW;
14 extern const int LOGICAL_ERROR;
15 extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
16 extern const int TOO_DEEP_PIPELINE;
17}
18
19
20static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
21{
22 switch (mode)
23 {
24 case OverflowMode::THROW:
25 throw Exception(message, code);
26 case OverflowMode::BREAK:
27 return false;
28 default:
29 throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
30 }
31}
32
33
34void ProcessorProfileInfo::update(const Chunk & block)
35{
36 ++blocks;
37 rows += block.getNumRows();
38 bytes += block.bytes();
39}
40
41LimitsCheckingTransform::LimitsCheckingTransform(const Block & header_, LocalLimits limits_)
42 : ISimpleTransform(header_, header_, false)
43 , limits(std::move(limits_))
44{
45}
46
47//LimitsCheckingTransform::LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem)
48// : ISimpleTransform(header, header, false)
49// , limits(std::move(limits))
50// , process_list_elem(process_list_elem)
51//{
52//}
53
54void LimitsCheckingTransform::transform(Chunk & chunk)
55{
56 if (!info.started)
57 {
58 info.total_stopwatch.start();
59 info.started = true;
60 }
61
62 if (!checkTimeLimit())
63 {
64 stopReading();
65 return;
66 }
67
68 if (chunk)
69 {
70 info.update(chunk);
71
72 if (limits.mode == LimitsMode::LIMITS_CURRENT &&
73 !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
74 stopReading();
75
76 if (quota)
77 checkQuota(chunk);
78 }
79}
80
81bool LimitsCheckingTransform::checkTimeLimit()
82{
83 if (limits.speed_limits.max_execution_time != 0
84 && info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000)
85 return handleOverflowMode(limits.timeout_overflow_mode,
86 "Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
87 + " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0),
88 ErrorCodes::TIMEOUT_EXCEEDED);
89
90 return true;
91}
92
93void LimitsCheckingTransform::checkQuota(Chunk & chunk)
94{
95 switch (limits.mode)
96 {
97 case LimitsMode::LIMITS_TOTAL:
98 /// Checked in `progress` method.
99 break;
100
101 case LimitsMode::LIMITS_CURRENT:
102 {
103 UInt64 total_elapsed = info.total_stopwatch.elapsedNanoseconds();
104 quota->used({Quota::RESULT_ROWS, chunk.getNumRows()}, {Quota::RESULT_BYTES, chunk.bytes()}, {Quota::EXECUTION_TIME, total_elapsed - prev_elapsed});
105 prev_elapsed = total_elapsed;
106 break;
107 }
108 }
109}
110
111}
112