1#pragma once
2
3#include <cassert>
4#include <vector>
5#include <algorithm>
6
7#include <Common/typeid_cast.h>
8#include <Common/assert_cast.h>
9#include <Core/SortDescription.h>
10#include <Core/Block.h>
11#include <Core/ColumnNumbers.h>
12#include <Columns/IColumn.h>
13#include <Columns/ColumnString.h>
14
15
16namespace DB
17{
18
19/** Cursor allows to compare rows in different blocks (and parts).
20 * Cursor moves inside single block.
21 * It is used in priority queue.
22 */
23struct SortCursorImpl
24{
25 ColumnRawPtrs all_columns;
26 ColumnRawPtrs sort_columns;
27 SortDescription desc;
28 size_t sort_columns_size = 0;
29 size_t pos = 0;
30 size_t rows = 0;
31
32 /** Determines order if comparing columns are equal.
33 * Order is determined by number of cursor.
34 *
35 * Cursor number (always?) equals to number of merging part.
36 * Therefore this field can be used to determine part number of current row (see ColumnGathererStream).
37 */
38 size_t order = 0;
39
40 using NeedCollationFlags = std::vector<UInt8>;
41
42 /** Should we use Collator to sort a column? */
43 NeedCollationFlags need_collation;
44
45 /** Is there at least one column with Collator. */
46 bool has_collation = false;
47
48 SortCursorImpl() {}
49
50 SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0)
51 : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
52 {
53 reset(block);
54 }
55
56 SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0)
57 : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
58 {
59 for (auto & column_desc : desc)
60 {
61 if (!column_desc.column_name.empty())
62 throw Exception("SortDesctiption should contain column position if SortCursor was used without header.",
63 ErrorCodes::LOGICAL_ERROR);
64 }
65 reset(columns, {});
66 }
67
68 bool empty() const { return rows == 0; }
69
70 /// Set the cursor to the beginning of the new block.
71 void reset(const Block & block)
72 {
73 reset(block.getColumns(), block);
74 }
75
76 /// Set the cursor to the beginning of the new block.
77 void reset(const Columns & columns, const Block & block)
78 {
79 all_columns.clear();
80 sort_columns.clear();
81
82 size_t num_columns = columns.size();
83
84 for (size_t j = 0; j < num_columns; ++j)
85 all_columns.push_back(columns[j].get());
86
87 for (size_t j = 0, size = desc.size(); j < size; ++j)
88 {
89 auto & column_desc = desc[j];
90 size_t column_number = !column_desc.column_name.empty()
91 ? block.getPositionByName(column_desc.column_name)
92 : column_desc.column_number;
93 sort_columns.push_back(columns[column_number].get());
94
95 need_collation[j] = desc[j].collator != nullptr && typeid_cast<const ColumnString *>(sort_columns.back()); /// TODO Nullable(String)
96 has_collation |= need_collation[j];
97 }
98
99 pos = 0;
100 rows = all_columns[0]->size();
101 }
102
103 bool isFirst() const { return pos == 0; }
104 bool isLast() const { return pos + 1 >= rows; }
105 bool isValid() const { return pos < rows; }
106 void next() { ++pos; }
107};
108
109using SortCursorImpls = std::vector<SortCursorImpl>;
110
111
112/// For easy copying.
113struct SortCursor
114{
115 SortCursorImpl * impl;
116
117 SortCursor(SortCursorImpl * impl_) : impl(impl_) {}
118 SortCursorImpl * operator-> () { return impl; }
119 const SortCursorImpl * operator-> () const { return impl; }
120
121 /// The specified row of this cursor is greater than the specified row of another cursor.
122 bool greaterAt(const SortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
123 {
124 for (size_t i = 0; i < impl->sort_columns_size; ++i)
125 {
126 int direction = impl->desc[i].direction;
127 int nulls_direction = impl->desc[i].nulls_direction;
128 int res = direction * impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction);
129 if (res > 0)
130 return true;
131 if (res < 0)
132 return false;
133 }
134 return impl->order > rhs.impl->order;
135 }
136
137 /// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor.
138 bool totallyLessOrEquals(const SortCursor & rhs) const
139 {
140 if (impl->rows == 0 || rhs.impl->rows == 0)
141 return false;
142
143 /// The last row of this cursor is no larger than the first row of the another cursor.
144 return !greaterAt(rhs, impl->rows - 1, 0);
145 }
146
147 bool greater(const SortCursor & rhs) const
148 {
149 return greaterAt(rhs, impl->pos, rhs.impl->pos);
150 }
151
152 /// Inverted so that the priority queue elements are removed in ascending order.
153 bool operator< (const SortCursor & rhs) const
154 {
155 return greater(rhs);
156 }
157};
158
159
160/// Separate comparator for locale-sensitive string comparisons
161struct SortCursorWithCollation
162{
163 SortCursorImpl * impl;
164
165 SortCursorWithCollation(SortCursorImpl * impl_) : impl(impl_) {}
166 SortCursorImpl * operator-> () { return impl; }
167 const SortCursorImpl * operator-> () const { return impl; }
168
169 bool greaterAt(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const
170 {
171 for (size_t i = 0; i < impl->sort_columns_size; ++i)
172 {
173 int direction = impl->desc[i].direction;
174 int nulls_direction = impl->desc[i].nulls_direction;
175 int res;
176 if (impl->need_collation[i])
177 {
178 const ColumnString & column_string = assert_cast<const ColumnString &>(*impl->sort_columns[i]);
179 res = column_string.compareAtWithCollation(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), *impl->desc[i].collator);
180 }
181 else
182 res = impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction);
183
184 res *= direction;
185 if (res > 0)
186 return true;
187 if (res < 0)
188 return false;
189 }
190 return impl->order > rhs.impl->order;
191 }
192
193 bool totallyLessOrEquals(const SortCursorWithCollation & rhs) const
194 {
195 if (impl->rows == 0 || rhs.impl->rows == 0)
196 return false;
197
198 /// The last row of this cursor is no larger than the first row of the another cursor.
199 return !greaterAt(rhs, impl->rows - 1, 0);
200 }
201
202 bool greater(const SortCursorWithCollation & rhs) const
203 {
204 return greaterAt(rhs, impl->pos, rhs.impl->pos);
205 }
206
207 bool operator< (const SortCursorWithCollation & rhs) const
208 {
209 return greater(rhs);
210 }
211};
212
213
214/** Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams).
215 */
216template <typename Cursor>
217class SortingHeap
218{
219public:
220 SortingHeap() = default;
221
222 template <typename Cursors>
223 SortingHeap(Cursors & cursors)
224 {
225 size_t size = cursors.size();
226 queue.reserve(size);
227 for (size_t i = 0; i < size; ++i)
228 queue.emplace_back(&cursors[i]);
229 std::make_heap(queue.begin(), queue.end());
230 }
231
232 bool isValid() const { return !queue.empty(); }
233
234 Cursor & current() { return queue.front(); }
235
236 void next()
237 {
238 assert(isValid());
239
240 if (!current()->isLast())
241 {
242 current()->next();
243 updateTop();
244 }
245 else
246 removeTop();
247 }
248
249private:
250 using Container = std::vector<Cursor>;
251 Container queue;
252
253 /// This is adapted version of the function __sift_down from libc++.
254 /// Why cannot simply use std::priority_queue?
255 /// - because it doesn't support updating the top element and requires pop and push instead.
256 void updateTop()
257 {
258 size_t size = queue.size();
259 if (size < 2)
260 return;
261
262 size_t child_idx = 1;
263 auto begin = queue.begin();
264 auto child_it = begin + 1;
265
266 /// Right child exists and is greater than left child.
267 if (size > 2 && *child_it < *(child_it + 1))
268 {
269 ++child_it;
270 ++child_idx;
271 }
272
273 /// Check if we are in order.
274 if (*child_it < *begin)
275 return;
276
277 auto curr_it = begin;
278 auto top(std::move(*begin));
279 do
280 {
281 /// We are not in heap-order, swap the parent with it's largest child.
282 *curr_it = std::move(*child_it);
283 curr_it = child_it;
284
285 if ((size - 2) / 2 < child_idx)
286 break;
287
288 // recompute the child based off of the updated parent
289 child_idx = 2 * child_idx + 1;
290 child_it = begin + child_idx;
291
292 if ((child_idx + 1) < size && *child_it < *(child_it + 1))
293 {
294 /// Right child exists and is greater than left child.
295 ++child_it;
296 ++child_idx;
297 }
298
299 /// Check if we are in order.
300 } while (!(*child_it < top));
301 *curr_it = std::move(top);
302 }
303
304 void removeTop()
305 {
306 std::pop_heap(queue.begin(), queue.end());
307 queue.pop_back();
308 }
309};
310
311}
312