1 | #pragma once |
---|---|
2 | #include <Processors/ISimpleTransform.h> |
3 | #include <DataStreams/SizeLimits.h> |
4 | #include <Poco/Timespan.h> |
5 | #include <Interpreters/ProcessList.h> |
6 | |
7 | #include <DataStreams/IBlockOutputStream.h> |
8 | |
9 | namespace DB |
10 | { |
11 | |
12 | /// Information for profiling. |
13 | struct ProcessorProfileInfo |
14 | { |
15 | bool started = false; |
16 | Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time |
17 | |
18 | size_t rows = 0; |
19 | size_t blocks = 0; |
20 | size_t bytes = 0; |
21 | |
22 | void update(const Chunk & block); |
23 | }; |
24 | |
25 | class LimitsCheckingTransform : public ISimpleTransform |
26 | { |
27 | public: |
28 | |
29 | using LocalLimits = IBlockInputStream::LocalLimits; |
30 | using LimitsMode = IBlockInputStream::LimitsMode; |
31 | |
32 | /// LIMITS_CURRENT |
33 | LimitsCheckingTransform(const Block & header_, LocalLimits limits_); |
34 | /// LIMITS_TOTAL |
35 | /// LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem); |
36 | |
37 | String getName() const override { return "LimitsCheckingTransform"; } |
38 | |
39 | void setQuota(const std::shared_ptr<QuotaContext> & quota_) { quota = quota_; } |
40 | |
41 | protected: |
42 | void transform(Chunk & chunk) override; |
43 | |
44 | private: |
45 | LocalLimits limits; |
46 | |
47 | std::shared_ptr<QuotaContext> quota; |
48 | UInt64 prev_elapsed = 0; |
49 | |
50 | ProcessorProfileInfo info; |
51 | |
52 | bool checkTimeLimit(); |
53 | void checkQuota(Chunk & chunk); |
54 | }; |
55 | |
56 | } |
57 |