1 | #include <Processors/LimitTransform.h> |
2 | |
3 | |
4 | namespace DB |
5 | { |
6 | |
7 | LimitTransform::LimitTransform( |
8 | const Block & , size_t limit_, size_t offset_, |
9 | bool always_read_till_end_, bool with_ties_, |
10 | const SortDescription & description_) |
11 | : IProcessor({header_}, {header_}) |
12 | , input(inputs.front()), output(outputs.front()) |
13 | , limit(limit_), offset(offset_) |
14 | , always_read_till_end(always_read_till_end_) |
15 | , with_ties(with_ties_), description(description_) |
16 | { |
17 | for (const auto & desc : description) |
18 | { |
19 | if (!desc.column_name.empty()) |
20 | sort_column_positions.push_back(header_.getPositionByName(desc.column_name)); |
21 | else |
22 | sort_column_positions.push_back(desc.column_number); |
23 | } |
24 | } |
25 | |
26 | |
27 | LimitTransform::Status LimitTransform::prepare() |
28 | { |
29 | /// Check can output. |
30 | bool output_finished = false; |
31 | if (output.isFinished()) |
32 | { |
33 | output_finished = true; |
34 | if (!always_read_till_end) |
35 | { |
36 | input.close(); |
37 | return Status::Finished; |
38 | } |
39 | } |
40 | |
41 | if (!output_finished && !output.canPush()) |
42 | { |
43 | input.setNotNeeded(); |
44 | return Status::PortFull; |
45 | } |
46 | |
47 | /// Push block if can. |
48 | if (!output_finished && has_block && block_processed) |
49 | { |
50 | output.push(std::move(current_chunk)); |
51 | has_block = false; |
52 | block_processed = false; |
53 | } |
54 | |
55 | /// Check if we are done with pushing. |
56 | bool pushing_is_finished = (rows_read >= offset + limit) && ties_row_ref.empty(); |
57 | if (pushing_is_finished) |
58 | { |
59 | if (!always_read_till_end) |
60 | { |
61 | output.finish(); |
62 | input.close(); |
63 | return Status::Finished; |
64 | } |
65 | } |
66 | |
67 | /// Check can input. |
68 | |
69 | if (input.isFinished()) |
70 | { |
71 | output.finish(); |
72 | return Status::Finished; |
73 | } |
74 | |
75 | input.setNeeded(); |
76 | if (!input.hasData()) |
77 | return Status::NeedData; |
78 | |
79 | current_chunk = input.pull(); |
80 | has_block = true; |
81 | |
82 | auto rows = current_chunk.getNumRows(); |
83 | rows_before_limit_at_least += rows; |
84 | |
85 | /// Skip block (for 'always_read_till_end' case). |
86 | if (pushing_is_finished) |
87 | { |
88 | current_chunk.clear(); |
89 | has_block = false; |
90 | |
91 | if (input.isFinished()) |
92 | { |
93 | output.finish(); |
94 | return Status::Finished; |
95 | } |
96 | |
97 | /// Now, we pulled from input, and it must be empty. |
98 | return Status::NeedData; |
99 | } |
100 | |
101 | /// Process block. |
102 | |
103 | rows_read += rows; |
104 | |
105 | if (rows_read <= offset) |
106 | { |
107 | current_chunk.clear(); |
108 | has_block = false; |
109 | |
110 | if (input.isFinished()) |
111 | { |
112 | output.finish(); |
113 | return Status::Finished; |
114 | } |
115 | |
116 | /// Now, we pulled from input, and it must be empty. |
117 | return Status::NeedData; |
118 | } |
119 | |
120 | /// Return the whole block. |
121 | if (rows_read >= offset + rows && rows_read <= offset + limit) |
122 | { |
123 | if (output.hasData()) |
124 | return Status::PortFull; |
125 | |
126 | if (with_ties && rows_read == offset + limit) |
127 | { |
128 | SharedChunkPtr shared_chunk = new detail::SharedChunk(current_chunk.clone()); |
129 | shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns()); |
130 | ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, shared_chunk->getNumRows() - 1); |
131 | } |
132 | |
133 | output.push(std::move(current_chunk)); |
134 | has_block = false; |
135 | |
136 | return Status::PortFull; |
137 | } |
138 | |
139 | /// No more data is needed. |
140 | if (!always_read_till_end && rows_read >= offset + limit) |
141 | input.close(); |
142 | |
143 | return Status::Ready; |
144 | } |
145 | |
146 | |
147 | void LimitTransform::work() |
148 | { |
149 | SharedChunkPtr shared_chunk = new detail::SharedChunk(std::move(current_chunk)); |
150 | shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns()); |
151 | |
152 | size_t num_rows = shared_chunk->getNumRows(); |
153 | size_t num_columns = shared_chunk->getNumColumns(); |
154 | |
155 | if (!ties_row_ref.empty() && rows_read >= offset + limit) |
156 | { |
157 | UInt64 len; |
158 | for (len = 0; len < num_rows; ++len) |
159 | { |
160 | SharedChunkRowRef current_row; |
161 | current_row.set(shared_chunk, &shared_chunk->sort_columns, len); |
162 | |
163 | if (current_row != ties_row_ref) |
164 | { |
165 | ties_row_ref.reset(); |
166 | break; |
167 | } |
168 | } |
169 | |
170 | auto columns = shared_chunk->detachColumns(); |
171 | |
172 | if (len < num_rows) |
173 | { |
174 | for (size_t i = 0; i < num_columns; ++i) |
175 | columns[i] = columns[i]->cut(0, len); |
176 | } |
177 | |
178 | current_chunk.setColumns(std::move(columns), len); |
179 | block_processed = true; |
180 | return; |
181 | } |
182 | |
183 | /// return a piece of the block |
184 | size_t start = std::max( |
185 | static_cast<Int64>(0), |
186 | static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows)); |
187 | |
188 | size_t length = std::min( |
189 | static_cast<Int64>(limit), std::min( |
190 | static_cast<Int64>(rows_read) - static_cast<Int64>(offset), |
191 | static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows))); |
192 | |
193 | /// check if other rows in current block equals to last one in limit |
194 | if (with_ties) |
195 | { |
196 | ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, start + length - 1); |
197 | SharedChunkRowRef current_row; |
198 | |
199 | for (size_t i = ties_row_ref.row_num + 1; i < num_rows; ++i) |
200 | { |
201 | current_row.set(shared_chunk, &shared_chunk->sort_columns, i); |
202 | if (current_row == ties_row_ref) |
203 | ++length; |
204 | else |
205 | { |
206 | ties_row_ref.reset(); |
207 | break; |
208 | } |
209 | } |
210 | } |
211 | |
212 | if (length == num_rows) |
213 | { |
214 | current_chunk = std::move(*shared_chunk); |
215 | block_processed = true; |
216 | return; |
217 | } |
218 | |
219 | auto columns = shared_chunk->detachColumns(); |
220 | |
221 | for (size_t i = 0; i < num_columns; ++i) |
222 | columns[i] = columns[i]->cut(start, length); |
223 | |
224 | current_chunk.setColumns(std::move(columns), length); |
225 | |
226 | block_processed = true; |
227 | } |
228 | |
229 | ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) |
230 | { |
231 | ColumnRawPtrs res; |
232 | res.reserve(description.size()); |
233 | for (size_t pos : sort_column_positions) |
234 | res.push_back(columns[pos].get()); |
235 | |
236 | return res; |
237 | } |
238 | |
239 | } |
240 | |
241 | |