| 1 | #pragma once |
| 2 | |
| 3 | #include <atomic> |
| 4 | #include <cstddef> |
| 5 | #include <common/Types.h> |
| 6 | |
| 7 | #include <Core/Defines.h> |
| 8 | |
| 9 | namespace DB |
| 10 | { |
| 11 | |
| 12 | class ReadBuffer; |
| 13 | class WriteBuffer; |
| 14 | |
| 15 | /// See Progress. |
| 16 | struct ProgressValues |
| 17 | { |
| 18 | size_t read_rows; |
| 19 | size_t read_bytes; |
| 20 | size_t total_rows_to_read; |
| 21 | size_t written_rows; |
| 22 | size_t written_bytes; |
| 23 | |
| 24 | void read(ReadBuffer & in, UInt64 server_revision); |
| 25 | void write(WriteBuffer & out, UInt64 client_revision) const; |
| 26 | void writeJSON(WriteBuffer & out) const; |
| 27 | }; |
| 28 | |
/// Bundle of the read-side counters.
/// The third constructor argument may be omitted, in which case total_rows_to_read is 0.
struct ReadProgress
{
    size_t read_rows;
    size_t read_bytes;
    size_t total_rows_to_read;

    ReadProgress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0)
        : read_rows{read_rows_}
        , read_bytes{read_bytes_}
        , total_rows_to_read{total_rows_to_read_}
    {
    }
};
| 38 | |
/// Bundle of the write-side counters; both values must be supplied at construction.
struct WriteProgress
{
    size_t written_rows;
    size_t written_bytes;

    WriteProgress(size_t written_rows_, size_t written_bytes_)
        : written_rows{written_rows_}
        , written_bytes{written_bytes_}
    {
    }
};
| 47 | |
| 48 | /** Progress of query execution. |
| 49 | * Values, transferred over network are deltas - how much was done after previously sent value. |
| 50 | * The same struct is also used for summarized values. |
| 51 | */ |
| 52 | struct Progress |
| 53 | { |
| 54 | std::atomic<size_t> read_rows {0}; /// Rows (source) processed. |
| 55 | std::atomic<size_t> read_bytes {0}; /// Bytes (uncompressed, source) processed. |
| 56 | |
| 57 | /** How much rows must be processed, in total, approximately. Non-zero value is sent when there is information about some new part of job. |
| 58 | * Received values must be summed to get estimate of total rows to process. |
| 59 | * Used for rendering progress bar on client. |
| 60 | */ |
| 61 | std::atomic<size_t> total_rows_to_read {0}; |
| 62 | |
| 63 | |
| 64 | std::atomic<size_t> written_rows {0}; |
| 65 | std::atomic<size_t> written_bytes {0}; |
| 66 | |
| 67 | Progress() {} |
| 68 | Progress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0) |
| 69 | : read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_) {} |
| 70 | Progress(ReadProgress read_progress) |
| 71 | : read_rows(read_progress.read_rows), read_bytes(read_progress.read_bytes), total_rows_to_read(read_progress.total_rows_to_read) {} |
| 72 | Progress(WriteProgress write_progress) |
| 73 | : written_rows(write_progress.written_rows), written_bytes(write_progress.written_bytes) {} |
| 74 | |
| 75 | void read(ReadBuffer & in, UInt64 server_revision); |
| 76 | void write(WriteBuffer & out, UInt64 client_revision) const; |
| 77 | /// Progress in JSON format (single line, without whitespaces) is used in HTTP headers. |
| 78 | void writeJSON(WriteBuffer & out) const; |
| 79 | |
| 80 | /// Each value separately is changed atomically (but not whole object). |
| 81 | bool incrementPiecewiseAtomically(const Progress & rhs) |
| 82 | { |
| 83 | read_rows += rhs.read_rows; |
| 84 | read_bytes += rhs.read_bytes; |
| 85 | total_rows_to_read += rhs.total_rows_to_read; |
| 86 | written_rows += rhs.written_rows; |
| 87 | written_bytes += rhs.written_bytes; |
| 88 | |
| 89 | return rhs.read_rows || rhs.written_rows ? true : false; |
| 90 | } |
| 91 | |
| 92 | void reset() |
| 93 | { |
| 94 | read_rows = 0; |
| 95 | read_bytes = 0; |
| 96 | total_rows_to_read = 0; |
| 97 | written_rows = 0; |
| 98 | written_bytes = 0; |
| 99 | } |
| 100 | |
| 101 | ProgressValues getValues() const |
| 102 | { |
| 103 | ProgressValues res; |
| 104 | |
| 105 | res.read_rows = read_rows.load(std::memory_order_relaxed); |
| 106 | res.read_bytes = read_bytes.load(std::memory_order_relaxed); |
| 107 | res.total_rows_to_read = total_rows_to_read.load(std::memory_order_relaxed); |
| 108 | res.written_rows = written_rows.load(std::memory_order_relaxed); |
| 109 | res.written_bytes = written_bytes.load(std::memory_order_relaxed); |
| 110 | |
| 111 | return res; |
| 112 | } |
| 113 | |
| 114 | ProgressValues fetchAndResetPiecewiseAtomically() |
| 115 | { |
| 116 | ProgressValues res; |
| 117 | |
| 118 | res.read_rows = read_rows.fetch_and(0); |
| 119 | res.read_bytes = read_bytes.fetch_and(0); |
| 120 | res.total_rows_to_read = total_rows_to_read.fetch_and(0); |
| 121 | res.written_rows = written_rows.fetch_and(0); |
| 122 | res.written_bytes = written_bytes.fetch_and(0); |
| 123 | |
| 124 | return res; |
| 125 | } |
| 126 | |
| 127 | Progress & operator=(Progress && other) |
| 128 | { |
| 129 | read_rows = other.read_rows.load(std::memory_order_relaxed); |
| 130 | read_bytes = other.read_bytes.load(std::memory_order_relaxed); |
| 131 | total_rows_to_read = other.total_rows_to_read.load(std::memory_order_relaxed); |
| 132 | written_rows = other.written_rows.load(std::memory_order_relaxed); |
| 133 | written_bytes = other.written_bytes.load(std::memory_order_relaxed); |
| 134 | |
| 135 | return *this; |
| 136 | } |
| 137 | |
| 138 | Progress(Progress && other) |
| 139 | { |
| 140 | *this = std::move(other); |
| 141 | } |
| 142 | }; |
| 143 | |
| 144 | } |
| 145 | |