1#pragma once
2#include <Columns/IColumnUnique.h>
3#include <Columns/ReverseIndex.h>
4
5#include <Columns/ColumnVector.h>
6#include <Columns/ColumnNullable.h>
7#include <Columns/ColumnString.h>
8#include <Columns/ColumnFixedString.h>
9
10#include <DataTypes/DataTypeNullable.h>
11#include <DataTypes/NumberTraits.h>
12
13#include <Common/typeid_cast.h>
14#include <Common/assert_cast.h>
15#include <ext/range.h>
16
17#include <common/unaligned.h>
18
19
20namespace DB
21{
22
namespace ErrorCodes
{
    extern const int ILLEGAL_COLUMN;
    /// Used by several checks below (null mask state, index type selection, checkIndexes).
    extern const int LOGICAL_ERROR;
}
27
28template <typename ColumnType>
29class ColumnUnique final : public COWHelper<IColumnUnique, ColumnUnique<ColumnType>>
30{
31 friend class COWHelper<IColumnUnique, ColumnUnique<ColumnType>>;
32
33private:
34 explicit ColumnUnique(MutableColumnPtr && holder, bool is_nullable);
35 explicit ColumnUnique(const IDataType & type);
36 ColumnUnique(const ColumnUnique & other);
37
38public:
39 MutableColumnPtr cloneEmpty() const override;
40
41 const ColumnPtr & getNestedColumn() const override;
42 const ColumnPtr & getNestedNotNullableColumn() const override { return column_holder; }
43 bool nestedColumnIsNullable() const override { return is_nullable; }
44
45 size_t uniqueInsert(const Field & x) override;
46 size_t uniqueInsertFrom(const IColumn & src, size_t n) override;
47 MutableColumnPtr uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) override;
48 IColumnUnique::IndexesWithOverflow uniqueInsertRangeWithOverflow(const IColumn & src, size_t start, size_t length,
49 size_t max_dictionary_size) override;
50 size_t uniqueInsertData(const char * pos, size_t length) override;
51 size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) override;
52
53 size_t getDefaultValueIndex() const override { return 0; }
54 size_t getNullValueIndex() const override;
55 size_t getNestedTypeDefaultValueIndex() const override { return is_nullable ? 1 : 0; }
56 bool canContainNulls() const override { return is_nullable; }
57
58 Field operator[](size_t n) const override { return (*getNestedColumn())[n]; }
59 void get(size_t n, Field & res) const override { getNestedColumn()->get(n, res); }
60 StringRef getDataAt(size_t n) const override { return getNestedColumn()->getDataAt(n); }
61 StringRef getDataAtWithTerminatingZero(size_t n) const override
62 {
63 return getNestedColumn()->getDataAtWithTerminatingZero(n);
64 }
65 UInt64 get64(size_t n) const override { return getNestedColumn()->get64(n); }
66 UInt64 getUInt(size_t n) const override { return getNestedColumn()->getUInt(n); }
67 Int64 getInt(size_t n) const override { return getNestedColumn()->getInt(n); }
68 Float64 getFloat64(size_t n) const override { return getNestedColumn()->getFloat64(n); }
69 Float32 getFloat32(size_t n) const override { return getNestedColumn()->getFloat32(n); }
70 bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); }
71 bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }
72 StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
73 void updateHashWithValue(size_t n, SipHash & hash_func) const override
74 {
75 return getNestedColumn()->updateHashWithValue(n, hash_func);
76 }
77
78 int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
79
80 void getExtremes(Field & min, Field & max) const override { column_holder->getExtremes(min, max); }
81 bool valuesHaveFixedSize() const override { return column_holder->valuesHaveFixedSize(); }
82 bool isFixedAndContiguous() const override { return column_holder->isFixedAndContiguous(); }
83 size_t sizeOfValueIfFixed() const override { return column_holder->sizeOfValueIfFixed(); }
84 bool isNumeric() const override { return column_holder->isNumeric(); }
85
86 size_t byteSize() const override { return column_holder->byteSize(); }
87 void protect() override { column_holder->protect(); }
88 size_t allocatedBytes() const override
89 {
90 return column_holder->allocatedBytes()
91 + index.allocatedBytes()
92 + (nested_null_mask ? nested_null_mask->allocatedBytes() : 0);
93 }
94 void forEachSubcolumn(IColumn::ColumnCallback callback) override
95 {
96 callback(column_holder);
97 index.setColumn(getRawColumnPtr());
98 if (is_nullable)
99 nested_column_nullable = ColumnNullable::create(column_holder, nested_null_mask);
100 }
101
102 bool structureEquals(const IColumn & rhs) const override
103 {
104 if (auto rhs_concrete = typeid_cast<const ColumnUnique *>(&rhs))
105 return column_holder->structureEquals(*rhs_concrete->column_holder);
106 return false;
107 }
108
109 const UInt64 * tryGetSavedHash() const override { return index.tryGetSavedHash(); }
110
111 UInt128 getHash() const override { return hash.getHash(*getRawColumnPtr()); }
112
113private:
114
115 IColumn::WrappedPtr column_holder;
116 bool is_nullable;
117 size_t size_of_value_if_fixed = 0;
118 ReverseIndex<UInt64, ColumnType> index;
119
120 /// For DataTypeNullable, stores null map.
121 IColumn::WrappedPtr nested_null_mask;
122 IColumn::WrappedPtr nested_column_nullable;
123
124 class IncrementalHash
125 {
126 private:
127 UInt128 hash;
128 std::atomic<size_t> num_added_rows;
129
130 std::mutex mutex;
131 public:
132 IncrementalHash() : num_added_rows(0) {}
133
134 UInt128 getHash(const ColumnType & column);
135 };
136
137 mutable IncrementalHash hash;
138
139 void createNullMask();
140 void updateNullMask();
141
142 static size_t numSpecialValues(bool is_nullable) { return is_nullable ? 2 : 1; }
143 size_t numSpecialValues() const { return numSpecialValues(is_nullable); }
144
145 ColumnType * getRawColumnPtr() { return assert_cast<ColumnType *>(column_holder.get()); }
146 const ColumnType * getRawColumnPtr() const { return assert_cast<const ColumnType *>(column_holder.get()); }
147
148 template <typename IndexType>
149 MutableColumnPtr uniqueInsertRangeImpl(
150 const IColumn & src,
151 size_t start,
152 size_t length,
153 size_t num_added_rows,
154 typename ColumnVector<IndexType>::MutablePtr && positions_column,
155 ReverseIndex<UInt64, ColumnType> * secondary_index,
156 size_t max_dictionary_size);
157};
158
159template <typename ColumnType>
160MutableColumnPtr ColumnUnique<ColumnType>::cloneEmpty() const
161{
162 return ColumnUnique<ColumnType>::create(column_holder->cloneResized(numSpecialValues()), is_nullable);
163}
164
165template <typename ColumnType>
166ColumnUnique<ColumnType>::ColumnUnique(const ColumnUnique & other)
167 : column_holder(other.column_holder)
168 , is_nullable(other.is_nullable)
169 , size_of_value_if_fixed (other.size_of_value_if_fixed)
170 , index(numSpecialValues(is_nullable), 0)
171{
172 index.setColumn(getRawColumnPtr());
173 createNullMask();
174}
175
176template <typename ColumnType>
177ColumnUnique<ColumnType>::ColumnUnique(const IDataType & type)
178 : is_nullable(type.isNullable())
179 , index(numSpecialValues(is_nullable), 0)
180{
181 const auto & holder_type = is_nullable ? *static_cast<const DataTypeNullable &>(type).getNestedType() : type;
182 column_holder = holder_type.createColumn()->cloneResized(numSpecialValues());
183 index.setColumn(getRawColumnPtr());
184 createNullMask();
185
186 if (column_holder->valuesHaveFixedSize())
187 size_of_value_if_fixed = column_holder->sizeOfValueIfFixed();
188}
189
190template <typename ColumnType>
191ColumnUnique<ColumnType>::ColumnUnique(MutableColumnPtr && holder, bool is_nullable_)
192 : column_holder(std::move(holder))
193 , is_nullable(is_nullable_)
194 , index(numSpecialValues(is_nullable_), 0)
195{
196 if (column_holder->size() < numSpecialValues())
197 throw Exception("Too small holder column for ColumnUnique.", ErrorCodes::ILLEGAL_COLUMN);
198 if (isColumnNullable(*column_holder))
199 throw Exception("Holder column for ColumnUnique can't be nullable.", ErrorCodes::ILLEGAL_COLUMN);
200
201 index.setColumn(getRawColumnPtr());
202 createNullMask();
203
204 if (column_holder->valuesHaveFixedSize())
205 size_of_value_if_fixed = column_holder->sizeOfValueIfFixed();
206}
207
208template <typename ColumnType>
209void ColumnUnique<ColumnType>::createNullMask()
210{
211 if (is_nullable)
212 {
213 size_t size = getRawColumnPtr()->size();
214 if (!nested_null_mask)
215 {
216 ColumnUInt8::MutablePtr null_mask = ColumnUInt8::create(size, UInt8(0));
217 null_mask->getData()[getNullValueIndex()] = 1;
218 nested_null_mask = std::move(null_mask);
219 nested_column_nullable = ColumnNullable::create(column_holder, nested_null_mask);
220 }
221 else
222 throw Exception("Null mask for ColumnUnique is already created.", ErrorCodes::LOGICAL_ERROR);
223 }
224}
225
226template <typename ColumnType>
227void ColumnUnique<ColumnType>::updateNullMask()
228{
229 if (is_nullable)
230 {
231 if (!nested_null_mask)
232 throw Exception("Null mask for ColumnUnique is was not created.", ErrorCodes::LOGICAL_ERROR);
233
234 size_t size = getRawColumnPtr()->size();
235
236 if (nested_null_mask->size() != size)
237 assert_cast<ColumnUInt8 &>(*nested_null_mask).getData().resize_fill(size);
238 }
239}
240
241template <typename ColumnType>
242const ColumnPtr & ColumnUnique<ColumnType>::getNestedColumn() const
243{
244 if (is_nullable)
245 return nested_column_nullable;
246
247 return column_holder;
248}
249
250template <typename ColumnType>
251size_t ColumnUnique<ColumnType>::getNullValueIndex() const
252{
253 if (!is_nullable)
254 throw Exception("ColumnUnique can't contain null values.", ErrorCodes::LOGICAL_ERROR);
255
256 return 0;
257}
258
259template <typename ColumnType>
260size_t ColumnUnique<ColumnType>::uniqueInsert(const Field & x)
261{
262 if (x.getType() == Field::Types::Null)
263 return getNullValueIndex();
264
265 if (size_of_value_if_fixed)
266 return uniqueInsertData(&x.reinterpret<char>(), size_of_value_if_fixed);
267
268 auto & val = x.get<String>();
269 return uniqueInsertData(val.data(), val.size());
270}
271
272template <typename ColumnType>
273size_t ColumnUnique<ColumnType>::uniqueInsertFrom(const IColumn & src, size_t n)
274{
275 if (is_nullable && src.isNullAt(n))
276 return getNullValueIndex();
277
278 if (auto * nullable = checkAndGetColumn<ColumnNullable>(src))
279 return uniqueInsertFrom(nullable->getNestedColumn(), n);
280
281 auto ref = src.getDataAt(n);
282 return uniqueInsertData(ref.data, ref.size);
283}
284
285template <typename ColumnType>
286size_t ColumnUnique<ColumnType>::uniqueInsertData(const char * pos, size_t length)
287{
288 auto column = getRawColumnPtr();
289
290 if (column->getDataAt(getNestedTypeDefaultValueIndex()) == StringRef(pos, length))
291 return getNestedTypeDefaultValueIndex();
292
293 auto insertion_point = index.insert(StringRef(pos, length));
294
295 updateNullMask();
296
297 return insertion_point;
298}
299
300template <typename ColumnType>
301StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
302{
303 if (is_nullable)
304 {
305 static constexpr auto s = sizeof(UInt8);
306
307 auto pos = arena.allocContinue(s, begin);
308 UInt8 flag = (n == getNullValueIndex() ? 1 : 0);
309 unalignedStore<UInt8>(pos, flag);
310
311 if (n == getNullValueIndex())
312 return StringRef(pos, s);
313
314 auto nested_ref = column_holder->serializeValueIntoArena(n, arena, begin);
315
316 /// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
317 return StringRef(nested_ref.data - s, nested_ref.size + s);
318 }
319
320 return column_holder->serializeValueIntoArena(n, arena, begin);
321}
322
323template <typename ColumnType>
324size_t ColumnUnique<ColumnType>::uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos)
325{
326 if (is_nullable)
327 {
328 UInt8 val = *reinterpret_cast<const UInt8 *>(pos);
329 pos += sizeof(val);
330
331 if (val)
332 {
333 new_pos = pos;
334 return getNullValueIndex();
335 }
336 }
337
338 /// Numbers, FixedString
339 if (size_of_value_if_fixed)
340 {
341 new_pos = pos + size_of_value_if_fixed;
342 return uniqueInsertData(pos, size_of_value_if_fixed);
343 }
344
345 /// String
346 const size_t string_size = unalignedLoad<size_t>(pos);
347 pos += sizeof(string_size);
348 new_pos = pos + string_size;
349
350 /// -1 because of terminating zero
351 return uniqueInsertData(pos, string_size - 1);
352}
353
354template <typename ColumnType>
355int ColumnUnique<ColumnType>::compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const
356{
357 if (is_nullable)
358 {
359 /// See ColumnNullable::compareAt
360 bool lval_is_null = n == getNullValueIndex();
361 bool rval_is_null = m == getNullValueIndex();
362
363 if (unlikely(lval_is_null || rval_is_null))
364 {
365 if (lval_is_null && rval_is_null)
366 return 0;
367 else
368 return lval_is_null ? nan_direction_hint : -nan_direction_hint;
369 }
370 }
371
372 auto & column_unique = static_cast<const IColumnUnique &>(rhs);
373 return getNestedColumn()->compareAt(n, m, *column_unique.getNestedColumn(), nan_direction_hint);
374}
375
376template <typename IndexType>
377static void checkIndexes(const ColumnVector<IndexType> & indexes, size_t max_dictionary_size)
378{
379 auto & data = indexes.getData();
380 for (size_t i = 0; i < data.size(); ++i)
381 {
382 if (data[i] >= max_dictionary_size)
383 {
384 throw Exception("Found index " + toString(data[i]) + " at position " + toString(i)
385 + " which is grated or equal than dictionary size " + toString(max_dictionary_size),
386 ErrorCodes::LOGICAL_ERROR);
387 }
388 }
389}
390
/** Shared implementation of uniqueInsertRangeFrom and uniqueInsertRangeWithOverflow.
  *
  * Inserts rows [start, start + length) of `src` into the dictionary. `src` must be a ColumnType or a
  * ColumnNullable over ColumnType; otherwise ILLEGAL_COLUMN is thrown. The dictionary position of each row
  * is written to `positions_column`, beginning at row `num_added_rows`.
  *
  * When a newly assigned position no longer fits into IndexType, the positions computed so far are copied
  * into a column of the next wider unsigned type. The function then re-runs itself with that type from the
  * current row and returns that call's result.
  *
  * If `secondary_index` is given and the running position has reached `max_dictionary_size`, keys not
  * already present in the main `index` are inserted into `secondary_index` instead.
  */
template <typename ColumnType>
template <typename IndexType>
MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeImpl(
    const IColumn & src,
    size_t start,
    size_t length,
    size_t num_added_rows,
    typename ColumnVector<IndexType>::MutablePtr && positions_column,
    ReverseIndex<UInt64, ColumnType> * secondary_index,
    size_t max_dictionary_size)
{
    const ColumnType * src_column;
    const NullMap * null_map = nullptr;
    auto & positions = positions_column->getData();

    /// Called after a brand-new key took position `next_position`: advances it by one.
    /// If the next position no longer fits into IndexType, restarts with a wider index type and returns that
    /// result. Otherwise returns nullptr.
    auto update_position = [&](UInt64 & next_position) -> MutableColumnPtr
    {
        constexpr auto next_size = NumberTraits::nextSize(sizeof(IndexType));
        using SuperiorIndexType = typename NumberTraits::Construct<false, false, next_size>::Type;

        ++next_position;

        if (next_position > std::numeric_limits<IndexType>::max())
        {
            /// No wider type exists (IndexType is already the widest one).
            if (sizeof(SuperiorIndexType) == sizeof(IndexType))
                throw Exception("Can't find superior index type for type " + demangle(typeid(IndexType).name()),
                                ErrorCodes::LOGICAL_ERROR);

            /// Copy positions of already finished rows. The current row (num_added_rows) is processed
            /// again by the recursive call.
            auto expanded_column = ColumnVector<SuperiorIndexType>::create(length);
            auto & expanded_data = expanded_column->getData();
            for (size_t i = 0; i < num_added_rows; ++i)
                expanded_data[i] = positions[i];

            return uniqueInsertRangeImpl<SuperiorIndexType>(
                    src,
                    start,
                    length,
                    num_added_rows,
                    std::move(expanded_column),
                    secondary_index,
                    max_dictionary_size);
        }

        return nullptr;
    };

    /// Accept both ColumnType and Nullable(ColumnType) sources.
    if (auto * nullable_column = checkAndGetColumn<ColumnNullable>(src))
    {
        src_column = typeid_cast<const ColumnType *>(&nullable_column->getNestedColumn());
        null_map = &nullable_column->getNullMapData();
    }
    else
        src_column = typeid_cast<const ColumnType *>(&src);

    if (src_column == nullptr)
        throw Exception("Invalid column type for ColumnUnique::insertRangeFrom. Expected " + column_holder->getName() +
                        ", got " + src.getName(), ErrorCodes::ILLEGAL_COLUMN);

    auto column = getRawColumnPtr();

    /// Position the next brand-new key is expected to receive (main keys plus overflowed keys so far).
    UInt64 next_position = column->size();
    if (secondary_index)
        next_position += secondary_index->size();

    /// Inserts `ref` into `cur_index` and records its position for the current row.
    /// Getting exactly next_position back means the key was new, so the position counter must advance.
    auto insert_key = [&](const StringRef & ref, ReverseIndex<UInt64, ColumnType> & cur_index) -> MutableColumnPtr
    {
        auto inserted_pos = cur_index.insert(ref);
        positions[num_added_rows] = inserted_pos;
        if (inserted_pos == next_position)
            return update_position(next_position);

        return nullptr;
    };

    for (; num_added_rows < length; ++num_added_rows)
    {
        auto row = start + num_added_rows;

        /// NULL rows map to the NULL row; getNullValueIndex() throws if this dictionary is not nullable.
        if (null_map && (*null_map)[row])
            positions[num_added_rows] = getNullValueIndex();
        /// Values equal to the nested default reuse its reserved row.
        else if (column->compareAt(getNestedTypeDefaultValueIndex(), row, *src_column, 1) == 0)
            positions[num_added_rows] = getNestedTypeDefaultValueIndex();
        else
        {
            auto ref = src_column->getDataAt(row);
            MutableColumnPtr res = nullptr;

            if (secondary_index && next_position >= max_dictionary_size)
            {
                /// Dictionary is full: reuse an existing main-index position if the key is known.
                /// Otherwise put the key into the overflow index. (The lastInsertionPoint() comparison
                /// presumably means "not found"; verify against ReverseIndex.)
                auto insertion_point = index.getInsertionPoint(ref);
                if (insertion_point == index.lastInsertionPoint())
                    res = insert_key(ref, *secondary_index);
                else
                    positions[num_added_rows] = insertion_point;
            }
            else
                res = insert_key(ref, index);

            /// Non-null result: the remaining rows were handled by a wider-index-type call.
            if (res)
                return res;
        }
    }

    /// NOTE(review): this check stays disabled. `overflowed_keys` is not in scope in this function.
    // checkIndexes(*positions_column, column->size() + (overflowed_keys ? overflowed_keys->size() : 0));
    return std::move(positions_column);
}
497
498template <typename ColumnType>
499MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length)
500{
501 auto callForType = [this, &src, start, length](auto x) -> MutableColumnPtr
502 {
503 size_t size = getRawColumnPtr()->size();
504
505 using IndexType = decltype(x);
506 if (size <= std::numeric_limits<IndexType>::max())
507 {
508 auto positions = ColumnVector<IndexType>::create(length);
509 return this->uniqueInsertRangeImpl<IndexType>(src, start, length, 0, std::move(positions), nullptr, 0);
510 }
511
512 return nullptr;
513 };
514
515 MutableColumnPtr positions_column;
516 if (!positions_column)
517 positions_column = callForType(UInt8());
518 if (!positions_column)
519 positions_column = callForType(UInt16());
520 if (!positions_column)
521 positions_column = callForType(UInt32());
522 if (!positions_column)
523 positions_column = callForType(UInt64());
524 if (!positions_column)
525 throw Exception("Can't find index type for ColumnUnique", ErrorCodes::LOGICAL_ERROR);
526
527 updateNullMask();
528
529 return positions_column;
530}
531
532template <typename ColumnType>
533IColumnUnique::IndexesWithOverflow ColumnUnique<ColumnType>::uniqueInsertRangeWithOverflow(
534 const IColumn & src,
535 size_t start,
536 size_t length,
537 size_t max_dictionary_size)
538{
539 auto overflowed_keys = column_holder->cloneEmpty();
540 auto overflowed_keys_ptr = typeid_cast<ColumnType *>(overflowed_keys.get());
541 if (!overflowed_keys_ptr)
542 throw Exception("Invalid keys type for ColumnUnique.", ErrorCodes::LOGICAL_ERROR);
543
544 auto callForType = [this, &src, start, length, overflowed_keys_ptr, max_dictionary_size](auto x) -> MutableColumnPtr
545 {
546 size_t size = getRawColumnPtr()->size();
547
548 using IndexType = decltype(x);
549 if (size <= std::numeric_limits<IndexType>::max())
550 {
551 auto positions = ColumnVector<IndexType>::create(length);
552 ReverseIndex<UInt64, ColumnType> secondary_index(0, max_dictionary_size);
553 secondary_index.setColumn(overflowed_keys_ptr);
554 return this->uniqueInsertRangeImpl<IndexType>(src, start, length, 0, std::move(positions),
555 &secondary_index, max_dictionary_size);
556 }
557
558 return nullptr;
559 };
560
561 MutableColumnPtr positions_column;
562 if (!positions_column)
563 positions_column = callForType(UInt8());
564 if (!positions_column)
565 positions_column = callForType(UInt16());
566 if (!positions_column)
567 positions_column = callForType(UInt32());
568 if (!positions_column)
569 positions_column = callForType(UInt64());
570 if (!positions_column)
571 throw Exception("Can't find index type for ColumnUnique", ErrorCodes::LOGICAL_ERROR);
572
573 updateNullMask();
574
575 IColumnUnique::IndexesWithOverflow indexes_with_overflow;
576 indexes_with_overflow.indexes = std::move(positions_column);
577 indexes_with_overflow.overflowed_keys = std::move(overflowed_keys);
578 return indexes_with_overflow;
579}
580
581template <typename ColumnType>
582UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & column)
583{
584 size_t column_size = column.size();
585 UInt128 cur_hash;
586
587 if (column_size != num_added_rows.load())
588 {
589 SipHash sip_hash;
590 for (size_t i = 0; i < column_size; ++i)
591 column.updateHashWithValue(i, sip_hash);
592
593 std::lock_guard lock(mutex);
594 sip_hash.get128(hash.low, hash.high);
595 cur_hash = hash;
596 num_added_rows.store(column_size);
597 }
598 else
599 {
600 std::lock_guard lock(mutex);
601 cur_hash = hash;
602 }
603
604 return cur_hash;
605}
606
607}
608