1#include <Processors/Transforms/SortingTransform.h>
2
3#include <Core/SortDescription.h>
4#include <Core/SortCursor.h>
5
6#include <Common/formatReadable.h>
7#include <Common/ProfileEvents.h>
8
9#include <IO/WriteBufferFromFile.h>
10#include <Compression/CompressedWriteBuffer.h>
11
12#include <DataStreams/NativeBlockInputStream.h>
13#include <DataStreams/NativeBlockOutputStream.h>
14
15
/// Profile event counters named after external (spill-to-disk) sorting.
/// NOTE(review): neither counter is referenced by any code visible in this file; presumably they are
/// incremented by a subclass implemented in another translation unit — confirm before removing.
namespace ProfileEvents
{
    extern const Event ExternalSortWritePart;
    extern const Event ExternalSortMerge;
}
21
22
23namespace DB
24{
25
26MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
27 : chunks(std::move(chunks_)), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
28{
29 Chunks nonempty_chunks;
30 for (auto & chunk : chunks)
31 {
32 if (chunk.getNumRows() == 0)
33 continue;
34
35 cursors.emplace_back(chunk.getColumns(), description);
36 has_collation |= cursors.back().has_collation;
37
38 nonempty_chunks.emplace_back(std::move(chunk));
39 }
40
41 chunks.swap(nonempty_chunks);
42
43 if (!has_collation)
44 {
45 for (auto & cursor : cursors)
46 queue_without_collation.push(SortCursor(&cursor));
47 }
48 else
49 {
50 for (auto & cursor : cursors)
51 queue_with_collation.push(SortCursorWithCollation(&cursor));
52 }
53}
54
55
56Chunk MergeSorter::read()
57{
58 if (chunks.empty())
59 return Chunk();
60
61 if (chunks.size() == 1)
62 {
63 auto res = std::move(chunks[0]);
64 chunks.clear();
65 return res;
66 }
67
68 return !has_collation
69 ? mergeImpl<SortCursor>(queue_without_collation)
70 : mergeImpl<SortCursorWithCollation>(queue_with_collation);
71}
72
73
74template <typename TSortCursor>
75Chunk MergeSorter::mergeImpl(std::priority_queue<TSortCursor> & queue)
76{
77 size_t num_columns = chunks[0].getNumColumns();
78
79 MutableColumns merged_columns = chunks[0].cloneEmptyColumns();
80 /// TODO: reserve (in each column)
81
82 /// Take rows from queue in right order and push to 'merged'.
83 size_t merged_rows = 0;
84 while (!queue.empty())
85 {
86 TSortCursor current = queue.top();
87 queue.pop();
88
89 for (size_t i = 0; i < num_columns; ++i)
90 merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
91
92 ++total_merged_rows;
93 ++merged_rows;
94
95 if (!current->isLast())
96 {
97 current->next();
98 queue.push(current);
99 }
100
101 if (limit && total_merged_rows == limit)
102 {
103 chunks.clear();
104 return Chunk(std::move(merged_columns), merged_rows);
105 }
106
107 if (merged_rows == max_merged_block_size)
108 return Chunk(std::move(merged_columns), merged_rows);
109 }
110
111 chunks.clear();
112
113 if (merged_rows == 0)
114 return {};
115
116 return Chunk(std::move(merged_columns), merged_rows);
117}
118
119
120SortingTransform::SortingTransform(
121 const Block & header,
122 const SortDescription & description_,
123 size_t max_merged_block_size_, UInt64 limit_)
124 : IProcessor({header}, {header})
125 , description(description_)
126 , max_merged_block_size(max_merged_block_size_)
127 , limit(limit_)
128{
129 auto & sample = inputs.front().getHeader();
130
131 /// Replace column names to column position in sort_description.
132 for (auto & column_description : description)
133 {
134 if (!column_description.column_name.empty())
135 {
136 column_description.column_number = sample.getPositionByName(column_description.column_name);
137 column_description.column_name.clear();
138 }
139 }
140
141 /// Remove constants from header and map old indexes to new.
142 size_t num_columns = sample.columns();
143 ColumnNumbers map(num_columns, num_columns);
144 const_columns_to_remove.assign(num_columns, true);
145 for (size_t pos = 0; pos < num_columns; ++pos)
146 {
147 const auto & column = sample.getByPosition(pos);
148 if (!(column.column && isColumnConst(*column.column)))
149 {
150 map[pos] = header_without_constants.columns();
151 header_without_constants.insert(column);
152 const_columns_to_remove[pos] = false;
153 }
154 }
155
156 /// Remove constants from column_description and remap positions.
157 SortDescription description_without_constants;
158 description_without_constants.reserve(description.size());
159 for (const auto & column_description : description)
160 {
161 auto old_pos = column_description.column_number;
162 auto new_pos = map[old_pos];
163 if (new_pos < num_columns)
164 {
165 description_without_constants.push_back(column_description);
166 description_without_constants.back().column_number = new_pos;
167 }
168 }
169
170 description.swap(description_without_constants);
171}
172
/// Defaulted destructor, defined out of line rather than in the class body.
/// NOTE(review): presumably so that members whose types are only forward-declared in the header
/// (e.g. whatever holds `merge_sorter`) are destroyed where their definitions are complete — confirm against the header.
SortingTransform::~SortingTransform() = default;
174
175IProcessor::Status SortingTransform::prepare()
176{
177 if (stage == Stage::Serialize)
178 {
179 if (!processors.empty())
180 return Status::ExpandPipeline;
181
182 auto status = prepareSerialize();
183 if (status != Status::Finished)
184 return status;
185
186 stage = Stage::Consume;
187 }
188
189 if (stage == Stage::Consume)
190 {
191 auto status = prepareConsume();
192 if (status != Status::Finished)
193 return status;
194
195 stage = Stage::Generate;
196 }
197
198 /// stage == Stage::Generate
199
200 if (!generated_prefix || !chunks.empty())
201 return Status::Ready;
202
203 if (!processors.empty())
204 return Status::ExpandPipeline;
205
206 return prepareGenerate();
207}
208
209IProcessor::Status SortingTransform::prepareConsume()
210{
211 auto & input = inputs.front();
212 auto & output = outputs.front();
213
214 /// Check can output.
215
216 if (output.isFinished())
217 {
218 input.close();
219 return Status::Finished;
220 }
221
222 if (!output.canPush())
223 {
224 input.setNotNeeded();
225 return Status::PortFull;
226 }
227
228 if (generated_chunk)
229 output.push(std::move(generated_chunk));
230
231 /// Check can input.
232 if (!current_chunk)
233 {
234 if (input.isFinished())
235 return Status::Finished;
236
237 input.setNeeded();
238
239 if (!input.hasData())
240 return Status::NeedData;
241
242 current_chunk = input.pull();
243 }
244
245 /// Now consume.
246 return Status::Ready;
247}
248
249IProcessor::Status SortingTransform::prepareSerialize()
250{
251 auto & output = outputs.back();
252
253 if (output.isFinished())
254 return Status::Finished;
255
256 if (!output.canPush())
257 return Status::PortFull;
258
259 if (current_chunk)
260 output.push(std::move(current_chunk));
261
262 if (merge_sorter)
263 return Status::Ready;
264
265 output.finish();
266 return Status::Finished;
267}
268
269IProcessor::Status SortingTransform::prepareGenerate()
270{
271 auto & output = outputs.front();
272
273 if (output.isFinished())
274 {
275 inputs.front().close();
276 return Status::Finished;
277 }
278
279 if (!output.canPush())
280 return Status::PortFull;
281
282 if (merge_sorter)
283 {
284 if (!generated_chunk)
285 return Status::Ready;
286
287 output.push(std::move(generated_chunk));
288 return Status::PortFull;
289 }
290 else
291 {
292 auto & input = inputs.back();
293
294 if (generated_chunk)
295 output.push(std::move(generated_chunk));
296
297 if (input.isFinished())
298 {
299 output.finish();
300 return Status::Finished;
301 }
302
303 input.setNeeded();
304
305 if (!input.hasData())
306 return Status::NeedData;
307
308 auto chunk = input.pull();
309 enrichChunkWithConstants(chunk);
310 output.push(std::move(chunk));
311 return Status::PortFull;
312 }
313}
314
/// Performs the work for the current stage: consume the pending chunk, serialize, or generate output.
/// The three checks are deliberately independent `if`s, not an `if`/`else` chain. Each one re-reads
/// `stage`, so if a handler switches the stage (presumably consume() may move to Serialize in a
/// subclass — TODO confirm), the handler for the new stage runs in the same work() call.
/// Do not reorder these checks or turn them into `else if`.
void SortingTransform::work()
{
    if (stage == Stage::Consume)
        consume(std::move(current_chunk));

    if (stage == Stage::Serialize)
        serialize();

    if (stage == Stage::Generate)
        generate();
}
326
327void SortingTransform::removeConstColumns(Chunk & chunk)
328{
329 size_t num_columns = chunk.getNumColumns();
330 size_t num_rows = chunk.getNumRows();
331
332 if (num_columns != const_columns_to_remove.size())
333 throw Exception("Block has different number of columns with header: " + toString(num_columns)
334 + " vs " + toString(const_columns_to_remove.size()), ErrorCodes::LOGICAL_ERROR);
335
336 auto columns = chunk.detachColumns();
337 Columns column_without_constants;
338 column_without_constants.reserve(header_without_constants.columns());
339
340 for (size_t position = 0; position < num_columns; ++position)
341 {
342 if (!const_columns_to_remove[position])
343 column_without_constants.push_back(std::move(columns[position]));
344 }
345
346 chunk.setColumns(std::move(column_without_constants), num_rows);
347}
348
349void SortingTransform::enrichChunkWithConstants(Chunk & chunk)
350{
351 size_t num_rows = chunk.getNumRows();
352 size_t num_result_columns = const_columns_to_remove.size();
353
354 auto columns = chunk.detachColumns();
355 Columns column_with_constants;
356 column_with_constants.reserve(num_result_columns);
357
358 auto & header = inputs.front().getHeader();
359
360 size_t next_non_const_column = 0;
361 for (size_t i = 0; i < num_result_columns; ++i)
362 {
363 if (const_columns_to_remove[i])
364 column_with_constants.emplace_back(header.getByPosition(i).column->cloneResized(num_rows));
365 else
366 {
367 if (next_non_const_column >= columns.size())
368 throw Exception("Can't enrich chunk with constants because run out of non-constant columns.",
369 ErrorCodes::LOGICAL_ERROR);
370
371 column_with_constants.emplace_back(std::move(columns[next_non_const_column]));
372 ++next_non_const_column;
373 }
374 }
375
376 chunk.setColumns(std::move(column_with_constants), num_rows);
377}
378
379void SortingTransform::serialize()
380{
381 throw Exception("Method 'serialize' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
382}
383
384}
385