#include <algorithm>
#include <limits>

#include <DataStreams/LimitBlockInputStream.h>
4 | |
5 | |
6 | namespace DB |
7 | { |
8 | |
9 | /// gets pointers to all columns of block, which were used for ORDER BY |
10 | static ColumnRawPtrs extractSortColumns(const Block & block, const SortDescription & description) |
11 | { |
12 | size_t size = description.size(); |
13 | ColumnRawPtrs res; |
14 | res.reserve(size); |
15 | |
16 | for (size_t i = 0; i < size; ++i) |
17 | { |
18 | const IColumn * column = !description[i].column_name.empty() |
19 | ? block.getByName(description[i].column_name).column.get() |
20 | : block.safeGetByPosition(description[i].column_number).column.get(); |
21 | res.emplace_back(column); |
22 | } |
23 | |
24 | return res; |
25 | } |
26 | |
27 | |
28 | LimitBlockInputStream::LimitBlockInputStream( |
29 | const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_, |
30 | bool use_limit_as_total_rows_approx, bool with_ties_, const SortDescription & description_) |
31 | : limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_), with_ties(with_ties_) |
32 | , description(description_) |
33 | { |
34 | if (use_limit_as_total_rows_approx) |
35 | { |
36 | addTotalRowsApprox(static_cast<size_t>(limit)); |
37 | } |
38 | |
39 | children.push_back(input); |
40 | } |
41 | |
42 | Block LimitBlockInputStream::readImpl() |
43 | { |
44 | Block res; |
45 | UInt64 rows = 0; |
46 | |
47 | /// pos >= offset + limit and all rows in the end of previous block were equal |
48 | /// to row at 'limit' position. So we check current block. |
49 | if (!ties_row_ref.empty() && pos >= offset + limit) |
50 | { |
51 | res = children.back()->read(); |
52 | rows = res.rows(); |
53 | |
54 | if (!res) |
55 | return res; |
56 | |
57 | SharedBlockPtr ptr = new detail::SharedBlock(std::move(res)); |
58 | ptr->sort_columns = extractSortColumns(*ptr, description); |
59 | |
60 | UInt64 len; |
61 | for (len = 0; len < rows; ++len) |
62 | { |
63 | SharedBlockRowRef current_row; |
64 | current_row.set(ptr, &ptr->sort_columns, len); |
65 | |
66 | if (current_row != ties_row_ref) |
67 | { |
68 | ties_row_ref.reset(); |
69 | break; |
70 | } |
71 | } |
72 | |
73 | if (len < rows) |
74 | { |
75 | for (size_t i = 0; i < ptr->columns(); ++i) |
76 | ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len); |
77 | } |
78 | |
79 | return *ptr; |
80 | } |
81 | |
82 | if (pos >= offset + limit) |
83 | { |
84 | if (!always_read_till_end) |
85 | return res; |
86 | else |
87 | { |
88 | while (children.back()->read()) |
89 | ; |
90 | return res; |
91 | } |
92 | } |
93 | |
94 | do |
95 | { |
96 | res = children.back()->read(); |
97 | if (!res) |
98 | return res; |
99 | rows = res.rows(); |
100 | pos += rows; |
101 | } while (pos <= offset); |
102 | |
103 | SharedBlockPtr ptr = new detail::SharedBlock(std::move(res)); |
104 | if (with_ties) |
105 | ptr->sort_columns = extractSortColumns(*ptr, description); |
106 | |
107 | /// give away the whole block |
108 | if (pos >= offset + rows && pos <= offset + limit) |
109 | { |
110 | /// Save rowref for last row, because probalbly next block begins with the same row. |
111 | if (with_ties && pos == offset + limit) |
112 | ties_row_ref.set(ptr, &ptr->sort_columns, rows - 1); |
113 | return *ptr; |
114 | } |
115 | |
116 | /// give away a piece of the block |
117 | UInt64 start = std::max( |
118 | static_cast<Int64>(0), |
119 | static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)); |
120 | |
121 | UInt64 length = std::min( |
122 | static_cast<Int64>(limit), std::min( |
123 | static_cast<Int64>(pos) - static_cast<Int64>(offset), |
124 | static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows))); |
125 | |
126 | |
127 | /// check if other rows in current block equals to last one in limit |
128 | if (with_ties) |
129 | { |
130 | ties_row_ref.set(ptr, &ptr->sort_columns, start + length - 1); |
131 | |
132 | for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i) |
133 | { |
134 | SharedBlockRowRef current_row; |
135 | current_row.set(ptr, &ptr->sort_columns, i); |
136 | if (current_row == ties_row_ref) |
137 | ++length; |
138 | else |
139 | { |
140 | ties_row_ref.reset(); |
141 | break; |
142 | } |
143 | } |
144 | } |
145 | |
146 | if (length == rows) |
147 | return *ptr; |
148 | |
149 | for (size_t i = 0; i < ptr->columns(); ++i) |
150 | ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length); |
151 | |
152 | // TODO: we should provide feedback to child-block, so it will know how many rows are actually consumed. |
153 | // It's crucial for streaming engines like Kafka. |
154 | |
155 | return *ptr; |
156 | } |
157 | |
158 | } |
159 | |