1#pragma once
2
3#include <atomic>
4#include <cstddef>
5#include <common/Types.h>
6
7#include <Core/Defines.h>
8
9namespace DB
10{
11
12class ReadBuffer;
13class WriteBuffer;
14
15/// See Progress.
16struct ProgressValues
17{
18 size_t read_rows;
19 size_t read_bytes;
20 size_t total_rows_to_read;
21 size_t written_rows;
22 size_t written_bytes;
23
24 void read(ReadBuffer & in, UInt64 server_revision);
25 void write(WriteBuffer & out, UInt64 client_revision) const;
26 void writeJSON(WriteBuffer & out) const;
27};
28
29struct ReadProgress
30{
31 size_t read_rows;
32 size_t read_bytes;
33 size_t total_rows_to_read;
34
35 ReadProgress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0)
36 : read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_) {}
37};
38
39struct WriteProgress
40{
41 size_t written_rows;
42 size_t written_bytes;
43
44 WriteProgress(size_t written_rows_, size_t written_bytes_)
45 : written_rows(written_rows_), written_bytes(written_bytes_) {}
46};
47
48/** Progress of query execution.
49 * Values, transferred over network are deltas - how much was done after previously sent value.
50 * The same struct is also used for summarized values.
51 */
52struct Progress
53{
54 std::atomic<size_t> read_rows {0}; /// Rows (source) processed.
55 std::atomic<size_t> read_bytes {0}; /// Bytes (uncompressed, source) processed.
56
57 /** How much rows must be processed, in total, approximately. Non-zero value is sent when there is information about some new part of job.
58 * Received values must be summed to get estimate of total rows to process.
59 * Used for rendering progress bar on client.
60 */
61 std::atomic<size_t> total_rows_to_read {0};
62
63
64 std::atomic<size_t> written_rows {0};
65 std::atomic<size_t> written_bytes {0};
66
67 Progress() {}
68 Progress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0)
69 : read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_) {}
70 Progress(ReadProgress read_progress)
71 : read_rows(read_progress.read_rows), read_bytes(read_progress.read_bytes), total_rows_to_read(read_progress.total_rows_to_read) {}
72 Progress(WriteProgress write_progress)
73 : written_rows(write_progress.written_rows), written_bytes(write_progress.written_bytes) {}
74
75 void read(ReadBuffer & in, UInt64 server_revision);
76 void write(WriteBuffer & out, UInt64 client_revision) const;
77 /// Progress in JSON format (single line, without whitespaces) is used in HTTP headers.
78 void writeJSON(WriteBuffer & out) const;
79
80 /// Each value separately is changed atomically (but not whole object).
81 bool incrementPiecewiseAtomically(const Progress & rhs)
82 {
83 read_rows += rhs.read_rows;
84 read_bytes += rhs.read_bytes;
85 total_rows_to_read += rhs.total_rows_to_read;
86 written_rows += rhs.written_rows;
87 written_bytes += rhs.written_bytes;
88
89 return rhs.read_rows || rhs.written_rows ? true : false;
90 }
91
92 void reset()
93 {
94 read_rows = 0;
95 read_bytes = 0;
96 total_rows_to_read = 0;
97 written_rows = 0;
98 written_bytes = 0;
99 }
100
101 ProgressValues getValues() const
102 {
103 ProgressValues res;
104
105 res.read_rows = read_rows.load(std::memory_order_relaxed);
106 res.read_bytes = read_bytes.load(std::memory_order_relaxed);
107 res.total_rows_to_read = total_rows_to_read.load(std::memory_order_relaxed);
108 res.written_rows = written_rows.load(std::memory_order_relaxed);
109 res.written_bytes = written_bytes.load(std::memory_order_relaxed);
110
111 return res;
112 }
113
114 ProgressValues fetchAndResetPiecewiseAtomically()
115 {
116 ProgressValues res;
117
118 res.read_rows = read_rows.fetch_and(0);
119 res.read_bytes = read_bytes.fetch_and(0);
120 res.total_rows_to_read = total_rows_to_read.fetch_and(0);
121 res.written_rows = written_rows.fetch_and(0);
122 res.written_bytes = written_bytes.fetch_and(0);
123
124 return res;
125 }
126
127 Progress & operator=(Progress && other)
128 {
129 read_rows = other.read_rows.load(std::memory_order_relaxed);
130 read_bytes = other.read_bytes.load(std::memory_order_relaxed);
131 total_rows_to_read = other.total_rows_to_read.load(std::memory_order_relaxed);
132 written_rows = other.written_rows.load(std::memory_order_relaxed);
133 written_bytes = other.written_bytes.load(std::memory_order_relaxed);
134
135 return *this;
136 }
137
138 Progress(Progress && other)
139 {
140 *this = std::move(other);
141 }
142};
143
144}
145