1#include <Processors/Sources/SourceWithProgress.h>
2
3#include <Interpreters/ProcessList.h>
4#include <Access/QuotaContext.h>
5
6namespace DB
7{
8
/// Error codes referenced by this file. They are only declared here (extern);
/// their definitions are not visible in this translation unit.
namespace ErrorCodes
{
    /// Passed to limits.size_limits.check() in SourceWithProgress::progress as the row-limit code
    /// (presumably reported when too many rows are to be read; confirm against check()).
    extern const int TOO_MANY_ROWS;
    /// Passed to limits.size_limits.check() in SourceWithProgress::progress as the byte-limit code
    /// (presumably reported when too many bytes are to be read; confirm against check()).
    extern const int TOO_MANY_BYTES;
}
14
15/// Aggregated copy-paste from IBlockInputStream::progressImpl.
16/// Most of this must be done in PipelineExecutor outside. Now it's done for compatibility with IBlockInputStream.
17void SourceWithProgress::progress(const Progress & value)
18{
19 if (total_rows_approx != 0)
20 {
21 Progress total_rows_progress = {0, 0, total_rows_approx};
22
23 if (progress_callback)
24 progress_callback(total_rows_progress);
25
26 if (process_list_elem)
27 process_list_elem->updateProgressIn(total_rows_progress);
28
29 total_rows_approx = 0;
30 }
31
32 if (progress_callback)
33 progress_callback(value);
34
35 if (process_list_elem)
36 {
37 if (!process_list_elem->updateProgressIn(value))
38 cancel();
39
40 /// The total amount of data processed or intended for processing in all sources, possibly on remote servers.
41
42 ProgressValues progress = process_list_elem->getProgressIn();
43 size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read);
44
45 /// Check the restrictions on the
46 /// * amount of data to read
47 /// * speed of the query
48 /// * quota on the amount of data to read
49 /// NOTE: Maybe it makes sense to have them checked directly in ProcessList?
50
51 if (limits.mode == LimitsMode::LIMITS_TOTAL)
52 {
53 if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read",
54 ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
55 cancel();
56 }
57
58 size_t total_rows = progress.total_rows_to_read;
59
60 constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
61 UInt64 total_elapsed_microseconds = total_stopwatch.elapsedMicroseconds();
62
63 if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
64 {
65 /// Should be done in PipelineExecutor.
66 /// It is here for compatibility with IBlockInputsStream.
67 CurrentThread::updatePerformanceCounters();
68 last_profile_events_update_time = total_elapsed_microseconds;
69 }
70
71 /// Should be done in PipelineExecutor.
72 /// It is here for compatibility with IBlockInputsStream.
73 limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
74
75 if (quota && limits.mode == LimitsMode::LIMITS_TOTAL)
76 quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes});
77 }
78}
79
80}
81