1#pragma once
2#include <Processors/ISource.h>
3#include <DataStreams/IBlockInputStream.h>
4#include <Common/Stopwatch.h>
5
6namespace DB
7{
8
9/// Adds progress to ISource.
10/// This class takes care of limits, quotas, callback on progress and updating performance counters for current thread.
11class ISourceWithProgress : public ISource
12{
13public:
14 using ISource::ISource;
15
16 using LocalLimits = IBlockInputStream::LocalLimits;
17 using LimitsMode = IBlockInputStream::LimitsMode;
18
19 /// Set limitations that checked on each chunk.
20 virtual void setLimits(const LocalLimits & limits_) = 0;
21
22 /// Set the quota. If you set a quota on the amount of raw data,
23 /// then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits.
24 virtual void setQuota(const std::shared_ptr<QuotaContext> & quota_) = 0;
25
26 /// Set the pointer to the process list item.
27 /// General information about the resources spent on the request will be written into it.
28 /// Based on this information, the quota and some restrictions will be checked.
29 /// This information will also be available in the SHOW PROCESSLIST request.
30 virtual void setProcessListElement(QueryStatus * elem) = 0;
31
32 /// Set the execution progress bar callback.
33 /// It is called after each chunk.
34 /// The function takes the number of rows in the last chunk, the number of bytes in the last chunk.
35 /// Note that the callback can be called from different threads.
36 virtual void setProgressCallback(const ProgressCallback & callback) = 0;
37
38 /// Set the approximate total number of rows to read.
39 virtual void addTotalRowsApprox(size_t value) = 0;
40};
41
42/// Implementation for ISourceWithProgress
43class SourceWithProgress : public ISourceWithProgress
44{
45public:
46 using ISourceWithProgress::ISourceWithProgress;
47
48 using LocalLimits = IBlockInputStream::LocalLimits;
49 using LimitsMode = IBlockInputStream::LimitsMode;
50
51 void setLimits(const LocalLimits & limits_) final { limits = limits_; }
52 void setQuota(const std::shared_ptr<QuotaContext> & quota_) final { quota = quota_; }
53 void setProcessListElement(QueryStatus * elem) final { process_list_elem = elem; }
54 void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; }
55 void addTotalRowsApprox(size_t value) final { total_rows_approx += value; }
56
57protected:
58 /// Call this method to provide information about progress.
59 void progress(const Progress & value);
60
61private:
62 LocalLimits limits;
63 std::shared_ptr<QuotaContext> quota;
64 ProgressCallback progress_callback;
65 QueryStatus * process_list_elem = nullptr;
66
67 /// The approximate total number of rows to read. For progress bar.
68 size_t total_rows_approx = 0;
69
70 Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time.
71 /// According to total_stopwatch in microseconds.
72 UInt64 last_profile_events_update_time = 0;
73};
74
75}
76