| 1 | #include <Columns/ColumnLowCardinality.h> |
| 2 | #include <Columns/ColumnUnique.h> |
| 3 | #include <Columns/ColumnFixedString.h> |
| 4 | #include <Columns/ColumnsCommon.h> |
| 5 | #include <Common/HashTable/HashMap.h> |
| 6 | #include <Common/typeid_cast.h> |
| 7 | #include <Common/assert_cast.h> |
| 8 | #include <Core/Field.h> |
| 9 | #include <Core/TypeListNumber.h> |
| 10 | #include <DataTypes/DataTypeFactory.h> |
| 11 | #include <DataTypes/DataTypeLowCardinality.h> |
| 12 | #include <DataTypes/DataTypeNullable.h> |
| 13 | #include <DataTypes/DataTypeDate.h> |
| 14 | #include <DataTypes/DataTypeDateTime.h> |
| 15 | #include <Parsers/IAST.h> |
| 16 | |
| 17 | namespace DB |
| 18 | { |
| 19 | |
| 20 | namespace ErrorCodes |
| 21 | { |
| 22 | extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; |
| 23 | extern const int LOGICAL_ERROR; |
| 24 | extern const int ILLEGAL_TYPE_OF_ARGUMENT; |
| 25 | } |
| 26 | |
| 27 | namespace |
| 28 | { |
| 29 | const ColumnLowCardinality & getColumnLowCardinality(const IColumn & column) |
| 30 | { |
| 31 | return typeid_cast<const ColumnLowCardinality &>(column); |
| 32 | } |
| 33 | |
| 34 | ColumnLowCardinality & getColumnLowCardinality(IColumn & column) |
| 35 | { |
| 36 | return typeid_cast<ColumnLowCardinality &>(column); |
| 37 | } |
| 38 | } |
| 39 | |
| 40 | DataTypeLowCardinality::DataTypeLowCardinality(DataTypePtr dictionary_type_) |
| 41 | : dictionary_type(std::move(dictionary_type_)) |
| 42 | { |
| 43 | auto inner_type = dictionary_type; |
| 44 | if (dictionary_type->isNullable()) |
| 45 | inner_type = static_cast<const DataTypeNullable &>(*dictionary_type).getNestedType(); |
| 46 | |
| 47 | if (!inner_type->canBeInsideLowCardinality()) |
| 48 | throw Exception("DataTypeLowCardinality is supported only for numbers, strings, Date or DateTime, but got " |
| 49 | + dictionary_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); |
| 50 | } |
| 51 | |
| 52 | void DataTypeLowCardinality::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const |
| 53 | { |
| 54 | path.push_back(Substream::DictionaryKeys); |
| 55 | dictionary_type->enumerateStreams(callback, path); |
| 56 | path.back() = Substream::DictionaryIndexes; |
| 57 | callback(path); |
| 58 | path.pop_back(); |
| 59 | } |
| 60 | |
| 61 | struct KeysSerializationVersion |
| 62 | { |
| 63 | enum Value |
| 64 | { |
| 65 | /// Version is written at the start of <name.dict.bin>. |
| 66 | /// Dictionary is written as number N and N keys after them. |
| 67 | /// Dictionary can be shared for continuous range of granules, so some marks may point to the same position. |
| 68 | /// Shared dictionary is stored in state and is read once. |
| 69 | SharedDictionariesWithAdditionalKeys = 1, |
| 70 | }; |
| 71 | |
| 72 | Value value; |
| 73 | |
| 74 | static void checkVersion(UInt64 version) |
| 75 | { |
| 76 | if (version != SharedDictionariesWithAdditionalKeys) |
| 77 | throw Exception("Invalid version for DataTypeLowCardinality key column." , ErrorCodes::LOGICAL_ERROR); |
| 78 | } |
| 79 | |
| 80 | KeysSerializationVersion(UInt64 version) : value(static_cast<Value>(version)) { checkVersion(version); } |
| 81 | }; |
| 82 | |
| 83 | /// Version is stored at the start of each granule. It's used to store indexes type and flags. |
| 84 | struct IndexesSerializationType |
| 85 | { |
| 86 | using SerializationType = UInt64; |
| 87 | /// Need to read dictionary if it wasn't. |
| 88 | static constexpr SerializationType NeedGlobalDictionaryBit = 1u << 8u; |
| 89 | /// Need to read additional keys. Additional keys are stored before indexes as value N and N keys after them. |
| 90 | static constexpr SerializationType HasAdditionalKeysBit = 1u << 9u; |
| 91 | /// Need to update dictionary. It means that previous granule has different dictionary. |
| 92 | static constexpr SerializationType NeedUpdateDictionary = 1u << 10u; |
| 93 | |
| 94 | enum Type |
| 95 | { |
| 96 | TUInt8 = 0, |
| 97 | TUInt16, |
| 98 | TUInt32, |
| 99 | TUInt64, |
| 100 | }; |
| 101 | |
| 102 | Type type; |
| 103 | bool has_additional_keys; |
| 104 | bool need_global_dictionary; |
| 105 | bool need_update_dictionary; |
| 106 | |
| 107 | static constexpr SerializationType resetFlags(SerializationType type) |
| 108 | { |
| 109 | return type & (~(HasAdditionalKeysBit | NeedGlobalDictionaryBit | NeedUpdateDictionary)); |
| 110 | } |
| 111 | |
| 112 | static void checkType(SerializationType type) |
| 113 | { |
| 114 | UInt64 value = resetFlags(type); |
| 115 | if (value <= TUInt64) |
| 116 | return; |
| 117 | |
| 118 | throw Exception("Invalid type for DataTypeLowCardinality index column." , ErrorCodes::LOGICAL_ERROR); |
| 119 | } |
| 120 | |
| 121 | void serialize(WriteBuffer & buffer) const |
| 122 | { |
| 123 | SerializationType val = type; |
| 124 | if (has_additional_keys) |
| 125 | val |= HasAdditionalKeysBit; |
| 126 | if (need_global_dictionary) |
| 127 | val |= NeedGlobalDictionaryBit; |
| 128 | if (need_update_dictionary) |
| 129 | val |= NeedUpdateDictionary; |
| 130 | writeIntBinary(val, buffer); |
| 131 | } |
| 132 | |
| 133 | void deserialize(ReadBuffer & buffer) |
| 134 | { |
| 135 | SerializationType val; |
| 136 | readIntBinary(val, buffer); |
| 137 | checkType(val); |
| 138 | has_additional_keys = (val & HasAdditionalKeysBit) != 0; |
| 139 | need_global_dictionary = (val & NeedGlobalDictionaryBit) != 0; |
| 140 | need_update_dictionary = (val & NeedUpdateDictionary) != 0; |
| 141 | type = static_cast<Type>(resetFlags(val)); |
| 142 | } |
| 143 | |
| 144 | IndexesSerializationType(const IColumn & column, |
| 145 | bool has_additional_keys_, |
| 146 | bool need_global_dictionary_, |
| 147 | bool enumerate_dictionaries) |
| 148 | : has_additional_keys(has_additional_keys_) |
| 149 | , need_global_dictionary(need_global_dictionary_) |
| 150 | , need_update_dictionary(enumerate_dictionaries) |
| 151 | { |
| 152 | if (typeid_cast<const ColumnUInt8 *>(&column)) |
| 153 | type = TUInt8; |
| 154 | else if (typeid_cast<const ColumnUInt16 *>(&column)) |
| 155 | type = TUInt16; |
| 156 | else if (typeid_cast<const ColumnUInt32 *>(&column)) |
| 157 | type = TUInt32; |
| 158 | else if (typeid_cast<const ColumnUInt64 *>(&column)) |
| 159 | type = TUInt64; |
| 160 | else |
| 161 | throw Exception("Invalid Indexes column for IndexesSerializationType. Expected ColumnUInt*, got " |
| 162 | + column.getName(), ErrorCodes::LOGICAL_ERROR); |
| 163 | } |
| 164 | |
| 165 | DataTypePtr getDataType() const |
| 166 | { |
| 167 | if (type == TUInt8) |
| 168 | return std::make_shared<DataTypeUInt8>(); |
| 169 | if (type == TUInt16) |
| 170 | return std::make_shared<DataTypeUInt16>(); |
| 171 | if (type == TUInt32) |
| 172 | return std::make_shared<DataTypeUInt32>(); |
| 173 | if (type == TUInt64) |
| 174 | return std::make_shared<DataTypeUInt64>(); |
| 175 | |
| 176 | throw Exception("Can't create DataType from IndexesSerializationType." , ErrorCodes::LOGICAL_ERROR); |
| 177 | } |
| 178 | |
| 179 | IndexesSerializationType() = default; |
| 180 | }; |
| 181 | |
/// State kept between calls while writing a LowCardinality column.
struct SerializeStateLowCardinality : public IDataType::SerializeBinaryBulkState
{
    /// Version of the keys stream; validated by KeysSerializationVersion's constructor.
    KeysSerializationVersion key_version;
    /// Dictionary accumulated across granules. It is created on demand by
    /// serializeBinaryBulkWithMultipleStreams and reset to nullptr once it has been written out.
    MutableColumnUniquePtr shared_dictionary;

    explicit SerializeStateLowCardinality(UInt64 key_version_) : key_version(key_version_) {}
};
| 189 | |
/// State kept between calls while reading a LowCardinality column.
struct DeserializeStateLowCardinality : public IDataType::DeserializeBinaryBulkState
{
    /// Version read from the start of the keys stream; validated on construction.
    KeysSerializationVersion key_version;
    /// Dictionary most recently read from the keys stream (null until the first read).
    ColumnUniquePtr global_dictionary;

    /// Header of the granule currently being read from the indexes stream.
    IndexesSerializationType index_type;
    /// Additional keys of the current granule, or nullptr if the granule has none.
    ColumnPtr additional_keys;
    /// Null map paired with additional_keys; only set when there is no global dictionary
    /// and the dictionary type is Nullable.
    ColumnPtr null_map;
    /// Rows of the current granule that have not been read yet.
    UInt64 num_pending_rows = 0;

    /// If dictionary should be updated.
    /// Can happen if some granules were skipped while reading from MergeTree.
    /// We should store this flag in State because
    /// in case of a long block of empty arrays we may not need to read the dictionary at first reading.
    bool need_update_dictionary = false;

    explicit DeserializeStateLowCardinality(UInt64 key_version_) : key_version(key_version_) {}
};
| 208 | |
| 209 | static SerializeStateLowCardinality * checkAndGetLowCardinalitySerializeState( |
| 210 | IDataType::SerializeBinaryBulkStatePtr & state) |
| 211 | { |
| 212 | if (!state) |
| 213 | throw Exception("Got empty state for DataTypeLowCardinality." , ErrorCodes::LOGICAL_ERROR); |
| 214 | |
| 215 | auto * low_cardinality_state = typeid_cast<SerializeStateLowCardinality *>(state.get()); |
| 216 | if (!low_cardinality_state) |
| 217 | { |
| 218 | auto & state_ref = *state; |
| 219 | throw Exception("Invalid SerializeBinaryBulkState for DataTypeLowCardinality. Expected: " |
| 220 | + demangle(typeid(SerializeStateLowCardinality).name()) + ", got " |
| 221 | + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR); |
| 222 | } |
| 223 | |
| 224 | return low_cardinality_state; |
| 225 | } |
| 226 | |
| 227 | static DeserializeStateLowCardinality * checkAndGetLowCardinalityDeserializeState( |
| 228 | IDataType::DeserializeBinaryBulkStatePtr & state) |
| 229 | { |
| 230 | if (!state) |
| 231 | throw Exception("Got empty state for DataTypeLowCardinality." , ErrorCodes::LOGICAL_ERROR); |
| 232 | |
| 233 | auto * low_cardinality_state = typeid_cast<DeserializeStateLowCardinality *>(state.get()); |
| 234 | if (!low_cardinality_state) |
| 235 | { |
| 236 | auto & state_ref = *state; |
| 237 | throw Exception("Invalid DeserializeBinaryBulkState for DataTypeLowCardinality. Expected: " |
| 238 | + demangle(typeid(DeserializeStateLowCardinality).name()) + ", got " |
| 239 | + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR); |
| 240 | } |
| 241 | |
| 242 | return low_cardinality_state; |
| 243 | } |
| 244 | |
| 245 | void DataTypeLowCardinality::serializeBinaryBulkStatePrefix( |
| 246 | SerializeBinaryBulkSettings & settings, |
| 247 | SerializeBinaryBulkStatePtr & state) const |
| 248 | { |
| 249 | settings.path.push_back(Substream::DictionaryKeys); |
| 250 | auto * stream = settings.getter(settings.path); |
| 251 | settings.path.pop_back(); |
| 252 | |
| 253 | if (!stream) |
| 254 | throw Exception("Got empty stream in DataTypeLowCardinality::serializeBinaryBulkStatePrefix" , |
| 255 | ErrorCodes::LOGICAL_ERROR); |
| 256 | |
| 257 | /// Write version and create SerializeBinaryBulkState. |
| 258 | UInt64 key_version = KeysSerializationVersion::SharedDictionariesWithAdditionalKeys; |
| 259 | |
| 260 | writeIntBinary(key_version, *stream); |
| 261 | |
| 262 | state = std::make_shared<SerializeStateLowCardinality>(key_version); |
| 263 | } |
| 264 | |
| 265 | void DataTypeLowCardinality::serializeBinaryBulkStateSuffix( |
| 266 | SerializeBinaryBulkSettings & settings, |
| 267 | SerializeBinaryBulkStatePtr & state) const |
| 268 | { |
| 269 | auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state); |
| 270 | KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value); |
| 271 | |
| 272 | if (low_cardinality_state->shared_dictionary && settings.low_cardinality_max_dictionary_size) |
| 273 | { |
| 274 | auto nested_column = low_cardinality_state->shared_dictionary->getNestedNotNullableColumn(); |
| 275 | |
| 276 | settings.path.push_back(Substream::DictionaryKeys); |
| 277 | auto * stream = settings.getter(settings.path); |
| 278 | settings.path.pop_back(); |
| 279 | |
| 280 | if (!stream) |
| 281 | throw Exception("Got empty stream in DataTypeLowCardinality::serializeBinaryBulkStateSuffix" , |
| 282 | ErrorCodes::LOGICAL_ERROR); |
| 283 | |
| 284 | UInt64 num_keys = nested_column->size(); |
| 285 | writeIntBinary(num_keys, *stream); |
| 286 | removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *stream, 0, num_keys); |
| 287 | low_cardinality_state->shared_dictionary = nullptr; |
| 288 | } |
| 289 | } |
| 290 | |
| 291 | void DataTypeLowCardinality::deserializeBinaryBulkStatePrefix( |
| 292 | DeserializeBinaryBulkSettings & settings, |
| 293 | DeserializeBinaryBulkStatePtr & state) const |
| 294 | { |
| 295 | settings.path.push_back(Substream::DictionaryKeys); |
| 296 | auto * stream = settings.getter(settings.path); |
| 297 | settings.path.pop_back(); |
| 298 | |
| 299 | if (!stream) |
| 300 | return; |
| 301 | |
| 302 | UInt64 keys_version; |
| 303 | readIntBinary(keys_version, *stream); |
| 304 | |
| 305 | state = std::make_shared<DeserializeStateLowCardinality>(keys_version); |
| 306 | } |
| 307 | |
| 308 | namespace |
| 309 | { |
| 310 | template <typename T> |
| 311 | PaddedPODArray<T> * getIndexesData(IColumn & indexes) |
| 312 | { |
| 313 | auto * column = typeid_cast<ColumnVector<T> *>(&indexes); |
| 314 | if (column) |
| 315 | return &column->getData(); |
| 316 | |
| 317 | return nullptr; |
| 318 | } |
| 319 | |
/// Result of mapIndexWithAdditionalKeys: for every new (compacted) position,
/// the original position it came from.
struct IndexMapsWithAdditionalKeys
{
    /// Original positions (inside the dictionary) of the used dictionary keys.
    MutableColumnPtr dictionary_map;
    /// Positions of the used additional keys, relative to the start of the additional keys.
    MutableColumnPtr additional_keys_map;
};
| 325 | |
| 326 | template <typename T> |
| 327 | IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeysRef(PaddedPODArray<T> & index, size_t dict_size) |
| 328 | { |
| 329 | PaddedPODArray<T> copy(index.cbegin(), index.cend()); |
| 330 | |
| 331 | HashMap<T, T> dict_map; |
| 332 | HashMap<T, T> add_keys_map; |
| 333 | |
| 334 | for (auto val : index) |
| 335 | { |
| 336 | if (val < dict_size) |
| 337 | dict_map.insert({val, dict_map.size()}); |
| 338 | else |
| 339 | add_keys_map.insert({val, add_keys_map.size()}); |
| 340 | } |
| 341 | |
| 342 | auto dictionary_map = ColumnVector<T>::create(dict_map.size()); |
| 343 | auto additional_keys_map = ColumnVector<T>::create(add_keys_map.size()); |
| 344 | auto & dict_data = dictionary_map->getData(); |
| 345 | auto & add_keys_data = additional_keys_map->getData(); |
| 346 | |
| 347 | for (auto val : dict_map) |
| 348 | dict_data[val.second] = val.first; |
| 349 | |
| 350 | for (auto val : add_keys_map) |
| 351 | add_keys_data[val.second] = val.first - dict_size; |
| 352 | |
| 353 | for (auto & val : index) |
| 354 | val = val < dict_size ? dict_map[val] |
| 355 | : add_keys_map[val] + dict_map.size(); |
| 356 | |
| 357 | for (size_t i = 0; i < index.size(); ++i) |
| 358 | { |
| 359 | T expected = index[i] < dict_data.size() ? dict_data[index[i]] |
| 360 | : add_keys_data[index[i] - dict_data.size()] + dict_size; |
| 361 | if (expected != copy[i]) |
| 362 | throw Exception("Expected " + toString(expected) + ", but got " + toString(copy[i]), ErrorCodes::LOGICAL_ERROR); |
| 363 | |
| 364 | } |
| 365 | |
| 366 | return {std::move(dictionary_map), std::move(additional_keys_map)}; |
| 367 | } |
| 368 | |
/// Compacts `index` in place and returns the maps from new positions back to the original ones.
/// Values below dict_size receive positions [0, cur_pos); values at or above dict_size receive
/// positions starting at cur_pos. In both groups positions are assigned in order of first appearance.
/// Uses plain arrays indexed by value instead of hash maps.
template <typename T>
IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(PaddedPODArray<T> & index, size_t dict_size)
{
    T max_less_dict_size = 0;
    T max_value = 0;

    auto size = index.size();
    if (size == 0)
        return {ColumnVector<T>::create(), ColumnVector<T>::create()};

    /// First pass: find the largest value in each group to size the lookup arrays.
    for (size_t i = 0; i < size; ++i)
    {
        auto val = index[i];
        if (val < dict_size)
            max_less_dict_size = std::max(max_less_dict_size, val);

        max_value = std::max(max_value, val);
    }

    auto map_size = UInt64(max_less_dict_size) + 1;
    auto overflow_map_size = max_value >= dict_size ? (UInt64(max_value - dict_size) + 1) : 0;
    /// value -> new position; 0 doubles as "not assigned yet".
    PaddedPODArray<T> map(map_size, 0);
    PaddedPODArray<T> overflow_map(overflow_map_size, 0);

    /// Because 0 means "not assigned", the value that legitimately gets position 0
    /// is remembered separately in each group.
    T zero_pos_value = 0;
    T zero_pos_overflowed_value = 0;
    UInt64 cur_pos = 0;
    UInt64 cur_overflowed_pos = 0;

    /// Second pass: assign positions to values on their first appearance.
    for (size_t i = 0; i < size; ++i)
    {
        T val = index[i];
        if (val < dict_size)
        {
            if (cur_pos == 0)
            {
                zero_pos_value = val;
                ++cur_pos;
            }
            else if (map[val] == 0 && val != zero_pos_value)
            {
                map[val] = cur_pos;
                ++cur_pos;
            }
        }
        else
        {
            T shifted_val = val - dict_size;
            if (cur_overflowed_pos == 0)
            {
                zero_pos_overflowed_value = shifted_val;
                ++cur_overflowed_pos;
            }
            else if (overflow_map[shifted_val] == 0 && shifted_val != zero_pos_overflowed_value)
            {
                overflow_map[shifted_val] = cur_overflowed_pos;
                ++cur_overflowed_pos;
            }
        }
    }

    auto dictionary_map = ColumnVector<T>::create(cur_pos);
    auto additional_keys_map = ColumnVector<T>::create(cur_overflowed_pos);
    auto & dict_data = dictionary_map->getData();
    auto & add_keys_data = additional_keys_map->getData();

    /// Invert the lookup arrays: new position -> original value (non-zero positions only).
    for (size_t i = 0; i < map_size; ++i)
        if (map[i])
            dict_data[map[i]] = static_cast<T>(i);

    for (size_t i = 0; i < overflow_map_size; ++i)
        if (overflow_map[i])
            add_keys_data[overflow_map[i]] = static_cast<T>(i);

    /// Position 0 is filled from the separately remembered values.
    if (cur_pos)
        dict_data[0] = zero_pos_value;
    if (cur_overflowed_pos)
        add_keys_data[0] = zero_pos_overflowed_value;

    /// Final pass: rewrite the indexes; additional keys are shifted past the used dictionary keys.
    for (size_t i = 0; i < size; ++i)
    {
        T & val = index[i];
        if (val < dict_size)
            val = map[val];
        else
            val = overflow_map[val - dict_size] + cur_pos;
    }

    return {std::move(dictionary_map), std::move(additional_keys_map)};
}
| 459 | |
| 460 | /// Update column and return map with old indexes. |
| 461 | /// Let N is the number of distinct values which are less than max_size; |
| 462 | /// old_column - column before function call; |
| 463 | /// new_column - column after function call: |
| 464 | /// * if old_column[i] < max_size, than |
| 465 | /// dictionary_map[new_column[i]] = old_column[i] |
| 466 | /// * else |
| 467 | /// additional_keys_map[new_column[i]] = old_column[i] - dict_size + N |
| 468 | IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(IColumn & column, size_t dict_size) |
| 469 | { |
| 470 | if (auto * data_uint8 = getIndexesData<UInt8>(column)) |
| 471 | return mapIndexWithAdditionalKeys(*data_uint8, dict_size); |
| 472 | else if (auto * data_uint16 = getIndexesData<UInt16>(column)) |
| 473 | return mapIndexWithAdditionalKeys(*data_uint16, dict_size); |
| 474 | else if (auto * data_uint32 = getIndexesData<UInt32>(column)) |
| 475 | return mapIndexWithAdditionalKeys(*data_uint32, dict_size); |
| 476 | else if (auto * data_uint64 = getIndexesData<UInt64>(column)) |
| 477 | return mapIndexWithAdditionalKeys(*data_uint64, dict_size); |
| 478 | else |
| 479 | throw Exception("Indexes column for mapIndexWithAdditionalKeys must be UInt, got" + column.getName(), |
| 480 | ErrorCodes::LOGICAL_ERROR); |
| 481 | } |
| 482 | } |
| 483 | |
| 484 | void DataTypeLowCardinality::serializeBinaryBulkWithMultipleStreams( |
| 485 | const IColumn & column, |
| 486 | size_t offset, |
| 487 | size_t limit, |
| 488 | SerializeBinaryBulkSettings & settings, |
| 489 | SerializeBinaryBulkStatePtr & state) const |
| 490 | { |
| 491 | settings.path.push_back(Substream::DictionaryKeys); |
| 492 | auto * keys_stream = settings.getter(settings.path); |
| 493 | settings.path.back() = Substream::DictionaryIndexes; |
| 494 | auto * indexes_stream = settings.getter(settings.path); |
| 495 | settings.path.pop_back(); |
| 496 | |
| 497 | if (!keys_stream && !indexes_stream) |
| 498 | return; |
| 499 | |
| 500 | if (!keys_stream) |
| 501 | throw Exception("Got empty stream for DataTypeLowCardinality keys." , ErrorCodes::LOGICAL_ERROR); |
| 502 | |
| 503 | if (!indexes_stream) |
| 504 | throw Exception("Got empty stream for DataTypeLowCardinality indexes." , ErrorCodes::LOGICAL_ERROR); |
| 505 | |
| 506 | const ColumnLowCardinality & low_cardinality_column = typeid_cast<const ColumnLowCardinality &>(column); |
| 507 | |
| 508 | auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state); |
| 509 | auto & global_dictionary = low_cardinality_state->shared_dictionary; |
| 510 | KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value); |
| 511 | |
| 512 | bool need_update_dictionary = global_dictionary == nullptr; |
| 513 | if (need_update_dictionary) |
| 514 | global_dictionary = createColumnUnique(*dictionary_type); |
| 515 | |
| 516 | size_t max_limit = column.size() - offset; |
| 517 | limit = limit ? std::min(limit, max_limit) : max_limit; |
| 518 | |
| 519 | /// Do not write anything for empty column. (May happen while writing empty arrays.) |
| 520 | if (limit == 0) |
| 521 | return; |
| 522 | |
| 523 | auto sub_column = low_cardinality_column.cutAndCompact(offset, limit); |
| 524 | ColumnPtr positions = sub_column->getIndexesPtr(); |
| 525 | ColumnPtr keys = sub_column->getDictionary().getNestedColumn(); |
| 526 | |
| 527 | if (settings.low_cardinality_max_dictionary_size) |
| 528 | { |
| 529 | /// Insert used_keys into global dictionary and update sub_index. |
| 530 | auto indexes_with_overflow = global_dictionary->uniqueInsertRangeWithOverflow(*keys, 0, keys->size(), |
| 531 | settings.low_cardinality_max_dictionary_size); |
| 532 | size_t max_size = settings.low_cardinality_max_dictionary_size + indexes_with_overflow.overflowed_keys->size(); |
| 533 | ColumnLowCardinality::Index(indexes_with_overflow.indexes->getPtr()).check(max_size); |
| 534 | |
| 535 | if (global_dictionary->size() > settings.low_cardinality_max_dictionary_size) |
| 536 | throw Exception("Got dictionary with size " + toString(global_dictionary->size()) + |
| 537 | " but max dictionary size is " + toString(settings.low_cardinality_max_dictionary_size), |
| 538 | ErrorCodes::LOGICAL_ERROR); |
| 539 | |
| 540 | positions = indexes_with_overflow.indexes->index(*positions, 0); |
| 541 | keys = std::move(indexes_with_overflow.overflowed_keys); |
| 542 | |
| 543 | if (global_dictionary->size() < settings.low_cardinality_max_dictionary_size && !keys->empty()) |
| 544 | throw Exception("Has additional keys, but dict size is " + toString(global_dictionary->size()) + |
| 545 | " which is less then max dictionary size (" + toString(settings.low_cardinality_max_dictionary_size) + ")" , |
| 546 | ErrorCodes::LOGICAL_ERROR); |
| 547 | } |
| 548 | |
| 549 | if (auto * nullable_keys = checkAndGetColumn<ColumnNullable>(*keys)) |
| 550 | keys = nullable_keys->getNestedColumnPtr(); |
| 551 | |
| 552 | bool need_additional_keys = !keys->empty(); |
| 553 | bool need_dictionary = settings.low_cardinality_max_dictionary_size != 0; |
| 554 | bool need_write_dictionary = !settings.low_cardinality_use_single_dictionary_for_part |
| 555 | && global_dictionary->size() >= settings.low_cardinality_max_dictionary_size; |
| 556 | |
| 557 | IndexesSerializationType index_version(*positions, need_additional_keys, need_dictionary, need_update_dictionary); |
| 558 | index_version.serialize(*indexes_stream); |
| 559 | |
| 560 | if (need_write_dictionary) |
| 561 | { |
| 562 | const auto & nested_column = global_dictionary->getNestedNotNullableColumn(); |
| 563 | UInt64 num_keys = nested_column->size(); |
| 564 | writeIntBinary(num_keys, *keys_stream); |
| 565 | removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *keys_stream, 0, num_keys); |
| 566 | low_cardinality_state->shared_dictionary = nullptr; |
| 567 | } |
| 568 | |
| 569 | if (need_additional_keys) |
| 570 | { |
| 571 | UInt64 num_keys = keys->size(); |
| 572 | writeIntBinary(num_keys, *indexes_stream); |
| 573 | removeNullable(dictionary_type)->serializeBinaryBulk(*keys, *indexes_stream, 0, num_keys); |
| 574 | } |
| 575 | |
| 576 | UInt64 num_rows = positions->size(); |
| 577 | writeIntBinary(num_rows, *indexes_stream); |
| 578 | index_version.getDataType()->serializeBinaryBulk(*positions, *indexes_stream, 0, num_rows); |
| 579 | } |
| 580 | |
/// Reads up to `limit` rows into `column`, consuming one or more granules from the indexes stream.
///
/// Per granule the indexes stream holds: IndexesSerializationType header, then (optionally)
/// the number of additional keys and the keys, then the number of rows and the indexes.
/// A granule may be consumed across several calls; the unread remainder is tracked in
/// the state (num_pending_rows).
/// Throws LOGICAL_ERROR if exactly one of the two streams is missing or the state is invalid.
void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreams(
    IColumn & column,
    size_t limit,
    DeserializeBinaryBulkSettings & settings,
    DeserializeBinaryBulkStatePtr & state) const
{
    ColumnLowCardinality & low_cardinality_column = typeid_cast<ColumnLowCardinality &>(column);

    settings.path.push_back(Substream::DictionaryKeys);
    auto * keys_stream = settings.getter(settings.path);
    settings.path.back() = Substream::DictionaryIndexes;
    auto * indexes_stream = settings.getter(settings.path);
    settings.path.pop_back();

    if (!keys_stream && !indexes_stream)
        return;

    if (!keys_stream)
        throw Exception("Got empty stream for DataTypeLowCardinality keys." , ErrorCodes::LOGICAL_ERROR);

    if (!indexes_stream)
        throw Exception("Got empty stream for DataTypeLowCardinality indexes." , ErrorCodes::LOGICAL_ERROR);

    auto * low_cardinality_state = checkAndGetLowCardinalityDeserializeState(state);
    KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value);

    /// Reads the global dictionary (number of keys, then keys) from the keys stream into the state.
    auto readDictionary = [this, low_cardinality_state, keys_stream]()
    {
        UInt64 num_keys;
        readIntBinary(num_keys, *keys_stream);

        auto keys_type = removeNullable(dictionary_type);
        auto global_dict_keys = keys_type->createColumn();
        keys_type->deserializeBinaryBulk(*global_dict_keys, *keys_stream, num_keys, 0);

        auto column_unique = createColumnUnique(*dictionary_type, std::move(global_dict_keys));
        low_cardinality_state->global_dictionary = std::move(column_unique);
    };

    /// Reads the additional keys of the current granule from the indexes stream into the state.
    auto readAdditionalKeys = [this, low_cardinality_state, indexes_stream]()
    {
        UInt64 num_keys;
        readIntBinary(num_keys, *indexes_stream);
        auto keys_type = removeNullable(dictionary_type);
        auto additional_keys = keys_type->createColumn();
        keys_type->deserializeBinaryBulk(*additional_keys, *indexes_stream, num_keys, 0);
        low_cardinality_state->additional_keys = std::move(additional_keys);

        /// Without a global dictionary the additional keys are the whole dictionary of the granule.
        /// For a Nullable dictionary type the first key is marked as NULL
        /// (presumably the NULL placeholder kept at position 0 of the dictionary - confirm in ColumnUnique).
        if (!low_cardinality_state->index_type.need_global_dictionary && dictionary_type->isNullable())
        {
            auto null_map = ColumnUInt8::create(num_keys, 0);
            if (num_keys)
                null_map->getElement(0) = 1;

            low_cardinality_state->null_map = std::move(null_map);
        }
    };

    /// Reads `num_rows` indexes of the current granule and appends the decoded rows to the column.
    auto readIndexes = [this, low_cardinality_state, indexes_stream, &low_cardinality_column](UInt64 num_rows)
    {
        auto indexes_type = low_cardinality_state->index_type.getDataType();
        MutableColumnPtr indexes_column = indexes_type->createColumn();
        indexes_type->deserializeBinaryBulk(*indexes_column, *indexes_stream, num_rows, 0);

        auto & global_dictionary = low_cardinality_state->global_dictionary;
        const auto & additional_keys = low_cardinality_state->additional_keys;

        bool has_additional_keys = low_cardinality_state->index_type.has_additional_keys;
        bool column_is_empty = low_cardinality_column.empty();

        if (!low_cardinality_state->index_type.need_global_dictionary)
        {
            /// Case 1: no global dictionary - the indexes refer to the additional keys only.
            ColumnPtr keys_column = additional_keys;
            if (low_cardinality_state->null_map)
                keys_column = ColumnNullable::create(additional_keys, low_cardinality_state->null_map);
            low_cardinality_column.insertRangeFromDictionaryEncodedColumn(*keys_column, *indexes_column);
        }
        else if (!has_additional_keys)
        {
            /// Case 2: only the global dictionary is used; an empty column can share it directly.
            if (column_is_empty)
                low_cardinality_column.setSharedDictionary(global_dictionary);

            auto local_column = ColumnLowCardinality::create(global_dictionary, std::move(indexes_column));
            low_cardinality_column.insertRangeFrom(*local_column, 0, num_rows);
        }
        else
        {
            /// Case 3: indexes below the dictionary size refer to the global dictionary, the rest
            /// to the additional keys. Compact the indexes and collect only the keys actually used.
            auto maps = mapIndexWithAdditionalKeys(*indexes_column, global_dictionary->size());

            ColumnLowCardinality::Index(maps.additional_keys_map->getPtr()).check(additional_keys->size());

            ColumnLowCardinality::Index(indexes_column->getPtr()).check(
                maps.dictionary_map->size() + maps.additional_keys_map->size());

            auto used_keys = (*std::move(global_dictionary->getNestedColumn()->index(*maps.dictionary_map, 0))).mutate();

            if (!maps.additional_keys_map->empty())
            {
                auto used_add_keys = additional_keys->index(*maps.additional_keys_map, 0);

                /// Additional keys are stored without Nullable; wrap them with an all-zero null map.
                if (dictionary_type->isNullable())
                {
                    ColumnPtr null_map = ColumnUInt8::create(used_add_keys->size(), 0);
                    used_add_keys = ColumnNullable::create(used_add_keys, null_map);
                }

                used_keys->insertRangeFrom(*used_add_keys, 0, used_add_keys->size());
            }

            low_cardinality_column.insertRangeFromDictionaryEncodedColumn(*used_keys, *indexes_column);
        }
    };

    if (!settings.continuous_reading)
    {
        /// Reading restarts at a new position: forget the partially read granule.
        low_cardinality_state->num_pending_rows = 0;

        /// Remember in state that some granules were skipped and we need to update dictionary.
        low_cardinality_state->need_update_dictionary = true;
    }

    while (limit)
    {
        /// Start of a new granule: read its header, dictionary (if required) and additional keys.
        if (low_cardinality_state->num_pending_rows == 0)
        {
            if (indexes_stream->eof())
                break;

            auto & index_type = low_cardinality_state->index_type;
            auto & global_dictionary = low_cardinality_state->global_dictionary;

            index_type.deserialize(*indexes_stream);

            bool need_update_dictionary =
                !global_dictionary || index_type.need_update_dictionary || low_cardinality_state->need_update_dictionary;
            if (index_type.need_global_dictionary && need_update_dictionary)
            {
                readDictionary();
                low_cardinality_state->need_update_dictionary = false;
            }

            if (low_cardinality_state->index_type.has_additional_keys)
                readAdditionalKeys();
            else
                low_cardinality_state->additional_keys = nullptr;

            readIntBinary(low_cardinality_state->num_pending_rows, *indexes_stream);
        }

        /// Read as much of the current granule as the caller asked for.
        size_t num_rows_to_read = std::min<UInt64>(limit, low_cardinality_state->num_pending_rows);
        readIndexes(num_rows_to_read);
        limit -= num_rows_to_read;
        low_cardinality_state->num_pending_rows -= num_rows_to_read;
    }
}
| 736 | |
/// Binary serialization of a single Field: forwarded unchanged to the dictionary type.
void DataTypeLowCardinality::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
    dictionary_type->serializeBinary(field, ostr);
}
/// Binary deserialization of a single Field: forwarded unchanged to the dictionary type.
void DataTypeLowCardinality::deserializeBinary(Field & field, ReadBuffer & istr) const
{
    dictionary_type->deserializeBinary(field, istr);
}
| 745 | |
/// Binary serialization of one row of a column.
/// Delegates to serializeImpl, passing IDataType::serializeBinary as the per-value routine.
void DataTypeLowCardinality::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
{
    serializeImpl(column, row_num, &IDataType::serializeBinary, ostr);
}
/// Binary deserialization of one value into a column.
/// Delegates to deserializeImpl, passing IDataType::deserializeBinary as the per-value routine.
void DataTypeLowCardinality::deserializeBinary(IColumn & column, ReadBuffer & istr) const
{
    deserializeImpl(column, &IDataType::deserializeBinary, istr);
}
| 754 | |
/// Escaped-text serialization of one row.
/// Delegates to serializeImpl with IDataType::serializeAsTextEscaped.
void DataTypeLowCardinality::serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
    serializeImpl(column, row_num, &IDataType::serializeAsTextEscaped, ostr, settings);
}
| 759 | |
| 760 | void DataTypeLowCardinality::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
| 761 | { |
| 762 | deserializeImpl(column, &IDataType::deserializeAsTextEscaped, istr, settings); |
| 763 | } |
| 764 | |
| 765 | void DataTypeLowCardinality::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
| 766 | { |
| 767 | serializeImpl(column, row_num, &IDataType::serializeAsTextQuoted, ostr, settings); |
| 768 | } |
| 769 | |
| 770 | void DataTypeLowCardinality::deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
| 771 | { |
| 772 | deserializeImpl(column, &IDataType::deserializeAsTextQuoted, istr, settings); |
| 773 | } |
| 774 | |
| 775 | void DataTypeLowCardinality::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
| 776 | { |
| 777 | deserializeImpl(column, &IDataType::deserializeAsTextEscaped, istr, settings); |
| 778 | } |
| 779 | |
| 780 | void DataTypeLowCardinality::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
| 781 | { |
| 782 | serializeImpl(column, row_num, &IDataType::serializeAsTextCSV, ostr, settings); |
| 783 | } |
| 784 | |
| 785 | void DataTypeLowCardinality::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
| 786 | { |
| 787 | deserializeImpl(column, &IDataType::deserializeAsTextCSV, istr, settings); |
| 788 | } |
| 789 | |
| 790 | void DataTypeLowCardinality::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
| 791 | { |
| 792 | serializeImpl(column, row_num, &IDataType::serializeAsText, ostr, settings); |
| 793 | } |
| 794 | |
| 795 | void DataTypeLowCardinality::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
| 796 | { |
| 797 | serializeImpl(column, row_num, &IDataType::serializeAsTextJSON, ostr, settings); |
| 798 | } |
| 799 | void DataTypeLowCardinality::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
| 800 | { |
| 801 | deserializeImpl(column, &IDataType::deserializeAsTextJSON, istr, settings); |
| 802 | } |
| 803 | |
| 804 | void DataTypeLowCardinality::serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
| 805 | { |
| 806 | serializeImpl(column, row_num, &IDataType::serializeAsTextXML, ostr, settings); |
| 807 | } |
| 808 | |
| 809 | void DataTypeLowCardinality::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const |
| 810 | { |
| 811 | serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf, value_index); |
| 812 | } |
| 813 | |
| 814 | void DataTypeLowCardinality::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const |
| 815 | { |
| 816 | if (allow_add_row) |
| 817 | { |
| 818 | deserializeImpl(column, &IDataType::deserializeProtobuf, protobuf, true, row_added); |
| 819 | return; |
| 820 | } |
| 821 | |
| 822 | row_added = false; |
| 823 | auto & low_cardinality_column= getColumnLowCardinality(column); |
| 824 | auto nested_column = low_cardinality_column.getDictionary().getNestedColumn(); |
| 825 | auto temp_column = nested_column->cloneEmpty(); |
| 826 | size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(low_cardinality_column.size() - 1); |
| 827 | temp_column->insertFrom(*nested_column, unique_row_number); |
| 828 | bool dummy; |
| 829 | dictionary_type.get()->deserializeProtobuf(*temp_column, protobuf, false, dummy); |
| 830 | low_cardinality_column.popBack(1); |
| 831 | low_cardinality_column.insertFromFullColumn(*temp_column, 0); |
| 832 | } |
| 833 | |
| 834 | template <typename... Params, typename... Args> |
| 835 | void DataTypeLowCardinality::serializeImpl( |
| 836 | const IColumn & column, size_t row_num, DataTypeLowCardinality::SerializeFunctionPtr<Params...> func, Args &&... args) const |
| 837 | { |
| 838 | auto & low_cardinality_column = getColumnLowCardinality(column); |
| 839 | size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(row_num); |
| 840 | (dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, std::forward<Args>(args)...); |
| 841 | } |
| 842 | |
| 843 | template <typename... Params, typename... Args> |
| 844 | void DataTypeLowCardinality::deserializeImpl( |
| 845 | IColumn & column, DataTypeLowCardinality::DeserializeFunctionPtr<Params...> func, Args &&... args) const |
| 846 | { |
| 847 | auto & low_cardinality_column= getColumnLowCardinality(column); |
| 848 | auto temp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty(); |
| 849 | |
| 850 | (dictionary_type.get()->*func)(*temp_column, std::forward<Args>(args)...); |
| 851 | |
| 852 | low_cardinality_column.insertFromFullColumn(*temp_column, 0); |
| 853 | } |
| 854 | |
| 855 | namespace |
| 856 | { |
| 857 | template <typename Creator> |
| 858 | struct CreateColumnVector |
| 859 | { |
| 860 | MutableColumnUniquePtr & column; |
| 861 | const IDataType & keys_type; |
| 862 | const Creator & creator; |
| 863 | |
| 864 | CreateColumnVector(MutableColumnUniquePtr & column_, const IDataType & keys_type_, const Creator & creator_) |
| 865 | : column(column_), keys_type(keys_type_), creator(creator_) |
| 866 | { |
| 867 | } |
| 868 | |
| 869 | template <typename T, size_t> |
| 870 | void operator()() |
| 871 | { |
| 872 | if (typeid_cast<const DataTypeNumber<T> *>(&keys_type)) |
| 873 | column = creator(static_cast<ColumnVector<T> *>(nullptr)); |
| 874 | } |
| 875 | }; |
| 876 | } |
| 877 | |
| 878 | template <typename Creator> |
| 879 | MutableColumnUniquePtr DataTypeLowCardinality::createColumnUniqueImpl(const IDataType & keys_type, |
| 880 | const Creator & creator) |
| 881 | { |
| 882 | auto * type = &keys_type; |
| 883 | if (auto * nullable_type = typeid_cast<const DataTypeNullable *>(&keys_type)) |
| 884 | type = nullable_type->getNestedType().get(); |
| 885 | |
| 886 | if (isString(type)) |
| 887 | return creator(static_cast<ColumnString *>(nullptr)); |
| 888 | if (isFixedString(type)) |
| 889 | return creator(static_cast<ColumnFixedString *>(nullptr)); |
| 890 | if (typeid_cast<const DataTypeDate *>(type)) |
| 891 | return creator(static_cast<ColumnVector<UInt16> *>(nullptr)); |
| 892 | if (typeid_cast<const DataTypeDateTime *>(type)) |
| 893 | return creator(static_cast<ColumnVector<UInt32> *>(nullptr)); |
| 894 | if (isColumnedAsNumber(type)) |
| 895 | { |
| 896 | MutableColumnUniquePtr column; |
| 897 | TypeListNativeNumbers::forEach(CreateColumnVector(column, *type, creator)); |
| 898 | |
| 899 | if (!column) |
| 900 | throw Exception("Unexpected numeric type: " + type->getName(), ErrorCodes::LOGICAL_ERROR); |
| 901 | |
| 902 | return column; |
| 903 | } |
| 904 | |
| 905 | throw Exception("Unexpected dictionary type for DataTypeLowCardinality: " + type->getName(), |
| 906 | ErrorCodes::LOGICAL_ERROR); |
| 907 | } |
| 908 | |
| 909 | |
| 910 | MutableColumnUniquePtr DataTypeLowCardinality::createColumnUnique(const IDataType & keys_type) |
| 911 | { |
| 912 | auto creator = [&](auto x) |
| 913 | { |
| 914 | using ColumnType = typename std::remove_pointer<decltype(x)>::type; |
| 915 | return ColumnUnique<ColumnType>::create(keys_type); |
| 916 | }; |
| 917 | return createColumnUniqueImpl(keys_type, creator); |
| 918 | } |
| 919 | |
| 920 | MutableColumnUniquePtr DataTypeLowCardinality::createColumnUnique(const IDataType & keys_type, MutableColumnPtr && keys) |
| 921 | { |
| 922 | auto creator = [&](auto x) |
| 923 | { |
| 924 | using ColumnType = typename std::remove_pointer<decltype(x)>::type; |
| 925 | return ColumnUnique<ColumnType>::create(std::move(keys), keys_type.isNullable()); |
| 926 | }; |
| 927 | return createColumnUniqueImpl(keys_type, creator); |
| 928 | } |
| 929 | |
| 930 | MutableColumnPtr DataTypeLowCardinality::createColumn() const |
| 931 | { |
| 932 | MutableColumnPtr indexes = DataTypeUInt8().createColumn(); |
| 933 | MutableColumnPtr dictionary = createColumnUnique(*dictionary_type); |
| 934 | return ColumnLowCardinality::create(std::move(dictionary), std::move(indexes)); |
| 935 | } |
| 936 | |
| 937 | Field DataTypeLowCardinality::getDefault() const |
| 938 | { |
| 939 | return dictionary_type->getDefault(); |
| 940 | } |
| 941 | |
| 942 | bool DataTypeLowCardinality::equals(const IDataType & rhs) const |
| 943 | { |
| 944 | if (typeid(rhs) != typeid(*this)) |
| 945 | return false; |
| 946 | |
| 947 | auto & low_cardinality_rhs= static_cast<const DataTypeLowCardinality &>(rhs); |
| 948 | return dictionary_type->equals(*low_cardinality_rhs.dictionary_type); |
| 949 | } |
| 950 | |
| 951 | |
| 952 | static DataTypePtr create(const String & /*type_name*/, const ASTPtr & arguments) |
| 953 | { |
| 954 | if (!arguments || arguments->children.size() != 1) |
| 955 | throw Exception("LowCardinality data type family must have single argument - type of elements" , |
| 956 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
| 957 | |
| 958 | return std::make_shared<DataTypeLowCardinality>(DataTypeFactory::instance().get(arguments->children[0])); |
| 959 | } |
| 960 | |
| 961 | void registerDataTypeLowCardinality(DataTypeFactory & factory) |
| 962 | { |
| 963 | factory.registerDataType("LowCardinality" , create); |
| 964 | } |
| 965 | |
| 966 | |
| 967 | DataTypePtr removeLowCardinality(const DataTypePtr & type) |
| 968 | { |
| 969 | if (auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get())) |
| 970 | return low_cardinality_type->getDictionaryType(); |
| 971 | return type; |
| 972 | } |
| 973 | |
| 974 | } |
| 975 | |