#pragma once
#include <Columns/IColumnUnique.h>
#include <Columns/ReverseIndex.h>

#include <Columns/ColumnVector.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>

#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/NumberTraits.h>

#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <ext/range.h>

#include <common/unaligned.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int ILLEGAL_COLUMN;
    extern const int LOGICAL_ERROR;
}

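/** Dictionary column that stores each distinct value exactly once.
  * Row 0 is reserved for the default value of the nested type; for a nullable dictionary
  * row 0 represents NULL and row 1 the nested default (see numSpecialValues()).
  * A ReverseIndex provides value -> position lookup, so inserts return the position of an
  * already stored equal value instead of appending a duplicate.
  */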
template <typename ColumnType>
class ColumnUnique final : public COWHelper<IColumnUnique, ColumnUnique<ColumnType>>
{
    friend class COWHelper<IColumnUnique, ColumnUnique<ColumnType>>;

private:
    explicit ColumnUnique(MutableColumnPtr && holder, bool is_nullable);
    explicit ColumnUnique(const IDataType & type);
    ColumnUnique(const ColumnUnique & other);

public:
    MutableColumnPtr cloneEmpty() const override;

    const ColumnPtr & getNestedColumn() const override;
    const ColumnPtr & getNestedNotNullableColumn() const override { return column_holder; }
    bool nestedColumnIsNullable() const override { return is_nullable; }

    size_t uniqueInsert(const Field & x) override;
    size_t uniqueInsertFrom(const IColumn & src, size_t n) override;
    MutableColumnPtr uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) override;
    IColumnUnique::IndexesWithOverflow uniqueInsertRangeWithOverflow(const IColumn & src, size_t start, size_t length,
                                                                     size_t max_dictionary_size) override;
    size_t uniqueInsertData(const char * pos, size_t length) override;
    size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) override;

    size_t getDefaultValueIndex() const override { return 0; }
    size_t getNullValueIndex() const override;
    size_t getNestedTypeDefaultValueIndex() const override { return is_nullable ? 1 : 0; }
    bool canContainNulls() const override { return is_nullable; }

    Field operator[](size_t n) const override { return (*getNestedColumn())[n]; }
    void get(size_t n, Field & res) const override { getNestedColumn()->get(n, res); }
    StringRef getDataAt(size_t n) const override { return getNestedColumn()->getDataAt(n); }
    StringRef getDataAtWithTerminatingZero(size_t n) const override
    {
        return getNestedColumn()->getDataAtWithTerminatingZero(n);
    }
    UInt64 get64(size_t n) const override { return getNestedColumn()->get64(n); }
    UInt64 getUInt(size_t n) const override { return getNestedColumn()->getUInt(n); }
    Int64 getInt(size_t n) const override { return getNestedColumn()->getInt(n); }
    Float64 getFloat64(size_t n) const override { return getNestedColumn()->getFloat64(n); }
    Float32 getFloat32(size_t n) const override { return getNestedColumn()->getFloat32(n); }
    bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); }
    bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }
    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
    void updateHashWithValue(size_t n, SipHash & hash_func) const override
    {
        return getNestedColumn()->updateHashWithValue(n, hash_func);
    }

    int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;

    void getExtremes(Field & min, Field & max) const override { column_holder->getExtremes(min, max); }
    bool valuesHaveFixedSize() const override { return column_holder->valuesHaveFixedSize(); }
    bool isFixedAndContiguous() const override { return column_holder->isFixedAndContiguous(); }
    size_t sizeOfValueIfFixed() const override { return column_holder->sizeOfValueIfFixed(); }
    bool isNumeric() const override { return column_holder->isNumeric(); }

    size_t byteSize() const override { return column_holder->byteSize(); }
    void protect() override { column_holder->protect(); }
    size_t allocatedBytes() const override
    {
        return column_holder->allocatedBytes()
            + index.allocatedBytes()
            + (nested_null_mask ? nested_null_mask->allocatedBytes() : 0);
    }
    void forEachSubcolumn(IColumn::ColumnCallback callback) override
    {
        callback(column_holder);
        index.setColumn(getRawColumnPtr());
        if (is_nullable)
            nested_column_nullable = ColumnNullable::create(column_holder, nested_null_mask);
    }

    bool structureEquals(const IColumn & rhs) const override
    {
        if (auto rhs_concrete = typeid_cast<const ColumnUnique *>(&rhs))
            return column_holder->structureEquals(*rhs_concrete->column_holder);
        return false;
    }

    const UInt64 * tryGetSavedHash() const override { return index.tryGetSavedHash(); }

    UInt128 getHash() const override { return hash.getHash(*getRawColumnPtr()); }

private:

    IColumn::WrappedPtr column_holder;
    bool is_nullable;
    size_t size_of_value_if_fixed = 0;
    ReverseIndex<UInt64, ColumnType> index;

    /// For DataTypeNullable, stores null map.
    IColumn::WrappedPtr nested_null_mask;
    IColumn::WrappedPtr nested_column_nullable;

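    /// Lazily cached 128-bit hash of the whole dictionary. The cache is considered stale
    /// whenever the dictionary size differs from the number of rows hashed last time.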
    class IncrementalHash
    {
    private:
        UInt128 hash;
        std::atomic<size_t> num_added_rows;

        std::mutex mutex;
    public:
        IncrementalHash() : num_added_rows(0) {}

        UInt128 getHash(const ColumnType & column);
    };

    mutable IncrementalHash hash;

    void createNullMask();
    void updateNullMask();

    static size_t numSpecialValues(bool is_nullable) { return is_nullable ? 2 : 1; }
    size_t numSpecialValues() const { return numSpecialValues(is_nullable); }

    ColumnType * getRawColumnPtr() { return assert_cast<ColumnType *>(column_holder.get()); }
    const ColumnType * getRawColumnPtr() const { return assert_cast<const ColumnType *>(column_holder.get()); }

    template <typename IndexType>
    MutableColumnPtr uniqueInsertRangeImpl(
        const IColumn & src,
        size_t start,
        size_t length,
        size_t num_added_rows,
        typename ColumnVector<IndexType>::MutablePtr && positions_column,
        ReverseIndex<UInt64, ColumnType> * secondary_index,
        size_t max_dictionary_size);
};

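/** Illustrative usage sketch (documentation only, not code from this header; assumes a String
  * dictionary created from its data type):
  *
  *     auto dictionary = ColumnUnique<ColumnString>::create(DataTypeString());
  *     auto positions = dictionary->uniqueInsertRangeFrom(*src_strings, 0, src_strings->size());
  *
  * Here `src_strings` stands for any ColumnString (possibly wrapped in ColumnNullable) with the
  * source values. `positions` holds, for every source row, an index into
  * dictionary->getNestedColumn(), using the narrowest unsigned integer type that fits the
  * dictionary size.
  */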
template <typename ColumnType>
MutableColumnPtr ColumnUnique<ColumnType>::cloneEmpty() const
{
    return ColumnUnique<ColumnType>::create(column_holder->cloneResized(numSpecialValues()), is_nullable);
}

template <typename ColumnType>
ColumnUnique<ColumnType>::ColumnUnique(const ColumnUnique & other)
    : column_holder(other.column_holder)
    , is_nullable(other.is_nullable)
    , size_of_value_if_fixed(other.size_of_value_if_fixed)
    , index(numSpecialValues(is_nullable), 0)
{
    index.setColumn(getRawColumnPtr());
    createNullMask();
}

template <typename ColumnType>
ColumnUnique<ColumnType>::ColumnUnique(const IDataType & type)
    : is_nullable(type.isNullable())
    , index(numSpecialValues(is_nullable), 0)
{
    const auto & holder_type = is_nullable ? *static_cast<const DataTypeNullable &>(type).getNestedType() : type;
    column_holder = holder_type.createColumn()->cloneResized(numSpecialValues());
    index.setColumn(getRawColumnPtr());
    createNullMask();

    if (column_holder->valuesHaveFixedSize())
        size_of_value_if_fixed = column_holder->sizeOfValueIfFixed();
}

template <typename ColumnType>
ColumnUnique<ColumnType>::ColumnUnique(MutableColumnPtr && holder, bool is_nullable_)
    : column_holder(std::move(holder))
    , is_nullable(is_nullable_)
    , index(numSpecialValues(is_nullable_), 0)
{
    if (column_holder->size() < numSpecialValues())
        throw Exception("Too small holder column for ColumnUnique.", ErrorCodes::ILLEGAL_COLUMN);
    if (isColumnNullable(*column_holder))
        throw Exception("Holder column for ColumnUnique can't be nullable.", ErrorCodes::ILLEGAL_COLUMN);

    index.setColumn(getRawColumnPtr());
    createNullMask();

    if (column_holder->valuesHaveFixedSize())
        size_of_value_if_fixed = column_holder->sizeOfValueIfFixed();
}

template <typename ColumnType>
void ColumnUnique<ColumnType>::createNullMask()
{
    if (is_nullable)
    {
        size_t size = getRawColumnPtr()->size();
        if (!nested_null_mask)
        {
            ColumnUInt8::MutablePtr null_mask = ColumnUInt8::create(size, UInt8(0));
            null_mask->getData()[getNullValueIndex()] = 1;
            nested_null_mask = std::move(null_mask);
            nested_column_nullable = ColumnNullable::create(column_holder, nested_null_mask);
        }
        else
            throw Exception("Null mask for ColumnUnique is already created.", ErrorCodes::LOGICAL_ERROR);
    }
}

template <typename ColumnType>
void ColumnUnique<ColumnType>::updateNullMask()
{
    if (is_nullable)
    {
        if (!nested_null_mask)
            throw Exception("Null mask for ColumnUnique was not created.", ErrorCodes::LOGICAL_ERROR);

        size_t size = getRawColumnPtr()->size();

        if (nested_null_mask->size() != size)
            assert_cast<ColumnUInt8 &>(*nested_null_mask).getData().resize_fill(size);
    }
}

template <typename ColumnType>
const ColumnPtr & ColumnUnique<ColumnType>::getNestedColumn() const
{
    if (is_nullable)
        return nested_column_nullable;

    return column_holder;
}

template <typename ColumnType>
size_t ColumnUnique<ColumnType>::getNullValueIndex() const
{
    if (!is_nullable)
        throw Exception("ColumnUnique can't contain null values.", ErrorCodes::LOGICAL_ERROR);

    return 0;
}

template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsert(const Field & x)
{
    if (x.getType() == Field::Types::Null)
        return getNullValueIndex();

    if (size_of_value_if_fixed)
        return uniqueInsertData(&x.reinterpret<char>(), size_of_value_if_fixed);

    auto & val = x.get<String>();
    return uniqueInsertData(val.data(), val.size());
}

template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsertFrom(const IColumn & src, size_t n)
{
    if (is_nullable && src.isNullAt(n))
        return getNullValueIndex();

    if (auto * nullable = checkAndGetColumn<ColumnNullable>(src))
        return uniqueInsertFrom(nullable->getNestedColumn(), n);

    auto ref = src.getDataAt(n);
    return uniqueInsertData(ref.data, ref.size);
}

template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsertData(const char * pos, size_t length)
{
    auto column = getRawColumnPtr();

    if (column->getDataAt(getNestedTypeDefaultValueIndex()) == StringRef(pos, length))
        return getNestedTypeDefaultValueIndex();

    auto insertion_point = index.insert(StringRef(pos, length));

    updateNullMask();

    return insertion_point;
}

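/// For a nullable dictionary, a value is serialized as a one-byte null flag followed by the
/// nested value (the nested part is omitted when the flag is 1, i.e. the row is NULL).
/// uniqueDeserializeAndInsertFromArena below reads the same layout back.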
template <typename ColumnType>
StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
    if (is_nullable)
    {
        static constexpr auto s = sizeof(UInt8);

        auto pos = arena.allocContinue(s, begin);
        UInt8 flag = (n == getNullValueIndex() ? 1 : 0);
        unalignedStore<UInt8>(pos, flag);

        if (n == getNullValueIndex())
            return StringRef(pos, s);

        auto nested_ref = column_holder->serializeValueIntoArena(n, arena, begin);

        /// serializeValueIntoArena may reallocate memory, so take the pointer from nested_ref.data
        /// and step it back by the size of the null flag.
        return StringRef(nested_ref.data - s, nested_ref.size + s);
    }

    return column_holder->serializeValueIntoArena(n, arena, begin);
}

template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos)
{
    if (is_nullable)
    {
        UInt8 val = *reinterpret_cast<const UInt8 *>(pos);
        pos += sizeof(val);

        if (val)
        {
            new_pos = pos;
            return getNullValueIndex();
        }
    }

    /// Numbers, FixedString
    if (size_of_value_if_fixed)
    {
        new_pos = pos + size_of_value_if_fixed;
        return uniqueInsertData(pos, size_of_value_if_fixed);
    }

    /// String
    const size_t string_size = unalignedLoad<size_t>(pos);
    pos += sizeof(string_size);
    new_pos = pos + string_size;

    /// -1 because of terminating zero
    return uniqueInsertData(pos, string_size - 1);
}

template <typename ColumnType>
int ColumnUnique<ColumnType>::compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const
{
    if (is_nullable)
    {
        /// See ColumnNullable::compareAt
        bool lval_is_null = n == getNullValueIndex();
        bool rval_is_null = m == getNullValueIndex();

        if (unlikely(lval_is_null || rval_is_null))
        {
            if (lval_is_null && rval_is_null)
                return 0;
            else
                return lval_is_null ? nan_direction_hint : -nan_direction_hint;
        }
    }

    auto & column_unique = static_cast<const IColumnUnique &>(rhs);
    return getNestedColumn()->compareAt(n, m, *column_unique.getNestedColumn(), nan_direction_hint);
}

template <typename IndexType>
static void checkIndexes(const ColumnVector<IndexType> & indexes, size_t max_dictionary_size)
{
    auto & data = indexes.getData();
    for (size_t i = 0; i < data.size(); ++i)
    {
        if (data[i] >= max_dictionary_size)
        {
            throw Exception("Found index " + toString(data[i]) + " at position " + toString(i)
                            + " which is greater than or equal to dictionary size " + toString(max_dictionary_size),
                            ErrorCodes::LOGICAL_ERROR);
        }
    }
}

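/// Inserts rows [start, start + length) from src into the dictionary and fills positions_column
/// with the resulting indexes. If an index stops fitting into IndexType, the range is restarted
/// with the next wider unsigned type. When secondary_index is given and the main dictionary
/// already holds max_dictionary_size keys, new keys are diverted into secondary_index instead.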
template <typename ColumnType>
template <typename IndexType>
MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeImpl(
    const IColumn & src,
    size_t start,
    size_t length,
    size_t num_added_rows,
    typename ColumnVector<IndexType>::MutablePtr && positions_column,
    ReverseIndex<UInt64, ColumnType> * secondary_index,
    size_t max_dictionary_size)
{
    const ColumnType * src_column;
    const NullMap * null_map = nullptr;
    auto & positions = positions_column->getData();

    auto update_position = [&](UInt64 & next_position) -> MutableColumnPtr
    {
        constexpr auto next_size = NumberTraits::nextSize(sizeof(IndexType));
        using SuperiorIndexType = typename NumberTraits::Construct<false, false, next_size>::Type;

        ++next_position;

        if (next_position > std::numeric_limits<IndexType>::max())
        {
            if (sizeof(SuperiorIndexType) == sizeof(IndexType))
                throw Exception("Can't find superior index type for type " + demangle(typeid(IndexType).name()),
                                ErrorCodes::LOGICAL_ERROR);

            auto expanded_column = ColumnVector<SuperiorIndexType>::create(length);
            auto & expanded_data = expanded_column->getData();
            for (size_t i = 0; i < num_added_rows; ++i)
                expanded_data[i] = positions[i];

            return uniqueInsertRangeImpl<SuperiorIndexType>(
                src,
                start,
                length,
                num_added_rows,
                std::move(expanded_column),
                secondary_index,
                max_dictionary_size);
        }

        return nullptr;
    };

    if (auto * nullable_column = checkAndGetColumn<ColumnNullable>(src))
    {
        src_column = typeid_cast<const ColumnType *>(&nullable_column->getNestedColumn());
        null_map = &nullable_column->getNullMapData();
    }
    else
        src_column = typeid_cast<const ColumnType *>(&src);

    if (src_column == nullptr)
        throw Exception("Invalid column type for ColumnUnique::insertRangeFrom. Expected " + column_holder->getName() +
                        ", got " + src.getName(), ErrorCodes::ILLEGAL_COLUMN);

    auto column = getRawColumnPtr();

    UInt64 next_position = column->size();
    if (secondary_index)
        next_position += secondary_index->size();

    auto insert_key = [&](const StringRef & ref, ReverseIndex<UInt64, ColumnType> & cur_index) -> MutableColumnPtr
    {
        auto inserted_pos = cur_index.insert(ref);
        positions[num_added_rows] = inserted_pos;
        if (inserted_pos == next_position)
            return update_position(next_position);

        return nullptr;
    };

    for (; num_added_rows < length; ++num_added_rows)
    {
        auto row = start + num_added_rows;

        if (null_map && (*null_map)[row])
            positions[num_added_rows] = getNullValueIndex();
        else if (column->compareAt(getNestedTypeDefaultValueIndex(), row, *src_column, 1) == 0)
            positions[num_added_rows] = getNestedTypeDefaultValueIndex();
        else
        {
            auto ref = src_column->getDataAt(row);
            MutableColumnPtr res = nullptr;

            if (secondary_index && next_position >= max_dictionary_size)
            {
                auto insertion_point = index.getInsertionPoint(ref);
                if (insertion_point == index.lastInsertionPoint())
                    res = insert_key(ref, *secondary_index);
                else
                    positions[num_added_rows] = insertion_point;
            }
            else
                res = insert_key(ref, index);

            if (res)
                return res;
        }
    }

    // checkIndexes(*positions_column, column->size() + (overflowed_keys ? overflowed_keys->size() : 0));
    return std::move(positions_column);
}

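/// Positions are emitted in the narrowest unsigned type that can address the current dictionary:
/// UInt8 is tried first, then UInt16, UInt32 and UInt64. uniqueInsertRangeImpl may still widen
/// the type mid-insert if the dictionary outgrows it during the range.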
template <typename ColumnType>
MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length)
{
    auto callForType = [this, &src, start, length](auto x) -> MutableColumnPtr
    {
        size_t size = getRawColumnPtr()->size();

        using IndexType = decltype(x);
        if (size <= std::numeric_limits<IndexType>::max())
        {
            auto positions = ColumnVector<IndexType>::create(length);
            return this->uniqueInsertRangeImpl<IndexType>(src, start, length, 0, std::move(positions), nullptr, 0);
        }

        return nullptr;
    };

    MutableColumnPtr positions_column;
    if (!positions_column)
        positions_column = callForType(UInt8());
    if (!positions_column)
        positions_column = callForType(UInt16());
    if (!positions_column)
        positions_column = callForType(UInt32());
    if (!positions_column)
        positions_column = callForType(UInt64());
    if (!positions_column)
        throw Exception("Can't find index type for ColumnUnique", ErrorCodes::LOGICAL_ERROR);

    updateNullMask();

    return positions_column;
}

template <typename ColumnType>
IColumnUnique::IndexesWithOverflow ColumnUnique<ColumnType>::uniqueInsertRangeWithOverflow(
    const IColumn & src,
    size_t start,
    size_t length,
    size_t max_dictionary_size)
{
    auto overflowed_keys = column_holder->cloneEmpty();
    auto overflowed_keys_ptr = typeid_cast<ColumnType *>(overflowed_keys.get());
    if (!overflowed_keys_ptr)
        throw Exception("Invalid keys type for ColumnUnique.", ErrorCodes::LOGICAL_ERROR);

    auto callForType = [this, &src, start, length, overflowed_keys_ptr, max_dictionary_size](auto x) -> MutableColumnPtr
    {
        size_t size = getRawColumnPtr()->size();

        using IndexType = decltype(x);
        if (size <= std::numeric_limits<IndexType>::max())
        {
            auto positions = ColumnVector<IndexType>::create(length);
            ReverseIndex<UInt64, ColumnType> secondary_index(0, max_dictionary_size);
            secondary_index.setColumn(overflowed_keys_ptr);
            return this->uniqueInsertRangeImpl<IndexType>(src, start, length, 0, std::move(positions),
                                                          &secondary_index, max_dictionary_size);
        }

        return nullptr;
    };

    MutableColumnPtr positions_column;
    if (!positions_column)
        positions_column = callForType(UInt8());
    if (!positions_column)
        positions_column = callForType(UInt16());
    if (!positions_column)
        positions_column = callForType(UInt32());
    if (!positions_column)
        positions_column = callForType(UInt64());
    if (!positions_column)
        throw Exception("Can't find index type for ColumnUnique", ErrorCodes::LOGICAL_ERROR);

    updateNullMask();

    IColumnUnique::IndexesWithOverflow indexes_with_overflow;
    indexes_with_overflow.indexes = std::move(positions_column);
    indexes_with_overflow.overflowed_keys = std::move(overflowed_keys);
    return indexes_with_overflow;
}

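/// The per-row hashing runs outside the mutex; only swapping the cached hash and the row counter
/// happens under the lock, so concurrent callers either reuse the cached value or recompute it.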
template <typename ColumnType>
UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & column)
{
    size_t column_size = column.size();
    UInt128 cur_hash;

    if (column_size != num_added_rows.load())
    {
        SipHash sip_hash;
        for (size_t i = 0; i < column_size; ++i)
            column.updateHashWithValue(i, sip_hash);

        std::lock_guard lock(mutex);
        sip_hash.get128(hash.low, hash.high);
        cur_hash = hash;
        num_added_rows.store(column_size);
    }
    else
    {
        std::lock_guard lock(mutex);
        cur_hash = hash;
    }

    return cur_hash;
}

}