#pragma once

#include <memory>

#include <Processors/ISimpleTransform.h>
#include <DataStreams/SizeLimits.h>
#include <Poco/Timespan.h>
#include <Interpreters/ProcessList.h>

#include <DataStreams/IBlockOutputStream.h>

9namespace DB
10{
11
12/// Information for profiling.
13struct ProcessorProfileInfo
14{
15 bool started = false;
16 Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time
17
18 size_t rows = 0;
19 size_t blocks = 0;
20 size_t bytes = 0;
21
22 void update(const Chunk & block);
23};
24
25class LimitsCheckingTransform : public ISimpleTransform
26{
27public:
28
29 using LocalLimits = IBlockInputStream::LocalLimits;
30 using LimitsMode = IBlockInputStream::LimitsMode;
31
32 /// LIMITS_CURRENT
33 LimitsCheckingTransform(const Block & header_, LocalLimits limits_);
34 /// LIMITS_TOTAL
35 /// LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem);
36
37 String getName() const override { return "LimitsCheckingTransform"; }
38
39 void setQuota(const std::shared_ptr<QuotaContext> & quota_) { quota = quota_; }
40
41protected:
42 void transform(Chunk & chunk) override;
43
44private:
45 LocalLimits limits;
46
47 std::shared_ptr<QuotaContext> quota;
48 UInt64 prev_elapsed = 0;
49
50 ProcessorProfileInfo info;
51
52 bool checkTimeLimit();
53 void checkQuota(Chunk & chunk);
54};
55
56}
57