1#include <Processors/Transforms/FinishSortingTransform.h>
2
3namespace DB
4{
5
/// Error codes used in this file. Only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
10
11static bool isPrefix(const SortDescription & pref_descr, const SortDescription & descr)
12{
13 if (pref_descr.size() > descr.size())
14 return false;
15
16 for (size_t i = 0; i < pref_descr.size(); ++i)
17 if (pref_descr[i] != descr[i])
18 return false;
19
20 return true;
21}
22
23FinishSortingTransform::FinishSortingTransform(
24 const Block & header, const SortDescription & description_sorted_,
25 const SortDescription & description_to_sort_,
26 size_t max_merged_block_size_, UInt64 limit_)
27 : SortingTransform(header, description_to_sort_, max_merged_block_size_, limit_)
28 , description_sorted(description_sorted_)
29{
30 const auto & sample = inputs.front().getHeader();
31
32 /// Replace column names to column position in description_sorted.
33 for (auto & column_description : description_sorted)
34 {
35 if (!column_description.column_name.empty())
36 {
37 column_description.column_number = sample.getPositionByName(column_description.column_name);
38 column_description.column_name.clear();
39 }
40 }
41
42 if (!isPrefix(description_sorted, description))
43 throw Exception("Can`t finish sorting. SortDescription of already sorted stream is not prefix of "
44 "SortDescription needed to sort", ErrorCodes::LOGICAL_ERROR);
45}
46
47static bool less(const Columns & lhs, const Columns & rhs, size_t i, size_t j, const SortDescription & descr)
48{
49 for (const auto & elem : descr)
50 {
51 size_t ind = elem.column_number;
52 int res = elem.direction * lhs[ind]->compareAt(i, j, *rhs[ind], elem.nulls_direction);
53 if (res < 0)
54 return true;
55 else if (res > 0)
56 return false;
57 }
58 return false;
59}
60
61void FinishSortingTransform::consume(Chunk chunk)
62{
63 generated_prefix = false;
64
65 // If there were only const columns in sort description, then there is no need to sort.
66 // Return the chunks as is.
67 if (description.empty())
68 {
69 generated_chunk = std::move(chunk);
70 return;
71 }
72
73 removeConstColumns(chunk);
74
75 /// Find the position of last already read key in current chunk.
76 if (!chunks.empty())
77 {
78 size_t size = chunk.getNumRows();
79 const auto & last_chunk = chunks.back();
80
81 ssize_t low = -1;
82 ssize_t high = size;
83 while (high - low > 1)
84 {
85 ssize_t mid = (low + high) / 2;
86 if (!less(last_chunk.getColumns(), chunk.getColumns(), last_chunk.getNumRows() - 1, mid, description_sorted))
87 low = mid;
88 else
89 high = mid;
90 }
91
92 size_t tail_pos = high;
93
94 /// We need to save tail of chunk, because next chunk may starts with the same key as in tail
95 /// and we should sort these rows in one portion.
96 if (tail_pos != size)
97 {
98 auto source_columns = chunk.detachColumns();
99 Columns tail_columns;
100
101 for (size_t i = 0; i < source_columns.size(); ++i)
102 {
103 tail_columns.push_back(source_columns[i]->cut(tail_pos, size - tail_pos));
104 source_columns[i] = source_columns[i]->cut(0, tail_pos);
105 }
106
107 chunks.emplace_back(std::move(source_columns), tail_pos);
108 tail_chunk.setColumns(std::move(tail_columns), size - tail_pos);
109
110 stage = Stage::Generate;
111 return;
112 }
113 }
114
115 /// If we reach here, that means that current cunk is first in portion
116 /// or it all consists of rows with the same key as tail of a previous chunk.
117 chunks.push_back(std::move(chunk));
118}
119
120void FinishSortingTransform::generate()
121{
122 if (!merge_sorter)
123 {
124 merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
125 generated_prefix = true;
126 }
127
128 generated_chunk = merge_sorter->read();
129
130 if (!generated_chunk)
131 {
132 merge_sorter.reset();
133 if (tail_chunk)
134 chunks.push_back(std::move(tail_chunk));
135 stage = Stage::Consume;
136 }
137 else
138 enrichChunkWithConstants(generated_chunk);
139}
140
141}
142