1 | #pragma once |
2 | |
3 | #include <atomic> |
4 | #include <cstddef> |
5 | #include <common/Types.h> |
6 | |
7 | #include <Core/Defines.h> |
8 | |
9 | namespace DB |
10 | { |
11 | |
12 | class ReadBuffer; |
13 | class WriteBuffer; |
14 | |
15 | /// See Progress. |
16 | struct ProgressValues |
17 | { |
18 | size_t read_rows; |
19 | size_t read_bytes; |
20 | size_t total_rows_to_read; |
21 | size_t written_rows; |
22 | size_t written_bytes; |
23 | |
24 | void read(ReadBuffer & in, UInt64 server_revision); |
25 | void write(WriteBuffer & out, UInt64 client_revision) const; |
26 | void writeJSON(WriteBuffer & out) const; |
27 | }; |
28 | |
29 | struct ReadProgress |
30 | { |
31 | size_t read_rows; |
32 | size_t read_bytes; |
33 | size_t total_rows_to_read; |
34 | |
35 | ReadProgress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0) |
36 | : read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_) {} |
37 | }; |
38 | |
39 | struct WriteProgress |
40 | { |
41 | size_t written_rows; |
42 | size_t written_bytes; |
43 | |
44 | WriteProgress(size_t written_rows_, size_t written_bytes_) |
45 | : written_rows(written_rows_), written_bytes(written_bytes_) {} |
46 | }; |
47 | |
48 | /** Progress of query execution. |
49 | * Values, transferred over network are deltas - how much was done after previously sent value. |
50 | * The same struct is also used for summarized values. |
51 | */ |
52 | struct Progress |
53 | { |
54 | std::atomic<size_t> read_rows {0}; /// Rows (source) processed. |
55 | std::atomic<size_t> read_bytes {0}; /// Bytes (uncompressed, source) processed. |
56 | |
57 | /** How much rows must be processed, in total, approximately. Non-zero value is sent when there is information about some new part of job. |
58 | * Received values must be summed to get estimate of total rows to process. |
59 | * Used for rendering progress bar on client. |
60 | */ |
61 | std::atomic<size_t> total_rows_to_read {0}; |
62 | |
63 | |
64 | std::atomic<size_t> written_rows {0}; |
65 | std::atomic<size_t> written_bytes {0}; |
66 | |
67 | Progress() {} |
68 | Progress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0) |
69 | : read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_) {} |
70 | Progress(ReadProgress read_progress) |
71 | : read_rows(read_progress.read_rows), read_bytes(read_progress.read_bytes), total_rows_to_read(read_progress.total_rows_to_read) {} |
72 | Progress(WriteProgress write_progress) |
73 | : written_rows(write_progress.written_rows), written_bytes(write_progress.written_bytes) {} |
74 | |
75 | void read(ReadBuffer & in, UInt64 server_revision); |
76 | void write(WriteBuffer & out, UInt64 client_revision) const; |
77 | /// Progress in JSON format (single line, without whitespaces) is used in HTTP headers. |
78 | void writeJSON(WriteBuffer & out) const; |
79 | |
80 | /// Each value separately is changed atomically (but not whole object). |
81 | bool incrementPiecewiseAtomically(const Progress & rhs) |
82 | { |
83 | read_rows += rhs.read_rows; |
84 | read_bytes += rhs.read_bytes; |
85 | total_rows_to_read += rhs.total_rows_to_read; |
86 | written_rows += rhs.written_rows; |
87 | written_bytes += rhs.written_bytes; |
88 | |
89 | return rhs.read_rows || rhs.written_rows ? true : false; |
90 | } |
91 | |
92 | void reset() |
93 | { |
94 | read_rows = 0; |
95 | read_bytes = 0; |
96 | total_rows_to_read = 0; |
97 | written_rows = 0; |
98 | written_bytes = 0; |
99 | } |
100 | |
101 | ProgressValues getValues() const |
102 | { |
103 | ProgressValues res; |
104 | |
105 | res.read_rows = read_rows.load(std::memory_order_relaxed); |
106 | res.read_bytes = read_bytes.load(std::memory_order_relaxed); |
107 | res.total_rows_to_read = total_rows_to_read.load(std::memory_order_relaxed); |
108 | res.written_rows = written_rows.load(std::memory_order_relaxed); |
109 | res.written_bytes = written_bytes.load(std::memory_order_relaxed); |
110 | |
111 | return res; |
112 | } |
113 | |
114 | ProgressValues fetchAndResetPiecewiseAtomically() |
115 | { |
116 | ProgressValues res; |
117 | |
118 | res.read_rows = read_rows.fetch_and(0); |
119 | res.read_bytes = read_bytes.fetch_and(0); |
120 | res.total_rows_to_read = total_rows_to_read.fetch_and(0); |
121 | res.written_rows = written_rows.fetch_and(0); |
122 | res.written_bytes = written_bytes.fetch_and(0); |
123 | |
124 | return res; |
125 | } |
126 | |
127 | Progress & operator=(Progress && other) |
128 | { |
129 | read_rows = other.read_rows.load(std::memory_order_relaxed); |
130 | read_bytes = other.read_bytes.load(std::memory_order_relaxed); |
131 | total_rows_to_read = other.total_rows_to_read.load(std::memory_order_relaxed); |
132 | written_rows = other.written_rows.load(std::memory_order_relaxed); |
133 | written_bytes = other.written_bytes.load(std::memory_order_relaxed); |
134 | |
135 | return *this; |
136 | } |
137 | |
138 | Progress(Progress && other) |
139 | { |
140 | *this = std::move(other); |
141 | } |
142 | }; |
143 | |
144 | } |
145 | |