1#include <DataStreams/FinishSortingBlockInputStream.h>
2#include <DataStreams/MergeSortingBlockInputStream.h>
3#include <DataStreams/processConstants.h>
4
5
6namespace DB
7{
8
/// Error codes referenced by this file. They are only declared here (`extern`),
/// so their definitions must be provided by another translation unit.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
13
14static bool isPrefix(const SortDescription & pref_descr, const SortDescription & descr)
15{
16 if (pref_descr.size() > descr.size())
17 return false;
18
19 for (size_t i = 0; i < pref_descr.size(); ++i)
20 if (pref_descr[i] != descr[i])
21 return false;
22
23 return true;
24}
25
26FinishSortingBlockInputStream::FinishSortingBlockInputStream(
27 const BlockInputStreamPtr & input, const SortDescription & description_sorted_,
28 const SortDescription & description_to_sort_,
29 size_t max_merged_block_size_, UInt64 limit_)
30 : description_sorted(description_sorted_), description_to_sort(description_to_sort_),
31 max_merged_block_size(max_merged_block_size_), limit(limit_)
32{
33 if (!isPrefix(description_sorted, description_to_sort))
34 throw Exception("Can`t finish sorting. SortDescription of already sorted stream is not prefix of "
35 "SortDescription needed to sort", ErrorCodes::LOGICAL_ERROR);
36
37 children.push_back(input);
38 header = children.at(0)->getHeader();
39 removeConstantsFromSortDescription(header, description_to_sort);
40}
41
42
43struct Less
44{
45 const ColumnsWithSortDescriptions & left_columns;
46 const ColumnsWithSortDescriptions & right_columns;
47
48 Less(const ColumnsWithSortDescriptions & left_columns_, const ColumnsWithSortDescriptions & right_columns_) :
49 left_columns(left_columns_), right_columns(right_columns_) {}
50
51 bool operator() (size_t a, size_t b) const
52 {
53 for (auto it = left_columns.begin(), jt = right_columns.begin(); it != left_columns.end(); ++it, ++jt)
54 {
55 int res = it->description.direction * it->column->compareAt(a, b, *jt->column, it->description.nulls_direction);
56 if (res < 0)
57 return true;
58 else if (res > 0)
59 return false;
60 }
61 return false;
62 }
63};
64
65
66Block FinishSortingBlockInputStream::readImpl()
67{
68 if (limit && total_rows_processed >= limit)
69 return {};
70
71 Block res;
72 if (impl)
73 res = impl->read();
74
75 /// If res block is empty, we have finished sorting previous chunk of blocks.
76 if (!res)
77 {
78 if (end_of_stream)
79 return {};
80
81 blocks.clear();
82 if (tail_block)
83 blocks.push_back(std::move(tail_block));
84
85 while (true)
86 {
87 Block block = children.back()->read();
88
89 /// End of input stream, but we can`t return immediately, we need to merge already read blocks.
90 /// Check it later, when get end of stream from impl.
91 if (!block)
92 {
93 end_of_stream = true;
94 break;
95 }
96
97 // If there were only const columns in sort description, then there is no need to sort.
98 // Return the blocks as is.
99 if (description_to_sort.empty())
100 return block;
101
102 if (block.rows() == 0)
103 continue;
104
105
106 removeConstantsFromBlock(block);
107
108 /// Find the position of last already read key in current block.
109 if (!blocks.empty())
110 {
111 const Block & last_block = blocks.back();
112 auto last_columns = getColumnsWithSortDescription(last_block, description_sorted);
113 auto current_columns = getColumnsWithSortDescription(block, description_sorted);
114
115 Less less(last_columns, current_columns);
116
117 size_t size = block.rows();
118 IColumn::Permutation perm(size);
119 for (size_t i = 0; i < size; ++i)
120 perm[i] = i;
121
122 auto it = std::upper_bound(perm.begin(), perm.end(), last_block.rows() - 1, less);
123
124 /// We need to save tail of block, because next block may starts with the same key as in tail
125 /// and we should sort these rows in one chunk.
126 if (it != perm.end())
127 {
128 size_t tail_pos = it - perm.begin();
129 Block head_block = block.cloneEmpty();
130 tail_block = block.cloneEmpty();
131
132 for (size_t i = 0; i < block.columns(); ++i)
133 {
134 head_block.getByPosition(i).column = block.getByPosition(i).column->cut(0, tail_pos);
135 tail_block.getByPosition(i).column = block.getByPosition(i).column->cut(tail_pos, block.rows() - tail_pos);
136 }
137
138 if (head_block.rows())
139 blocks.push_back(head_block);
140
141 break;
142 }
143 }
144
145 /// If we reach here, that means that current block is first in chunk
146 /// or it all consists of rows with the same key as tail of a previous block.
147 blocks.push_back(block);
148 }
149
150 if (!blocks.empty())
151 {
152 impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description_to_sort, max_merged_block_size, limit);
153 res = impl->read();
154 }
155 }
156
157 if (res)
158 enrichBlockWithConstants(res, header);
159
160 total_rows_processed += res.rows();
161
162 return res;
163}
164}
165