| 1 | #include <Processors/Sources/SourceWithProgress.h> |
| 2 | |
| 3 | #include <Interpreters/ProcessList.h> |
| 4 | #include <Access/QuotaContext.h> |
| 5 | |
| 6 | namespace DB |
| 7 | { |
| 8 | |
/// Error codes handed to the size-limit check in SourceWithProgress::progress.
/// Only declared here; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int TOO_MANY_BYTES;
    extern const int TOO_MANY_ROWS;
}
| 14 | |
| 15 | /// Aggregated copy-paste from IBlockInputStream::progressImpl. |
| 16 | /// Most of this must be done in PipelineExecutor outside. Now it's done for compatibility with IBlockInputStream. |
| 17 | void SourceWithProgress::progress(const Progress & value) |
| 18 | { |
| 19 | if (total_rows_approx != 0) |
| 20 | { |
| 21 | Progress total_rows_progress = {0, 0, total_rows_approx}; |
| 22 | |
| 23 | if (progress_callback) |
| 24 | progress_callback(total_rows_progress); |
| 25 | |
| 26 | if (process_list_elem) |
| 27 | process_list_elem->updateProgressIn(total_rows_progress); |
| 28 | |
| 29 | total_rows_approx = 0; |
| 30 | } |
| 31 | |
| 32 | if (progress_callback) |
| 33 | progress_callback(value); |
| 34 | |
| 35 | if (process_list_elem) |
| 36 | { |
| 37 | if (!process_list_elem->updateProgressIn(value)) |
| 38 | cancel(); |
| 39 | |
| 40 | /// The total amount of data processed or intended for processing in all sources, possibly on remote servers. |
| 41 | |
| 42 | ProgressValues progress = process_list_elem->getProgressIn(); |
| 43 | size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read); |
| 44 | |
| 45 | /// Check the restrictions on the |
| 46 | /// * amount of data to read |
| 47 | /// * speed of the query |
| 48 | /// * quota on the amount of data to read |
| 49 | /// NOTE: Maybe it makes sense to have them checked directly in ProcessList? |
| 50 | |
| 51 | if (limits.mode == LimitsMode::LIMITS_TOTAL) |
| 52 | { |
| 53 | if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read" , |
| 54 | ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES)) |
| 55 | cancel(); |
| 56 | } |
| 57 | |
| 58 | size_t total_rows = progress.total_rows_to_read; |
| 59 | |
| 60 | constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds |
| 61 | UInt64 total_elapsed_microseconds = total_stopwatch.elapsedMicroseconds(); |
| 62 | |
| 63 | if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds) |
| 64 | { |
| 65 | /// Should be done in PipelineExecutor. |
| 66 | /// It is here for compatibility with IBlockInputsStream. |
| 67 | CurrentThread::updatePerformanceCounters(); |
| 68 | last_profile_events_update_time = total_elapsed_microseconds; |
| 69 | } |
| 70 | |
| 71 | /// Should be done in PipelineExecutor. |
| 72 | /// It is here for compatibility with IBlockInputsStream. |
| 73 | limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds); |
| 74 | |
| 75 | if (quota && limits.mode == LimitsMode::LIMITS_TOTAL) |
| 76 | quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes}); |
| 77 | } |
| 78 | } |
| 79 | |
| 80 | } |
| 81 | |