| 1 | #include <Processors/Sources/SourceWithProgress.h> | 
|---|
| 2 |  | 
|---|
| 3 | #include <Interpreters/ProcessList.h> | 
|---|
| 4 | #include <Access/QuotaContext.h> | 
|---|
| 5 |  | 
|---|
| 6 | namespace DB | 
|---|
| 7 | { | 
|---|
| 8 |  | 
|---|
| 9 | namespace ErrorCodes | 
|---|
| 10 | { | 
|---|
| 11 | extern const int TOO_MANY_ROWS; | 
|---|
| 12 | extern const int TOO_MANY_BYTES; | 
|---|
| 13 | } | 
|---|
| 14 |  | 
|---|
| 15 | /// Aggregated copy-paste from IBlockInputStream::progressImpl. | 
|---|
| 16 | /// Most of this must be done in PipelineExecutor outside. Now it's done for compatibility with IBlockInputStream. | 
|---|
| 17 | void SourceWithProgress::progress(const Progress & value) | 
|---|
| 18 | { | 
|---|
| 19 | if (total_rows_approx != 0) | 
|---|
| 20 | { | 
|---|
| 21 | Progress total_rows_progress = {0, 0, total_rows_approx}; | 
|---|
| 22 |  | 
|---|
| 23 | if (progress_callback) | 
|---|
| 24 | progress_callback(total_rows_progress); | 
|---|
| 25 |  | 
|---|
| 26 | if (process_list_elem) | 
|---|
| 27 | process_list_elem->updateProgressIn(total_rows_progress); | 
|---|
| 28 |  | 
|---|
| 29 | total_rows_approx = 0; | 
|---|
| 30 | } | 
|---|
| 31 |  | 
|---|
| 32 | if (progress_callback) | 
|---|
| 33 | progress_callback(value); | 
|---|
| 34 |  | 
|---|
| 35 | if (process_list_elem) | 
|---|
| 36 | { | 
|---|
| 37 | if (!process_list_elem->updateProgressIn(value)) | 
|---|
| 38 | cancel(); | 
|---|
| 39 |  | 
|---|
| 40 | /// The total amount of data processed or intended for processing in all sources, possibly on remote servers. | 
|---|
| 41 |  | 
|---|
| 42 | ProgressValues progress = process_list_elem->getProgressIn(); | 
|---|
| 43 | size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read); | 
|---|
| 44 |  | 
|---|
| 45 | /// Check the restrictions on the | 
|---|
| 46 | ///  * amount of data to read | 
|---|
| 47 | ///  * speed of the query | 
|---|
| 48 | ///  * quota on the amount of data to read | 
|---|
| 49 | /// NOTE: Maybe it makes sense to have them checked directly in ProcessList? | 
|---|
| 50 |  | 
|---|
| 51 | if (limits.mode == LimitsMode::LIMITS_TOTAL) | 
|---|
| 52 | { | 
|---|
| 53 | if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read", | 
|---|
| 54 | ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES)) | 
|---|
| 55 | cancel(); | 
|---|
| 56 | } | 
|---|
| 57 |  | 
|---|
| 58 | size_t total_rows = progress.total_rows_to_read; | 
|---|
| 59 |  | 
|---|
| 60 | constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds | 
|---|
| 61 | UInt64 total_elapsed_microseconds = total_stopwatch.elapsedMicroseconds(); | 
|---|
| 62 |  | 
|---|
| 63 | if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds) | 
|---|
| 64 | { | 
|---|
| 65 | /// Should be done in PipelineExecutor. | 
|---|
| 66 | /// It is here for compatibility with IBlockInputsStream. | 
|---|
| 67 | CurrentThread::updatePerformanceCounters(); | 
|---|
| 68 | last_profile_events_update_time = total_elapsed_microseconds; | 
|---|
| 69 | } | 
|---|
| 70 |  | 
|---|
| 71 | /// Should be done in PipelineExecutor. | 
|---|
| 72 | /// It is here for compatibility with IBlockInputsStream. | 
|---|
| 73 | limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds); | 
|---|
| 74 |  | 
|---|
| 75 | if (quota && limits.mode == LimitsMode::LIMITS_TOTAL) | 
|---|
| 76 | quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes}); | 
|---|
| 77 | } | 
|---|
| 78 | } | 
|---|
| 79 |  | 
|---|
| 80 | } | 
|---|
| 81 |  | 
|---|