1#include <DataStreams/BlockStreamProfileInfo.h>
2#include <DataStreams/IBlockInputStream.h>
3
4#include <IO/ReadHelpers.h>
5#include <IO/WriteHelpers.h>
6
7#include <Core/Block.h>
8
9namespace DB
10{
11
12void BlockStreamProfileInfo::read(ReadBuffer & in)
13{
14 readVarUInt(rows, in);
15 readVarUInt(blocks, in);
16 readVarUInt(bytes, in);
17 readBinary(applied_limit, in);
18 readVarUInt(rows_before_limit, in);
19 readBinary(calculated_rows_before_limit, in);
20}
21
22
23void BlockStreamProfileInfo::write(WriteBuffer & out) const
24{
25 writeVarUInt(rows, out);
26 writeVarUInt(blocks, out);
27 writeVarUInt(bytes, out);
28 writeBinary(hasAppliedLimit(), out);
29 writeVarUInt(getRowsBeforeLimit(), out);
30 writeBinary(calculated_rows_before_limit, out);
31}
32
33
34void BlockStreamProfileInfo::setFrom(const BlockStreamProfileInfo & rhs, bool skip_block_size_info)
35{
36 if (!skip_block_size_info)
37 {
38 rows = rhs.rows;
39 blocks = rhs.blocks;
40 bytes = rhs.bytes;
41 }
42 applied_limit = rhs.applied_limit;
43 rows_before_limit = rhs.rows_before_limit;
44 calculated_rows_before_limit = rhs.calculated_rows_before_limit;
45}
46
47
48size_t BlockStreamProfileInfo::getRowsBeforeLimit() const
49{
50 if (!calculated_rows_before_limit)
51 calculateRowsBeforeLimit();
52 return rows_before_limit;
53}
54
55
56bool BlockStreamProfileInfo::hasAppliedLimit() const
57{
58 if (!calculated_rows_before_limit)
59 calculateRowsBeforeLimit();
60 return applied_limit;
61}
62
63
64void BlockStreamProfileInfo::update(Block & block)
65{
66 ++blocks;
67 rows += block.rows();
68 bytes += block.bytes();
69}
70
71
72void BlockStreamProfileInfo::collectInfosForStreamsWithName(const char * name, BlockStreamProfileInfos & res) const
73{
74 if (!parent)
75 return;
76
77 if (parent->getName() == name)
78 {
79 res.push_back(this);
80 return;
81 }
82
83 parent->forEachChild([&] (IBlockInputStream & child)
84 {
85 child.getProfileInfo().collectInfosForStreamsWithName(name, res);
86 return false;
87 });
88}
89
90
91void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
92{
93 calculated_rows_before_limit = true;
94
95 /// is there a Limit?
96 BlockStreamProfileInfos limits;
97 collectInfosForStreamsWithName("Limit", limits);
98
99 if (!limits.empty())
100 {
101 applied_limit = true;
102
103 /** Take the number of lines read below `PartialSorting`, if any, or below `Limit`.
104 * This is necessary, because sorting can return only part of the rows.
105 */
106 BlockStreamProfileInfos partial_sortings;
107 collectInfosForStreamsWithName("PartialSorting", partial_sortings);
108
109 BlockStreamProfileInfos & limits_or_sortings = partial_sortings.empty() ? limits : partial_sortings;
110
111 for (const BlockStreamProfileInfo * info_limit_or_sort : limits_or_sortings)
112 {
113 info_limit_or_sort->parent->forEachChild([&] (IBlockInputStream & child)
114 {
115 rows_before_limit += child.getProfileInfo().rows;
116 return false;
117 });
118 }
119 }
120 else
121 {
122 /// Then the data about `rows_before_limit` can be in `RemoteBlockInputStream` (come from a remote server).
123 BlockStreamProfileInfos remotes;
124 collectInfosForStreamsWithName("Remote", remotes);
125
126 if (remotes.empty())
127 return;
128
129 for (const auto & info : remotes)
130 {
131 if (info->applied_limit)
132 {
133 applied_limit = true;
134 rows_before_limit += info->rows_before_limit;
135 }
136 }
137 }
138}
139
140}
141