1 | #pragma once |
2 | |
3 | #include <cassert> |
4 | #include <vector> |
5 | #include <algorithm> |
6 | |
7 | #include <Common/typeid_cast.h> |
8 | #include <Common/assert_cast.h> |
9 | #include <Core/SortDescription.h> |
10 | #include <Core/Block.h> |
11 | #include <Core/ColumnNumbers.h> |
12 | #include <Columns/IColumn.h> |
13 | #include <Columns/ColumnString.h> |
14 | |
15 | |
16 | namespace DB |
17 | { |
18 | |
19 | /** Cursor allows to compare rows in different blocks (and parts). |
20 | * Cursor moves inside single block. |
21 | * It is used in priority queue. |
22 | */ |
23 | struct SortCursorImpl |
24 | { |
25 | ColumnRawPtrs all_columns; |
26 | ColumnRawPtrs sort_columns; |
27 | SortDescription desc; |
28 | size_t sort_columns_size = 0; |
29 | size_t pos = 0; |
30 | size_t rows = 0; |
31 | |
32 | /** Determines order if comparing columns are equal. |
33 | * Order is determined by number of cursor. |
34 | * |
35 | * Cursor number (always?) equals to number of merging part. |
36 | * Therefore this field can be used to determine part number of current row (see ColumnGathererStream). |
37 | */ |
38 | size_t order = 0; |
39 | |
40 | using NeedCollationFlags = std::vector<UInt8>; |
41 | |
42 | /** Should we use Collator to sort a column? */ |
43 | NeedCollationFlags need_collation; |
44 | |
45 | /** Is there at least one column with Collator. */ |
46 | bool has_collation = false; |
47 | |
48 | SortCursorImpl() {} |
49 | |
50 | SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0) |
51 | : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size()) |
52 | { |
53 | reset(block); |
54 | } |
55 | |
56 | SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0) |
57 | : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size()) |
58 | { |
59 | for (auto & column_desc : desc) |
60 | { |
61 | if (!column_desc.column_name.empty()) |
62 | throw Exception("SortDesctiption should contain column position if SortCursor was used without header." , |
63 | ErrorCodes::LOGICAL_ERROR); |
64 | } |
65 | reset(columns, {}); |
66 | } |
67 | |
68 | bool empty() const { return rows == 0; } |
69 | |
70 | /// Set the cursor to the beginning of the new block. |
71 | void reset(const Block & block) |
72 | { |
73 | reset(block.getColumns(), block); |
74 | } |
75 | |
76 | /// Set the cursor to the beginning of the new block. |
77 | void reset(const Columns & columns, const Block & block) |
78 | { |
79 | all_columns.clear(); |
80 | sort_columns.clear(); |
81 | |
82 | size_t num_columns = columns.size(); |
83 | |
84 | for (size_t j = 0; j < num_columns; ++j) |
85 | all_columns.push_back(columns[j].get()); |
86 | |
87 | for (size_t j = 0, size = desc.size(); j < size; ++j) |
88 | { |
89 | auto & column_desc = desc[j]; |
90 | size_t column_number = !column_desc.column_name.empty() |
91 | ? block.getPositionByName(column_desc.column_name) |
92 | : column_desc.column_number; |
93 | sort_columns.push_back(columns[column_number].get()); |
94 | |
95 | need_collation[j] = desc[j].collator != nullptr && typeid_cast<const ColumnString *>(sort_columns.back()); /// TODO Nullable(String) |
96 | has_collation |= need_collation[j]; |
97 | } |
98 | |
99 | pos = 0; |
100 | rows = all_columns[0]->size(); |
101 | } |
102 | |
103 | bool isFirst() const { return pos == 0; } |
104 | bool isLast() const { return pos + 1 >= rows; } |
105 | bool isValid() const { return pos < rows; } |
106 | void next() { ++pos; } |
107 | }; |
108 | |
109 | using SortCursorImpls = std::vector<SortCursorImpl>; |
110 | |
111 | |
112 | /// For easy copying. |
113 | struct SortCursor |
114 | { |
115 | SortCursorImpl * impl; |
116 | |
117 | SortCursor(SortCursorImpl * impl_) : impl(impl_) {} |
118 | SortCursorImpl * operator-> () { return impl; } |
119 | const SortCursorImpl * operator-> () const { return impl; } |
120 | |
121 | /// The specified row of this cursor is greater than the specified row of another cursor. |
122 | bool greaterAt(const SortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const |
123 | { |
124 | for (size_t i = 0; i < impl->sort_columns_size; ++i) |
125 | { |
126 | int direction = impl->desc[i].direction; |
127 | int nulls_direction = impl->desc[i].nulls_direction; |
128 | int res = direction * impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction); |
129 | if (res > 0) |
130 | return true; |
131 | if (res < 0) |
132 | return false; |
133 | } |
134 | return impl->order > rhs.impl->order; |
135 | } |
136 | |
137 | /// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor. |
138 | bool totallyLessOrEquals(const SortCursor & rhs) const |
139 | { |
140 | if (impl->rows == 0 || rhs.impl->rows == 0) |
141 | return false; |
142 | |
143 | /// The last row of this cursor is no larger than the first row of the another cursor. |
144 | return !greaterAt(rhs, impl->rows - 1, 0); |
145 | } |
146 | |
147 | bool greater(const SortCursor & rhs) const |
148 | { |
149 | return greaterAt(rhs, impl->pos, rhs.impl->pos); |
150 | } |
151 | |
152 | /// Inverted so that the priority queue elements are removed in ascending order. |
153 | bool operator< (const SortCursor & rhs) const |
154 | { |
155 | return greater(rhs); |
156 | } |
157 | }; |
158 | |
159 | |
160 | /// Separate comparator for locale-sensitive string comparisons |
161 | struct SortCursorWithCollation |
162 | { |
163 | SortCursorImpl * impl; |
164 | |
165 | SortCursorWithCollation(SortCursorImpl * impl_) : impl(impl_) {} |
166 | SortCursorImpl * operator-> () { return impl; } |
167 | const SortCursorImpl * operator-> () const { return impl; } |
168 | |
169 | bool greaterAt(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const |
170 | { |
171 | for (size_t i = 0; i < impl->sort_columns_size; ++i) |
172 | { |
173 | int direction = impl->desc[i].direction; |
174 | int nulls_direction = impl->desc[i].nulls_direction; |
175 | int res; |
176 | if (impl->need_collation[i]) |
177 | { |
178 | const ColumnString & column_string = assert_cast<const ColumnString &>(*impl->sort_columns[i]); |
179 | res = column_string.compareAtWithCollation(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), *impl->desc[i].collator); |
180 | } |
181 | else |
182 | res = impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction); |
183 | |
184 | res *= direction; |
185 | if (res > 0) |
186 | return true; |
187 | if (res < 0) |
188 | return false; |
189 | } |
190 | return impl->order > rhs.impl->order; |
191 | } |
192 | |
193 | bool totallyLessOrEquals(const SortCursorWithCollation & rhs) const |
194 | { |
195 | if (impl->rows == 0 || rhs.impl->rows == 0) |
196 | return false; |
197 | |
198 | /// The last row of this cursor is no larger than the first row of the another cursor. |
199 | return !greaterAt(rhs, impl->rows - 1, 0); |
200 | } |
201 | |
202 | bool greater(const SortCursorWithCollation & rhs) const |
203 | { |
204 | return greaterAt(rhs, impl->pos, rhs.impl->pos); |
205 | } |
206 | |
207 | bool operator< (const SortCursorWithCollation & rhs) const |
208 | { |
209 | return greater(rhs); |
210 | } |
211 | }; |
212 | |
213 | |
214 | /** Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams). |
215 | */ |
216 | template <typename Cursor> |
217 | class SortingHeap |
218 | { |
219 | public: |
220 | SortingHeap() = default; |
221 | |
222 | template <typename Cursors> |
223 | SortingHeap(Cursors & cursors) |
224 | { |
225 | size_t size = cursors.size(); |
226 | queue.reserve(size); |
227 | for (size_t i = 0; i < size; ++i) |
228 | queue.emplace_back(&cursors[i]); |
229 | std::make_heap(queue.begin(), queue.end()); |
230 | } |
231 | |
232 | bool isValid() const { return !queue.empty(); } |
233 | |
234 | Cursor & current() { return queue.front(); } |
235 | |
236 | void next() |
237 | { |
238 | assert(isValid()); |
239 | |
240 | if (!current()->isLast()) |
241 | { |
242 | current()->next(); |
243 | updateTop(); |
244 | } |
245 | else |
246 | removeTop(); |
247 | } |
248 | |
249 | private: |
250 | using Container = std::vector<Cursor>; |
251 | Container queue; |
252 | |
253 | /// This is adapted version of the function __sift_down from libc++. |
254 | /// Why cannot simply use std::priority_queue? |
255 | /// - because it doesn't support updating the top element and requires pop and push instead. |
256 | void updateTop() |
257 | { |
258 | size_t size = queue.size(); |
259 | if (size < 2) |
260 | return; |
261 | |
262 | size_t child_idx = 1; |
263 | auto begin = queue.begin(); |
264 | auto child_it = begin + 1; |
265 | |
266 | /// Right child exists and is greater than left child. |
267 | if (size > 2 && *child_it < *(child_it + 1)) |
268 | { |
269 | ++child_it; |
270 | ++child_idx; |
271 | } |
272 | |
273 | /// Check if we are in order. |
274 | if (*child_it < *begin) |
275 | return; |
276 | |
277 | auto curr_it = begin; |
278 | auto top(std::move(*begin)); |
279 | do |
280 | { |
281 | /// We are not in heap-order, swap the parent with it's largest child. |
282 | *curr_it = std::move(*child_it); |
283 | curr_it = child_it; |
284 | |
285 | if ((size - 2) / 2 < child_idx) |
286 | break; |
287 | |
288 | // recompute the child based off of the updated parent |
289 | child_idx = 2 * child_idx + 1; |
290 | child_it = begin + child_idx; |
291 | |
292 | if ((child_idx + 1) < size && *child_it < *(child_it + 1)) |
293 | { |
294 | /// Right child exists and is greater than left child. |
295 | ++child_it; |
296 | ++child_idx; |
297 | } |
298 | |
299 | /// Check if we are in order. |
300 | } while (!(*child_it < top)); |
301 | *curr_it = std::move(top); |
302 | } |
303 | |
304 | void removeTop() |
305 | { |
306 | std::pop_heap(queue.begin(), queue.end()); |
307 | queue.pop_back(); |
308 | } |
309 | }; |
310 | |
311 | } |
312 | |