1 | #include <Processors/Transforms/SortingTransform.h> |
2 | |
3 | #include <Core/SortDescription.h> |
4 | #include <Core/SortCursor.h> |
5 | |
6 | #include <Common/formatReadable.h> |
7 | #include <Common/ProfileEvents.h> |
8 | |
9 | #include <IO/WriteBufferFromFile.h> |
10 | #include <Compression/CompressedWriteBuffer.h> |
11 | |
12 | #include <DataStreams/NativeBlockInputStream.h> |
13 | #include <DataStreams/NativeBlockOutputStream.h> |
14 | |
15 | |
16 | namespace ProfileEvents |
17 | { |
18 | extern const Event ExternalSortWritePart; |
19 | extern const Event ExternalSortMerge; |
20 | } |
21 | |
22 | |
23 | namespace DB |
24 | { |
25 | |
26 | MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_) |
27 | : chunks(std::move(chunks_)), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_) |
28 | { |
29 | Chunks nonempty_chunks; |
30 | for (auto & chunk : chunks) |
31 | { |
32 | if (chunk.getNumRows() == 0) |
33 | continue; |
34 | |
35 | cursors.emplace_back(chunk.getColumns(), description); |
36 | has_collation |= cursors.back().has_collation; |
37 | |
38 | nonempty_chunks.emplace_back(std::move(chunk)); |
39 | } |
40 | |
41 | chunks.swap(nonempty_chunks); |
42 | |
43 | if (!has_collation) |
44 | { |
45 | for (auto & cursor : cursors) |
46 | queue_without_collation.push(SortCursor(&cursor)); |
47 | } |
48 | else |
49 | { |
50 | for (auto & cursor : cursors) |
51 | queue_with_collation.push(SortCursorWithCollation(&cursor)); |
52 | } |
53 | } |
54 | |
55 | |
56 | Chunk MergeSorter::read() |
57 | { |
58 | if (chunks.empty()) |
59 | return Chunk(); |
60 | |
61 | if (chunks.size() == 1) |
62 | { |
63 | auto res = std::move(chunks[0]); |
64 | chunks.clear(); |
65 | return res; |
66 | } |
67 | |
68 | return !has_collation |
69 | ? mergeImpl<SortCursor>(queue_without_collation) |
70 | : mergeImpl<SortCursorWithCollation>(queue_with_collation); |
71 | } |
72 | |
73 | |
74 | template <typename TSortCursor> |
75 | Chunk MergeSorter::mergeImpl(std::priority_queue<TSortCursor> & queue) |
76 | { |
77 | size_t num_columns = chunks[0].getNumColumns(); |
78 | |
79 | MutableColumns merged_columns = chunks[0].cloneEmptyColumns(); |
80 | /// TODO: reserve (in each column) |
81 | |
82 | /// Take rows from queue in right order and push to 'merged'. |
83 | size_t merged_rows = 0; |
84 | while (!queue.empty()) |
85 | { |
86 | TSortCursor current = queue.top(); |
87 | queue.pop(); |
88 | |
89 | for (size_t i = 0; i < num_columns; ++i) |
90 | merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); |
91 | |
92 | ++total_merged_rows; |
93 | ++merged_rows; |
94 | |
95 | if (!current->isLast()) |
96 | { |
97 | current->next(); |
98 | queue.push(current); |
99 | } |
100 | |
101 | if (limit && total_merged_rows == limit) |
102 | { |
103 | chunks.clear(); |
104 | return Chunk(std::move(merged_columns), merged_rows); |
105 | } |
106 | |
107 | if (merged_rows == max_merged_block_size) |
108 | return Chunk(std::move(merged_columns), merged_rows); |
109 | } |
110 | |
111 | chunks.clear(); |
112 | |
113 | if (merged_rows == 0) |
114 | return {}; |
115 | |
116 | return Chunk(std::move(merged_columns), merged_rows); |
117 | } |
118 | |
119 | |
120 | SortingTransform::SortingTransform( |
121 | const Block & , |
122 | const SortDescription & description_, |
123 | size_t max_merged_block_size_, UInt64 limit_) |
124 | : IProcessor({header}, {header}) |
125 | , description(description_) |
126 | , max_merged_block_size(max_merged_block_size_) |
127 | , limit(limit_) |
128 | { |
129 | auto & sample = inputs.front().getHeader(); |
130 | |
131 | /// Replace column names to column position in sort_description. |
132 | for (auto & column_description : description) |
133 | { |
134 | if (!column_description.column_name.empty()) |
135 | { |
136 | column_description.column_number = sample.getPositionByName(column_description.column_name); |
137 | column_description.column_name.clear(); |
138 | } |
139 | } |
140 | |
141 | /// Remove constants from header and map old indexes to new. |
142 | size_t num_columns = sample.columns(); |
143 | ColumnNumbers map(num_columns, num_columns); |
144 | const_columns_to_remove.assign(num_columns, true); |
145 | for (size_t pos = 0; pos < num_columns; ++pos) |
146 | { |
147 | const auto & column = sample.getByPosition(pos); |
148 | if (!(column.column && isColumnConst(*column.column))) |
149 | { |
150 | map[pos] = header_without_constants.columns(); |
151 | header_without_constants.insert(column); |
152 | const_columns_to_remove[pos] = false; |
153 | } |
154 | } |
155 | |
156 | /// Remove constants from column_description and remap positions. |
157 | SortDescription description_without_constants; |
158 | description_without_constants.reserve(description.size()); |
159 | for (const auto & column_description : description) |
160 | { |
161 | auto old_pos = column_description.column_number; |
162 | auto new_pos = map[old_pos]; |
163 | if (new_pos < num_columns) |
164 | { |
165 | description_without_constants.push_back(column_description); |
166 | description_without_constants.back().column_number = new_pos; |
167 | } |
168 | } |
169 | |
170 | description.swap(description_without_constants); |
171 | } |
172 | |
173 | SortingTransform::~SortingTransform() = default; |
174 | |
175 | IProcessor::Status SortingTransform::prepare() |
176 | { |
177 | if (stage == Stage::Serialize) |
178 | { |
179 | if (!processors.empty()) |
180 | return Status::ExpandPipeline; |
181 | |
182 | auto status = prepareSerialize(); |
183 | if (status != Status::Finished) |
184 | return status; |
185 | |
186 | stage = Stage::Consume; |
187 | } |
188 | |
189 | if (stage == Stage::Consume) |
190 | { |
191 | auto status = prepareConsume(); |
192 | if (status != Status::Finished) |
193 | return status; |
194 | |
195 | stage = Stage::Generate; |
196 | } |
197 | |
198 | /// stage == Stage::Generate |
199 | |
200 | if (!generated_prefix || !chunks.empty()) |
201 | return Status::Ready; |
202 | |
203 | if (!processors.empty()) |
204 | return Status::ExpandPipeline; |
205 | |
206 | return prepareGenerate(); |
207 | } |
208 | |
209 | IProcessor::Status SortingTransform::prepareConsume() |
210 | { |
211 | auto & input = inputs.front(); |
212 | auto & output = outputs.front(); |
213 | |
214 | /// Check can output. |
215 | |
216 | if (output.isFinished()) |
217 | { |
218 | input.close(); |
219 | return Status::Finished; |
220 | } |
221 | |
222 | if (!output.canPush()) |
223 | { |
224 | input.setNotNeeded(); |
225 | return Status::PortFull; |
226 | } |
227 | |
228 | if (generated_chunk) |
229 | output.push(std::move(generated_chunk)); |
230 | |
231 | /// Check can input. |
232 | if (!current_chunk) |
233 | { |
234 | if (input.isFinished()) |
235 | return Status::Finished; |
236 | |
237 | input.setNeeded(); |
238 | |
239 | if (!input.hasData()) |
240 | return Status::NeedData; |
241 | |
242 | current_chunk = input.pull(); |
243 | } |
244 | |
245 | /// Now consume. |
246 | return Status::Ready; |
247 | } |
248 | |
249 | IProcessor::Status SortingTransform::prepareSerialize() |
250 | { |
251 | auto & output = outputs.back(); |
252 | |
253 | if (output.isFinished()) |
254 | return Status::Finished; |
255 | |
256 | if (!output.canPush()) |
257 | return Status::PortFull; |
258 | |
259 | if (current_chunk) |
260 | output.push(std::move(current_chunk)); |
261 | |
262 | if (merge_sorter) |
263 | return Status::Ready; |
264 | |
265 | output.finish(); |
266 | return Status::Finished; |
267 | } |
268 | |
269 | IProcessor::Status SortingTransform::prepareGenerate() |
270 | { |
271 | auto & output = outputs.front(); |
272 | |
273 | if (output.isFinished()) |
274 | { |
275 | inputs.front().close(); |
276 | return Status::Finished; |
277 | } |
278 | |
279 | if (!output.canPush()) |
280 | return Status::PortFull; |
281 | |
282 | if (merge_sorter) |
283 | { |
284 | if (!generated_chunk) |
285 | return Status::Ready; |
286 | |
287 | output.push(std::move(generated_chunk)); |
288 | return Status::PortFull; |
289 | } |
290 | else |
291 | { |
292 | auto & input = inputs.back(); |
293 | |
294 | if (generated_chunk) |
295 | output.push(std::move(generated_chunk)); |
296 | |
297 | if (input.isFinished()) |
298 | { |
299 | output.finish(); |
300 | return Status::Finished; |
301 | } |
302 | |
303 | input.setNeeded(); |
304 | |
305 | if (!input.hasData()) |
306 | return Status::NeedData; |
307 | |
308 | auto chunk = input.pull(); |
309 | enrichChunkWithConstants(chunk); |
310 | output.push(std::move(chunk)); |
311 | return Status::PortFull; |
312 | } |
313 | } |
314 | |
315 | void SortingTransform::work() |
316 | { |
317 | if (stage == Stage::Consume) |
318 | consume(std::move(current_chunk)); |
319 | |
320 | if (stage == Stage::Serialize) |
321 | serialize(); |
322 | |
323 | if (stage == Stage::Generate) |
324 | generate(); |
325 | } |
326 | |
327 | void SortingTransform::removeConstColumns(Chunk & chunk) |
328 | { |
329 | size_t num_columns = chunk.getNumColumns(); |
330 | size_t num_rows = chunk.getNumRows(); |
331 | |
332 | if (num_columns != const_columns_to_remove.size()) |
333 | throw Exception("Block has different number of columns with header: " + toString(num_columns) |
334 | + " vs " + toString(const_columns_to_remove.size()), ErrorCodes::LOGICAL_ERROR); |
335 | |
336 | auto columns = chunk.detachColumns(); |
337 | Columns column_without_constants; |
338 | column_without_constants.reserve(header_without_constants.columns()); |
339 | |
340 | for (size_t position = 0; position < num_columns; ++position) |
341 | { |
342 | if (!const_columns_to_remove[position]) |
343 | column_without_constants.push_back(std::move(columns[position])); |
344 | } |
345 | |
346 | chunk.setColumns(std::move(column_without_constants), num_rows); |
347 | } |
348 | |
349 | void SortingTransform::enrichChunkWithConstants(Chunk & chunk) |
350 | { |
351 | size_t num_rows = chunk.getNumRows(); |
352 | size_t num_result_columns = const_columns_to_remove.size(); |
353 | |
354 | auto columns = chunk.detachColumns(); |
355 | Columns column_with_constants; |
356 | column_with_constants.reserve(num_result_columns); |
357 | |
358 | auto & = inputs.front().getHeader(); |
359 | |
360 | size_t next_non_const_column = 0; |
361 | for (size_t i = 0; i < num_result_columns; ++i) |
362 | { |
363 | if (const_columns_to_remove[i]) |
364 | column_with_constants.emplace_back(header.getByPosition(i).column->cloneResized(num_rows)); |
365 | else |
366 | { |
367 | if (next_non_const_column >= columns.size()) |
368 | throw Exception("Can't enrich chunk with constants because run out of non-constant columns." , |
369 | ErrorCodes::LOGICAL_ERROR); |
370 | |
371 | column_with_constants.emplace_back(std::move(columns[next_non_const_column])); |
372 | ++next_non_const_column; |
373 | } |
374 | } |
375 | |
376 | chunk.setColumns(std::move(column_with_constants), num_rows); |
377 | } |
378 | |
379 | void SortingTransform::serialize() |
380 | { |
381 | throw Exception("Method 'serialize' is not implemented for " + getName() + " processor" , ErrorCodes::NOT_IMPLEMENTED); |
382 | } |
383 | |
384 | } |
385 | |