1 | #include <Processors/Sources/SourceWithProgress.h> |
2 | |
3 | #include <Interpreters/ProcessList.h> |
4 | #include <Access/QuotaContext.h> |
5 | |
6 | namespace DB |
7 | { |
8 | |
/// Error codes declared here and defined in another translation unit.
/// Both are passed to the size-limit check in SourceWithProgress::progress.
namespace ErrorCodes
{
    extern const int TOO_MANY_ROWS;
    extern const int TOO_MANY_BYTES;
}
14 | |
15 | /// Aggregated copy-paste from IBlockInputStream::progressImpl. |
16 | /// Most of this must be done in PipelineExecutor outside. Now it's done for compatibility with IBlockInputStream. |
17 | void SourceWithProgress::progress(const Progress & value) |
18 | { |
19 | if (total_rows_approx != 0) |
20 | { |
21 | Progress total_rows_progress = {0, 0, total_rows_approx}; |
22 | |
23 | if (progress_callback) |
24 | progress_callback(total_rows_progress); |
25 | |
26 | if (process_list_elem) |
27 | process_list_elem->updateProgressIn(total_rows_progress); |
28 | |
29 | total_rows_approx = 0; |
30 | } |
31 | |
32 | if (progress_callback) |
33 | progress_callback(value); |
34 | |
35 | if (process_list_elem) |
36 | { |
37 | if (!process_list_elem->updateProgressIn(value)) |
38 | cancel(); |
39 | |
40 | /// The total amount of data processed or intended for processing in all sources, possibly on remote servers. |
41 | |
42 | ProgressValues progress = process_list_elem->getProgressIn(); |
43 | size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read); |
44 | |
45 | /// Check the restrictions on the |
46 | /// * amount of data to read |
47 | /// * speed of the query |
48 | /// * quota on the amount of data to read |
49 | /// NOTE: Maybe it makes sense to have them checked directly in ProcessList? |
50 | |
51 | if (limits.mode == LimitsMode::LIMITS_TOTAL) |
52 | { |
53 | if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read" , |
54 | ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES)) |
55 | cancel(); |
56 | } |
57 | |
58 | size_t total_rows = progress.total_rows_to_read; |
59 | |
60 | constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds |
61 | UInt64 total_elapsed_microseconds = total_stopwatch.elapsedMicroseconds(); |
62 | |
63 | if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds) |
64 | { |
65 | /// Should be done in PipelineExecutor. |
66 | /// It is here for compatibility with IBlockInputsStream. |
67 | CurrentThread::updatePerformanceCounters(); |
68 | last_profile_events_update_time = total_elapsed_microseconds; |
69 | } |
70 | |
71 | /// Should be done in PipelineExecutor. |
72 | /// It is here for compatibility with IBlockInputsStream. |
73 | limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds); |
74 | |
75 | if (quota && limits.mode == LimitsMode::LIMITS_TOTAL) |
76 | quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes}); |
77 | } |
78 | } |
79 | |
80 | } |
81 | |