| 1 | #pragma once |
| 2 | #include <Columns/IColumnUnique.h> |
| 3 | #include <Columns/ReverseIndex.h> |
| 4 | |
| 5 | #include <Columns/ColumnVector.h> |
| 6 | #include <Columns/ColumnNullable.h> |
| 7 | #include <Columns/ColumnString.h> |
| 8 | #include <Columns/ColumnFixedString.h> |
| 9 | |
| 10 | #include <DataTypes/DataTypeNullable.h> |
| 11 | #include <DataTypes/NumberTraits.h> |
| 12 | |
| 13 | #include <Common/typeid_cast.h> |
| 14 | #include <Common/assert_cast.h> |
| 15 | #include <ext/range.h> |
| 16 | |
| 17 | #include <common/unaligned.h> |
| 18 | |
| 19 | |
| 20 | namespace DB |
| 21 | { |
| 22 | |
/// Error codes thrown from this header. Both are declared here so that the
/// header does not depend on another include happening to declare them.
namespace ErrorCodes
{
    extern const int ILLEGAL_COLUMN;
    extern const int LOGICAL_ERROR;
}
| 27 | |
| 28 | template <typename ColumnType> |
| 29 | class ColumnUnique final : public COWHelper<IColumnUnique, ColumnUnique<ColumnType>> |
| 30 | { |
| 31 | friend class COWHelper<IColumnUnique, ColumnUnique<ColumnType>>; |
| 32 | |
| 33 | private: |
| 34 | explicit ColumnUnique(MutableColumnPtr && holder, bool is_nullable); |
| 35 | explicit ColumnUnique(const IDataType & type); |
| 36 | ColumnUnique(const ColumnUnique & other); |
| 37 | |
| 38 | public: |
| 39 | MutableColumnPtr cloneEmpty() const override; |
| 40 | |
| 41 | const ColumnPtr & getNestedColumn() const override; |
| 42 | const ColumnPtr & getNestedNotNullableColumn() const override { return column_holder; } |
| 43 | bool nestedColumnIsNullable() const override { return is_nullable; } |
| 44 | |
| 45 | size_t uniqueInsert(const Field & x) override; |
| 46 | size_t uniqueInsertFrom(const IColumn & src, size_t n) override; |
| 47 | MutableColumnPtr uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) override; |
| 48 | IColumnUnique::IndexesWithOverflow uniqueInsertRangeWithOverflow(const IColumn & src, size_t start, size_t length, |
| 49 | size_t max_dictionary_size) override; |
| 50 | size_t uniqueInsertData(const char * pos, size_t length) override; |
| 51 | size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) override; |
| 52 | |
| 53 | size_t getDefaultValueIndex() const override { return 0; } |
| 54 | size_t getNullValueIndex() const override; |
| 55 | size_t getNestedTypeDefaultValueIndex() const override { return is_nullable ? 1 : 0; } |
| 56 | bool canContainNulls() const override { return is_nullable; } |
| 57 | |
| 58 | Field operator[](size_t n) const override { return (*getNestedColumn())[n]; } |
| 59 | void get(size_t n, Field & res) const override { getNestedColumn()->get(n, res); } |
| 60 | StringRef getDataAt(size_t n) const override { return getNestedColumn()->getDataAt(n); } |
| 61 | StringRef getDataAtWithTerminatingZero(size_t n) const override |
| 62 | { |
| 63 | return getNestedColumn()->getDataAtWithTerminatingZero(n); |
| 64 | } |
| 65 | UInt64 get64(size_t n) const override { return getNestedColumn()->get64(n); } |
| 66 | UInt64 getUInt(size_t n) const override { return getNestedColumn()->getUInt(n); } |
| 67 | Int64 getInt(size_t n) const override { return getNestedColumn()->getInt(n); } |
| 68 | Float64 getFloat64(size_t n) const override { return getNestedColumn()->getFloat64(n); } |
| 69 | Float32 getFloat32(size_t n) const override { return getNestedColumn()->getFloat32(n); } |
| 70 | bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); } |
| 71 | bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); } |
| 72 | StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; |
| 73 | void updateHashWithValue(size_t n, SipHash & hash_func) const override |
| 74 | { |
| 75 | return getNestedColumn()->updateHashWithValue(n, hash_func); |
| 76 | } |
| 77 | |
| 78 | int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override; |
| 79 | |
| 80 | void getExtremes(Field & min, Field & max) const override { column_holder->getExtremes(min, max); } |
| 81 | bool valuesHaveFixedSize() const override { return column_holder->valuesHaveFixedSize(); } |
| 82 | bool isFixedAndContiguous() const override { return column_holder->isFixedAndContiguous(); } |
| 83 | size_t sizeOfValueIfFixed() const override { return column_holder->sizeOfValueIfFixed(); } |
| 84 | bool isNumeric() const override { return column_holder->isNumeric(); } |
| 85 | |
| 86 | size_t byteSize() const override { return column_holder->byteSize(); } |
| 87 | void protect() override { column_holder->protect(); } |
| 88 | size_t allocatedBytes() const override |
| 89 | { |
| 90 | return column_holder->allocatedBytes() |
| 91 | + index.allocatedBytes() |
| 92 | + (nested_null_mask ? nested_null_mask->allocatedBytes() : 0); |
| 93 | } |
| 94 | void forEachSubcolumn(IColumn::ColumnCallback callback) override |
| 95 | { |
| 96 | callback(column_holder); |
| 97 | index.setColumn(getRawColumnPtr()); |
| 98 | if (is_nullable) |
| 99 | nested_column_nullable = ColumnNullable::create(column_holder, nested_null_mask); |
| 100 | } |
| 101 | |
| 102 | bool structureEquals(const IColumn & rhs) const override |
| 103 | { |
| 104 | if (auto rhs_concrete = typeid_cast<const ColumnUnique *>(&rhs)) |
| 105 | return column_holder->structureEquals(*rhs_concrete->column_holder); |
| 106 | return false; |
| 107 | } |
| 108 | |
| 109 | const UInt64 * tryGetSavedHash() const override { return index.tryGetSavedHash(); } |
| 110 | |
| 111 | UInt128 getHash() const override { return hash.getHash(*getRawColumnPtr()); } |
| 112 | |
| 113 | private: |
| 114 | |
| 115 | IColumn::WrappedPtr column_holder; |
| 116 | bool is_nullable; |
| 117 | size_t size_of_value_if_fixed = 0; |
| 118 | ReverseIndex<UInt64, ColumnType> index; |
| 119 | |
| 120 | /// For DataTypeNullable, stores null map. |
| 121 | IColumn::WrappedPtr nested_null_mask; |
| 122 | IColumn::WrappedPtr nested_column_nullable; |
| 123 | |
| 124 | class IncrementalHash |
| 125 | { |
| 126 | private: |
| 127 | UInt128 hash; |
| 128 | std::atomic<size_t> num_added_rows; |
| 129 | |
| 130 | std::mutex mutex; |
| 131 | public: |
| 132 | IncrementalHash() : num_added_rows(0) {} |
| 133 | |
| 134 | UInt128 getHash(const ColumnType & column); |
| 135 | }; |
| 136 | |
| 137 | mutable IncrementalHash hash; |
| 138 | |
| 139 | void createNullMask(); |
| 140 | void updateNullMask(); |
| 141 | |
| 142 | static size_t numSpecialValues(bool is_nullable) { return is_nullable ? 2 : 1; } |
| 143 | size_t numSpecialValues() const { return numSpecialValues(is_nullable); } |
| 144 | |
| 145 | ColumnType * getRawColumnPtr() { return assert_cast<ColumnType *>(column_holder.get()); } |
| 146 | const ColumnType * getRawColumnPtr() const { return assert_cast<const ColumnType *>(column_holder.get()); } |
| 147 | |
| 148 | template <typename IndexType> |
| 149 | MutableColumnPtr uniqueInsertRangeImpl( |
| 150 | const IColumn & src, |
| 151 | size_t start, |
| 152 | size_t length, |
| 153 | size_t num_added_rows, |
| 154 | typename ColumnVector<IndexType>::MutablePtr && positions_column, |
| 155 | ReverseIndex<UInt64, ColumnType> * secondary_index, |
| 156 | size_t max_dictionary_size); |
| 157 | }; |
| 158 | |
| 159 | template <typename ColumnType> |
| 160 | MutableColumnPtr ColumnUnique<ColumnType>::cloneEmpty() const |
| 161 | { |
| 162 | return ColumnUnique<ColumnType>::create(column_holder->cloneResized(numSpecialValues()), is_nullable); |
| 163 | } |
| 164 | |
| 165 | template <typename ColumnType> |
| 166 | ColumnUnique<ColumnType>::ColumnUnique(const ColumnUnique & other) |
| 167 | : column_holder(other.column_holder) |
| 168 | , is_nullable(other.is_nullable) |
| 169 | , size_of_value_if_fixed (other.size_of_value_if_fixed) |
| 170 | , index(numSpecialValues(is_nullable), 0) |
| 171 | { |
| 172 | index.setColumn(getRawColumnPtr()); |
| 173 | createNullMask(); |
| 174 | } |
| 175 | |
| 176 | template <typename ColumnType> |
| 177 | ColumnUnique<ColumnType>::ColumnUnique(const IDataType & type) |
| 178 | : is_nullable(type.isNullable()) |
| 179 | , index(numSpecialValues(is_nullable), 0) |
| 180 | { |
| 181 | const auto & holder_type = is_nullable ? *static_cast<const DataTypeNullable &>(type).getNestedType() : type; |
| 182 | column_holder = holder_type.createColumn()->cloneResized(numSpecialValues()); |
| 183 | index.setColumn(getRawColumnPtr()); |
| 184 | createNullMask(); |
| 185 | |
| 186 | if (column_holder->valuesHaveFixedSize()) |
| 187 | size_of_value_if_fixed = column_holder->sizeOfValueIfFixed(); |
| 188 | } |
| 189 | |
| 190 | template <typename ColumnType> |
| 191 | ColumnUnique<ColumnType>::ColumnUnique(MutableColumnPtr && holder, bool is_nullable_) |
| 192 | : column_holder(std::move(holder)) |
| 193 | , is_nullable(is_nullable_) |
| 194 | , index(numSpecialValues(is_nullable_), 0) |
| 195 | { |
| 196 | if (column_holder->size() < numSpecialValues()) |
| 197 | throw Exception("Too small holder column for ColumnUnique." , ErrorCodes::ILLEGAL_COLUMN); |
| 198 | if (isColumnNullable(*column_holder)) |
| 199 | throw Exception("Holder column for ColumnUnique can't be nullable." , ErrorCodes::ILLEGAL_COLUMN); |
| 200 | |
| 201 | index.setColumn(getRawColumnPtr()); |
| 202 | createNullMask(); |
| 203 | |
| 204 | if (column_holder->valuesHaveFixedSize()) |
| 205 | size_of_value_if_fixed = column_holder->sizeOfValueIfFixed(); |
| 206 | } |
| 207 | |
| 208 | template <typename ColumnType> |
| 209 | void ColumnUnique<ColumnType>::createNullMask() |
| 210 | { |
| 211 | if (is_nullable) |
| 212 | { |
| 213 | size_t size = getRawColumnPtr()->size(); |
| 214 | if (!nested_null_mask) |
| 215 | { |
| 216 | ColumnUInt8::MutablePtr null_mask = ColumnUInt8::create(size, UInt8(0)); |
| 217 | null_mask->getData()[getNullValueIndex()] = 1; |
| 218 | nested_null_mask = std::move(null_mask); |
| 219 | nested_column_nullable = ColumnNullable::create(column_holder, nested_null_mask); |
| 220 | } |
| 221 | else |
| 222 | throw Exception("Null mask for ColumnUnique is already created." , ErrorCodes::LOGICAL_ERROR); |
| 223 | } |
| 224 | } |
| 225 | |
| 226 | template <typename ColumnType> |
| 227 | void ColumnUnique<ColumnType>::updateNullMask() |
| 228 | { |
| 229 | if (is_nullable) |
| 230 | { |
| 231 | if (!nested_null_mask) |
| 232 | throw Exception("Null mask for ColumnUnique is was not created." , ErrorCodes::LOGICAL_ERROR); |
| 233 | |
| 234 | size_t size = getRawColumnPtr()->size(); |
| 235 | |
| 236 | if (nested_null_mask->size() != size) |
| 237 | assert_cast<ColumnUInt8 &>(*nested_null_mask).getData().resize_fill(size); |
| 238 | } |
| 239 | } |
| 240 | |
| 241 | template <typename ColumnType> |
| 242 | const ColumnPtr & ColumnUnique<ColumnType>::getNestedColumn() const |
| 243 | { |
| 244 | if (is_nullable) |
| 245 | return nested_column_nullable; |
| 246 | |
| 247 | return column_holder; |
| 248 | } |
| 249 | |
| 250 | template <typename ColumnType> |
| 251 | size_t ColumnUnique<ColumnType>::getNullValueIndex() const |
| 252 | { |
| 253 | if (!is_nullable) |
| 254 | throw Exception("ColumnUnique can't contain null values." , ErrorCodes::LOGICAL_ERROR); |
| 255 | |
| 256 | return 0; |
| 257 | } |
| 258 | |
| 259 | template <typename ColumnType> |
| 260 | size_t ColumnUnique<ColumnType>::uniqueInsert(const Field & x) |
| 261 | { |
| 262 | if (x.getType() == Field::Types::Null) |
| 263 | return getNullValueIndex(); |
| 264 | |
| 265 | if (size_of_value_if_fixed) |
| 266 | return uniqueInsertData(&x.reinterpret<char>(), size_of_value_if_fixed); |
| 267 | |
| 268 | auto & val = x.get<String>(); |
| 269 | return uniqueInsertData(val.data(), val.size()); |
| 270 | } |
| 271 | |
| 272 | template <typename ColumnType> |
| 273 | size_t ColumnUnique<ColumnType>::uniqueInsertFrom(const IColumn & src, size_t n) |
| 274 | { |
| 275 | if (is_nullable && src.isNullAt(n)) |
| 276 | return getNullValueIndex(); |
| 277 | |
| 278 | if (auto * nullable = checkAndGetColumn<ColumnNullable>(src)) |
| 279 | return uniqueInsertFrom(nullable->getNestedColumn(), n); |
| 280 | |
| 281 | auto ref = src.getDataAt(n); |
| 282 | return uniqueInsertData(ref.data, ref.size); |
| 283 | } |
| 284 | |
| 285 | template <typename ColumnType> |
| 286 | size_t ColumnUnique<ColumnType>::uniqueInsertData(const char * pos, size_t length) |
| 287 | { |
| 288 | auto column = getRawColumnPtr(); |
| 289 | |
| 290 | if (column->getDataAt(getNestedTypeDefaultValueIndex()) == StringRef(pos, length)) |
| 291 | return getNestedTypeDefaultValueIndex(); |
| 292 | |
| 293 | auto insertion_point = index.insert(StringRef(pos, length)); |
| 294 | |
| 295 | updateNullMask(); |
| 296 | |
| 297 | return insertion_point; |
| 298 | } |
| 299 | |
| 300 | template <typename ColumnType> |
| 301 | StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const |
| 302 | { |
| 303 | if (is_nullable) |
| 304 | { |
| 305 | static constexpr auto s = sizeof(UInt8); |
| 306 | |
| 307 | auto pos = arena.allocContinue(s, begin); |
| 308 | UInt8 flag = (n == getNullValueIndex() ? 1 : 0); |
| 309 | unalignedStore<UInt8>(pos, flag); |
| 310 | |
| 311 | if (n == getNullValueIndex()) |
| 312 | return StringRef(pos, s); |
| 313 | |
| 314 | auto nested_ref = column_holder->serializeValueIntoArena(n, arena, begin); |
| 315 | |
| 316 | /// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back. |
| 317 | return StringRef(nested_ref.data - s, nested_ref.size + s); |
| 318 | } |
| 319 | |
| 320 | return column_holder->serializeValueIntoArena(n, arena, begin); |
| 321 | } |
| 322 | |
| 323 | template <typename ColumnType> |
| 324 | size_t ColumnUnique<ColumnType>::uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) |
| 325 | { |
| 326 | if (is_nullable) |
| 327 | { |
| 328 | UInt8 val = *reinterpret_cast<const UInt8 *>(pos); |
| 329 | pos += sizeof(val); |
| 330 | |
| 331 | if (val) |
| 332 | { |
| 333 | new_pos = pos; |
| 334 | return getNullValueIndex(); |
| 335 | } |
| 336 | } |
| 337 | |
| 338 | /// Numbers, FixedString |
| 339 | if (size_of_value_if_fixed) |
| 340 | { |
| 341 | new_pos = pos + size_of_value_if_fixed; |
| 342 | return uniqueInsertData(pos, size_of_value_if_fixed); |
| 343 | } |
| 344 | |
| 345 | /// String |
| 346 | const size_t string_size = unalignedLoad<size_t>(pos); |
| 347 | pos += sizeof(string_size); |
| 348 | new_pos = pos + string_size; |
| 349 | |
| 350 | /// -1 because of terminating zero |
| 351 | return uniqueInsertData(pos, string_size - 1); |
| 352 | } |
| 353 | |
| 354 | template <typename ColumnType> |
| 355 | int ColumnUnique<ColumnType>::compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const |
| 356 | { |
| 357 | if (is_nullable) |
| 358 | { |
| 359 | /// See ColumnNullable::compareAt |
| 360 | bool lval_is_null = n == getNullValueIndex(); |
| 361 | bool rval_is_null = m == getNullValueIndex(); |
| 362 | |
| 363 | if (unlikely(lval_is_null || rval_is_null)) |
| 364 | { |
| 365 | if (lval_is_null && rval_is_null) |
| 366 | return 0; |
| 367 | else |
| 368 | return lval_is_null ? nan_direction_hint : -nan_direction_hint; |
| 369 | } |
| 370 | } |
| 371 | |
| 372 | auto & column_unique = static_cast<const IColumnUnique &>(rhs); |
| 373 | return getNestedColumn()->compareAt(n, m, *column_unique.getNestedColumn(), nan_direction_hint); |
| 374 | } |
| 375 | |
| 376 | template <typename IndexType> |
| 377 | static void checkIndexes(const ColumnVector<IndexType> & indexes, size_t max_dictionary_size) |
| 378 | { |
| 379 | auto & data = indexes.getData(); |
| 380 | for (size_t i = 0; i < data.size(); ++i) |
| 381 | { |
| 382 | if (data[i] >= max_dictionary_size) |
| 383 | { |
| 384 | throw Exception("Found index " + toString(data[i]) + " at position " + toString(i) |
| 385 | + " which is grated or equal than dictionary size " + toString(max_dictionary_size), |
| 386 | ErrorCodes::LOGICAL_ERROR); |
| 387 | } |
| 388 | } |
| 389 | } |
| 390 | |
| 391 | template <typename ColumnType> |
| 392 | template <typename IndexType> |
| 393 | MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeImpl( |
| 394 | const IColumn & src, |
| 395 | size_t start, |
| 396 | size_t length, |
| 397 | size_t num_added_rows, |
| 398 | typename ColumnVector<IndexType>::MutablePtr && positions_column, |
| 399 | ReverseIndex<UInt64, ColumnType> * secondary_index, |
| 400 | size_t max_dictionary_size) |
| 401 | { |
| 402 | const ColumnType * src_column; |
| 403 | const NullMap * null_map = nullptr; |
| 404 | auto & positions = positions_column->getData(); |
| 405 | |
| 406 | auto update_position = [&](UInt64 & next_position) -> MutableColumnPtr |
| 407 | { |
| 408 | constexpr auto next_size = NumberTraits::nextSize(sizeof(IndexType)); |
| 409 | using SuperiorIndexType = typename NumberTraits::Construct<false, false, next_size>::Type; |
| 410 | |
| 411 | ++next_position; |
| 412 | |
| 413 | if (next_position > std::numeric_limits<IndexType>::max()) |
| 414 | { |
| 415 | if (sizeof(SuperiorIndexType) == sizeof(IndexType)) |
| 416 | throw Exception("Can't find superior index type for type " + demangle(typeid(IndexType).name()), |
| 417 | ErrorCodes::LOGICAL_ERROR); |
| 418 | |
| 419 | auto expanded_column = ColumnVector<SuperiorIndexType>::create(length); |
| 420 | auto & expanded_data = expanded_column->getData(); |
| 421 | for (size_t i = 0; i < num_added_rows; ++i) |
| 422 | expanded_data[i] = positions[i]; |
| 423 | |
| 424 | return uniqueInsertRangeImpl<SuperiorIndexType>( |
| 425 | src, |
| 426 | start, |
| 427 | length, |
| 428 | num_added_rows, |
| 429 | std::move(expanded_column), |
| 430 | secondary_index, |
| 431 | max_dictionary_size); |
| 432 | } |
| 433 | |
| 434 | return nullptr; |
| 435 | }; |
| 436 | |
| 437 | if (auto * nullable_column = checkAndGetColumn<ColumnNullable>(src)) |
| 438 | { |
| 439 | src_column = typeid_cast<const ColumnType *>(&nullable_column->getNestedColumn()); |
| 440 | null_map = &nullable_column->getNullMapData(); |
| 441 | } |
| 442 | else |
| 443 | src_column = typeid_cast<const ColumnType *>(&src); |
| 444 | |
| 445 | if (src_column == nullptr) |
| 446 | throw Exception("Invalid column type for ColumnUnique::insertRangeFrom. Expected " + column_holder->getName() + |
| 447 | ", got " + src.getName(), ErrorCodes::ILLEGAL_COLUMN); |
| 448 | |
| 449 | auto column = getRawColumnPtr(); |
| 450 | |
| 451 | UInt64 next_position = column->size(); |
| 452 | if (secondary_index) |
| 453 | next_position += secondary_index->size(); |
| 454 | |
| 455 | auto insert_key = [&](const StringRef & ref, ReverseIndex<UInt64, ColumnType> & cur_index) -> MutableColumnPtr |
| 456 | { |
| 457 | auto inserted_pos = cur_index.insert(ref); |
| 458 | positions[num_added_rows] = inserted_pos; |
| 459 | if (inserted_pos == next_position) |
| 460 | return update_position(next_position); |
| 461 | |
| 462 | return nullptr; |
| 463 | }; |
| 464 | |
| 465 | for (; num_added_rows < length; ++num_added_rows) |
| 466 | { |
| 467 | auto row = start + num_added_rows; |
| 468 | |
| 469 | if (null_map && (*null_map)[row]) |
| 470 | positions[num_added_rows] = getNullValueIndex(); |
| 471 | else if (column->compareAt(getNestedTypeDefaultValueIndex(), row, *src_column, 1) == 0) |
| 472 | positions[num_added_rows] = getNestedTypeDefaultValueIndex(); |
| 473 | else |
| 474 | { |
| 475 | auto ref = src_column->getDataAt(row); |
| 476 | MutableColumnPtr res = nullptr; |
| 477 | |
| 478 | if (secondary_index && next_position >= max_dictionary_size) |
| 479 | { |
| 480 | auto insertion_point = index.getInsertionPoint(ref); |
| 481 | if (insertion_point == index.lastInsertionPoint()) |
| 482 | res = insert_key(ref, *secondary_index); |
| 483 | else |
| 484 | positions[num_added_rows] = insertion_point; |
| 485 | } |
| 486 | else |
| 487 | res = insert_key(ref, index); |
| 488 | |
| 489 | if (res) |
| 490 | return res; |
| 491 | } |
| 492 | } |
| 493 | |
| 494 | // checkIndexes(*positions_column, column->size() + (overflowed_keys ? overflowed_keys->size() : 0)); |
| 495 | return std::move(positions_column); |
| 496 | } |
| 497 | |
| 498 | template <typename ColumnType> |
| 499 | MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) |
| 500 | { |
| 501 | auto callForType = [this, &src, start, length](auto x) -> MutableColumnPtr |
| 502 | { |
| 503 | size_t size = getRawColumnPtr()->size(); |
| 504 | |
| 505 | using IndexType = decltype(x); |
| 506 | if (size <= std::numeric_limits<IndexType>::max()) |
| 507 | { |
| 508 | auto positions = ColumnVector<IndexType>::create(length); |
| 509 | return this->uniqueInsertRangeImpl<IndexType>(src, start, length, 0, std::move(positions), nullptr, 0); |
| 510 | } |
| 511 | |
| 512 | return nullptr; |
| 513 | }; |
| 514 | |
| 515 | MutableColumnPtr positions_column; |
| 516 | if (!positions_column) |
| 517 | positions_column = callForType(UInt8()); |
| 518 | if (!positions_column) |
| 519 | positions_column = callForType(UInt16()); |
| 520 | if (!positions_column) |
| 521 | positions_column = callForType(UInt32()); |
| 522 | if (!positions_column) |
| 523 | positions_column = callForType(UInt64()); |
| 524 | if (!positions_column) |
| 525 | throw Exception("Can't find index type for ColumnUnique" , ErrorCodes::LOGICAL_ERROR); |
| 526 | |
| 527 | updateNullMask(); |
| 528 | |
| 529 | return positions_column; |
| 530 | } |
| 531 | |
| 532 | template <typename ColumnType> |
| 533 | IColumnUnique::IndexesWithOverflow ColumnUnique<ColumnType>::uniqueInsertRangeWithOverflow( |
| 534 | const IColumn & src, |
| 535 | size_t start, |
| 536 | size_t length, |
| 537 | size_t max_dictionary_size) |
| 538 | { |
| 539 | auto overflowed_keys = column_holder->cloneEmpty(); |
| 540 | auto overflowed_keys_ptr = typeid_cast<ColumnType *>(overflowed_keys.get()); |
| 541 | if (!overflowed_keys_ptr) |
| 542 | throw Exception("Invalid keys type for ColumnUnique." , ErrorCodes::LOGICAL_ERROR); |
| 543 | |
| 544 | auto callForType = [this, &src, start, length, overflowed_keys_ptr, max_dictionary_size](auto x) -> MutableColumnPtr |
| 545 | { |
| 546 | size_t size = getRawColumnPtr()->size(); |
| 547 | |
| 548 | using IndexType = decltype(x); |
| 549 | if (size <= std::numeric_limits<IndexType>::max()) |
| 550 | { |
| 551 | auto positions = ColumnVector<IndexType>::create(length); |
| 552 | ReverseIndex<UInt64, ColumnType> secondary_index(0, max_dictionary_size); |
| 553 | secondary_index.setColumn(overflowed_keys_ptr); |
| 554 | return this->uniqueInsertRangeImpl<IndexType>(src, start, length, 0, std::move(positions), |
| 555 | &secondary_index, max_dictionary_size); |
| 556 | } |
| 557 | |
| 558 | return nullptr; |
| 559 | }; |
| 560 | |
| 561 | MutableColumnPtr positions_column; |
| 562 | if (!positions_column) |
| 563 | positions_column = callForType(UInt8()); |
| 564 | if (!positions_column) |
| 565 | positions_column = callForType(UInt16()); |
| 566 | if (!positions_column) |
| 567 | positions_column = callForType(UInt32()); |
| 568 | if (!positions_column) |
| 569 | positions_column = callForType(UInt64()); |
| 570 | if (!positions_column) |
| 571 | throw Exception("Can't find index type for ColumnUnique" , ErrorCodes::LOGICAL_ERROR); |
| 572 | |
| 573 | updateNullMask(); |
| 574 | |
| 575 | IColumnUnique::IndexesWithOverflow indexes_with_overflow; |
| 576 | indexes_with_overflow.indexes = std::move(positions_column); |
| 577 | indexes_with_overflow.overflowed_keys = std::move(overflowed_keys); |
| 578 | return indexes_with_overflow; |
| 579 | } |
| 580 | |
| 581 | template <typename ColumnType> |
| 582 | UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & column) |
| 583 | { |
| 584 | size_t column_size = column.size(); |
| 585 | UInt128 cur_hash; |
| 586 | |
| 587 | if (column_size != num_added_rows.load()) |
| 588 | { |
| 589 | SipHash sip_hash; |
| 590 | for (size_t i = 0; i < column_size; ++i) |
| 591 | column.updateHashWithValue(i, sip_hash); |
| 592 | |
| 593 | std::lock_guard lock(mutex); |
| 594 | sip_hash.get128(hash.low, hash.high); |
| 595 | cur_hash = hash; |
| 596 | num_added_rows.store(column_size); |
| 597 | } |
| 598 | else |
| 599 | { |
| 600 | std::lock_guard lock(mutex); |
| 601 | cur_hash = hash; |
| 602 | } |
| 603 | |
| 604 | return cur_hash; |
| 605 | } |
| 606 | |
| 607 | } |
| 608 | |