#include <Processors/LimitTransform.h>


namespace DB
{

LimitTransform::LimitTransform(
    const Block & header_, size_t limit_, size_t offset_,
    bool always_read_till_end_, bool with_ties_,
    const SortDescription & description_)
    : IProcessor({header_}, {header_})
    , input(inputs.front()), output(outputs.front())
    , limit(limit_), offset(offset_)
    , always_read_till_end(always_read_till_end_)
    , with_ties(with_ties_), description(description_)
{
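    /// Resolve each entry of the sort description to a column position in the header:
    /// by column name when a name is given, otherwise by the explicit column number.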
    for (const auto & desc : description)
    {
        if (!desc.column_name.empty())
            sort_column_positions.push_back(header_.getPositionByName(desc.column_name));
        else
            sort_column_positions.push_back(desc.column_number);
    }
}


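/// prepare() moves chunks between the input and output ports and decides what to do next:
/// pass a chunk through unchanged, skip it, finish the processor, or return Status::Ready
/// so that work() cuts out the part of the chunk that falls under LIMIT/OFFSET.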
LimitTransform::Status LimitTransform::prepare()
{
    /// Check can output.
    bool output_finished = false;
    if (output.isFinished())
    {
        output_finished = true;
        if (!always_read_till_end)
        {
            input.close();
            return Status::Finished;
        }
    }

    if (!output_finished && !output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    /// Push block if can.
    if (!output_finished && has_block && block_processed)
    {
        output.push(std::move(current_chunk));
        has_block = false;
        block_processed = false;
    }

    /// Check if we are done with pushing.
    bool pushing_is_finished = (rows_read >= offset + limit) && ties_row_ref.empty();
    if (pushing_is_finished)
    {
        if (!always_read_till_end)
        {
            output.finish();
            input.close();
            return Status::Finished;
        }
    }

    /// Check can input.

    if (input.isFinished())
    {
        output.finish();
        return Status::Finished;
    }

    input.setNeeded();
    if (!input.hasData())
        return Status::NeedData;

    current_chunk = input.pull();
    has_block = true;

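    /// Count every row pulled from the input; this feeds the rows_before_limit_at_least value reported to the client.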
    auto rows = current_chunk.getNumRows();
    rows_before_limit_at_least += rows;

    /// Skip block (for 'always_read_till_end' case).
    if (pushing_is_finished)
    {
        current_chunk.clear();
        has_block = false;

        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        /// Now, we pulled from input, and it must be empty.
        return Status::NeedData;
    }

    /// Process block.

    rows_read += rows;

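    /// The chunk still lies entirely before the offset; drop it and ask for more data.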
    if (rows_read <= offset)
    {
        current_chunk.clear();
        has_block = false;

        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        /// Now, we pulled from input, and it must be empty.
        return Status::NeedData;
    }

    /// Return the whole block.
    if (rows_read >= offset + rows && rows_read <= offset + limit)
    {
        if (output.hasData())
            return Status::PortFull;

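        /// This chunk ends exactly at the limit. Remember the sort key of its last row,
        /// so that following chunks can be scanned for rows tied with it (LIMIT ... WITH TIES).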
        if (with_ties && rows_read == offset + limit)
        {
            SharedChunkPtr shared_chunk = new detail::SharedChunk(current_chunk.clone());
            shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());
            ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, shared_chunk->getNumRows() - 1);
        }

        output.push(std::move(current_chunk));
        has_block = false;

        return Status::PortFull;
    }

    /// No more data is needed.
    if (!always_read_till_end && rows_read >= offset + limit)
        input.close();

    return Status::Ready;
}


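/// work() is called when the current chunk is only partially needed: it cuts away rows
/// outside the [offset, offset + limit) window and, for LIMIT ... WITH TIES, extends or
/// continues the output with rows whose sort key equals the last row within the limit.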
void LimitTransform::work()
{
    SharedChunkPtr shared_chunk = new detail::SharedChunk(std::move(current_chunk));
    shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());

    size_t num_rows = shared_chunk->getNumRows();
    size_t num_columns = shared_chunk->getNumColumns();

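    /// The limit was already reached, but the previous chunk ended in the middle of a group of
    /// tied rows: keep the leading rows of this chunk while they are equal to the remembered row.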
    if (!ties_row_ref.empty() && rows_read >= offset + limit)
    {
        UInt64 len;
        for (len = 0; len < num_rows; ++len)
        {
            SharedChunkRowRef current_row;
            current_row.set(shared_chunk, &shared_chunk->sort_columns, len);

            if (current_row != ties_row_ref)
            {
                ties_row_ref.reset();
                break;
            }
        }

        auto columns = shared_chunk->detachColumns();

        if (len < num_rows)
        {
            for (size_t i = 0; i < num_columns; ++i)
                columns[i] = columns[i]->cut(0, len);
        }

        current_chunk.setColumns(std::move(columns), len);
        block_processed = true;
        return;
    }

    /// Return a piece of the block: rows [start, start + length) are the part of this chunk
    /// that falls inside the [offset, offset + limit) window of the whole stream.
    size_t start = std::max(
        static_cast<Int64>(0),
        static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows));

    size_t length = std::min(
        static_cast<Int64>(limit), std::min(
        static_cast<Int64>(rows_read) - static_cast<Int64>(offset),
        static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows)));

    /// Check whether rows after the last one within the limit are equal to it (ties).
    /// If the ties reach the end of the chunk, remember that row so the next chunk can continue them.
    if (with_ties)
    {
        ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, start + length - 1);
        SharedChunkRowRef current_row;

        for (size_t i = ties_row_ref.row_num + 1; i < num_rows; ++i)
        {
            current_row.set(shared_chunk, &shared_chunk->sort_columns, i);
            if (current_row == ties_row_ref)
                ++length;
            else
            {
                ties_row_ref.reset();
                break;
            }
        }
    }

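    /// The whole chunk is needed; pass it through without copying.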
    if (length == num_rows)
    {
        current_chunk = std::move(*shared_chunk);
        block_processed = true;
        return;
    }

    auto columns = shared_chunk->detachColumns();

    for (size_t i = 0; i < num_columns; ++i)
        columns[i] = columns[i]->cut(start, length);

    current_chunk.setColumns(std::move(columns), length);

    block_processed = true;
}

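/// Collect raw pointers to the sort key columns of a chunk, in the order of the sort description.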
ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns)
{
    ColumnRawPtrs res;
    res.reserve(description.size());
    for (size_t pos : sort_column_positions)
        res.push_back(columns[pos].get());

    return res;
}

}