| 1 | #include <Columns/ColumnLowCardinality.h> | 
|---|
| 2 | #include <Columns/ColumnUnique.h> | 
|---|
| 3 | #include <Columns/ColumnFixedString.h> | 
|---|
| 4 | #include <Columns/ColumnsCommon.h> | 
|---|
| 5 | #include <Common/HashTable/HashMap.h> | 
|---|
| 6 | #include <Common/typeid_cast.h> | 
|---|
| 7 | #include <Common/assert_cast.h> | 
|---|
| 8 | #include <Core/Field.h> | 
|---|
| 9 | #include <Core/TypeListNumber.h> | 
|---|
| 10 | #include <DataTypes/DataTypeFactory.h> | 
|---|
| 11 | #include <DataTypes/DataTypeLowCardinality.h> | 
|---|
| 12 | #include <DataTypes/DataTypeNullable.h> | 
|---|
| 13 | #include <DataTypes/DataTypeDate.h> | 
|---|
| 14 | #include <DataTypes/DataTypeDateTime.h> | 
|---|
| 15 | #include <Parsers/IAST.h> | 
|---|
| 16 |  | 
|---|
| 17 | namespace DB | 
|---|
| 18 | { | 
|---|
| 19 |  | 
|---|
namespace ErrorCodes
{
    /// Error codes used by this translation unit; they are declared extern here and defined elsewhere.
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
|---|
| 26 |  | 
|---|
| 27 | namespace | 
|---|
| 28 | { | 
|---|
| 29 | const ColumnLowCardinality & getColumnLowCardinality(const IColumn & column) | 
|---|
| 30 | { | 
|---|
| 31 | return typeid_cast<const ColumnLowCardinality &>(column); | 
|---|
| 32 | } | 
|---|
| 33 |  | 
|---|
| 34 | ColumnLowCardinality & getColumnLowCardinality(IColumn & column) | 
|---|
| 35 | { | 
|---|
| 36 | return typeid_cast<ColumnLowCardinality &>(column); | 
|---|
| 37 | } | 
|---|
| 38 | } | 
|---|
| 39 |  | 
|---|
| 40 | DataTypeLowCardinality::DataTypeLowCardinality(DataTypePtr dictionary_type_) | 
|---|
| 41 | : dictionary_type(std::move(dictionary_type_)) | 
|---|
| 42 | { | 
|---|
| 43 | auto inner_type = dictionary_type; | 
|---|
| 44 | if (dictionary_type->isNullable()) | 
|---|
| 45 | inner_type = static_cast<const DataTypeNullable &>(*dictionary_type).getNestedType(); | 
|---|
| 46 |  | 
|---|
| 47 | if (!inner_type->canBeInsideLowCardinality()) | 
|---|
| 48 | throw Exception( "DataTypeLowCardinality is supported only for numbers, strings, Date or DateTime, but got " | 
|---|
| 49 | + dictionary_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); | 
|---|
| 50 | } | 
|---|
| 51 |  | 
|---|
| 52 | void DataTypeLowCardinality::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const | 
|---|
| 53 | { | 
|---|
| 54 | path.push_back(Substream::DictionaryKeys); | 
|---|
| 55 | dictionary_type->enumerateStreams(callback, path); | 
|---|
| 56 | path.back() = Substream::DictionaryIndexes; | 
|---|
| 57 | callback(path); | 
|---|
| 58 | path.pop_back(); | 
|---|
| 59 | } | 
|---|
| 60 |  | 
|---|
| 61 | struct KeysSerializationVersion | 
|---|
| 62 | { | 
|---|
| 63 | enum Value | 
|---|
| 64 | { | 
|---|
| 65 | /// Version is written at the start of <name.dict.bin>. | 
|---|
| 66 | /// Dictionary is written as number N and N keys after them. | 
|---|
| 67 | /// Dictionary can be shared for continuous range of granules, so some marks may point to the same position. | 
|---|
| 68 | /// Shared dictionary is stored in state and is read once. | 
|---|
| 69 | SharedDictionariesWithAdditionalKeys = 1, | 
|---|
| 70 | }; | 
|---|
| 71 |  | 
|---|
| 72 | Value value; | 
|---|
| 73 |  | 
|---|
| 74 | static void checkVersion(UInt64 version) | 
|---|
| 75 | { | 
|---|
| 76 | if (version != SharedDictionariesWithAdditionalKeys) | 
|---|
| 77 | throw Exception( "Invalid version for DataTypeLowCardinality key column.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 78 | } | 
|---|
| 79 |  | 
|---|
| 80 | KeysSerializationVersion(UInt64 version) : value(static_cast<Value>(version)) { checkVersion(version); } | 
|---|
| 81 | }; | 
|---|
| 82 |  | 
|---|
| 83 | /// Version is stored at the start of each granule. It's used to store indexes type and flags. | 
|---|
| 84 | struct IndexesSerializationType | 
|---|
| 85 | { | 
|---|
| 86 | using SerializationType = UInt64; | 
|---|
| 87 | /// Need to read dictionary if it wasn't. | 
|---|
| 88 | static constexpr SerializationType NeedGlobalDictionaryBit = 1u << 8u; | 
|---|
| 89 | /// Need to read additional keys. Additional keys are stored before indexes as value N and N keys after them. | 
|---|
| 90 | static constexpr SerializationType HasAdditionalKeysBit = 1u << 9u; | 
|---|
| 91 | /// Need to update dictionary. It means that previous granule has different dictionary. | 
|---|
| 92 | static constexpr SerializationType NeedUpdateDictionary = 1u << 10u; | 
|---|
| 93 |  | 
|---|
| 94 | enum Type | 
|---|
| 95 | { | 
|---|
| 96 | TUInt8 = 0, | 
|---|
| 97 | TUInt16, | 
|---|
| 98 | TUInt32, | 
|---|
| 99 | TUInt64, | 
|---|
| 100 | }; | 
|---|
| 101 |  | 
|---|
| 102 | Type type; | 
|---|
| 103 | bool has_additional_keys; | 
|---|
| 104 | bool need_global_dictionary; | 
|---|
| 105 | bool need_update_dictionary; | 
|---|
| 106 |  | 
|---|
| 107 | static constexpr SerializationType resetFlags(SerializationType type) | 
|---|
| 108 | { | 
|---|
| 109 | return type & (~(HasAdditionalKeysBit | NeedGlobalDictionaryBit | NeedUpdateDictionary)); | 
|---|
| 110 | } | 
|---|
| 111 |  | 
|---|
| 112 | static void checkType(SerializationType type) | 
|---|
| 113 | { | 
|---|
| 114 | UInt64 value = resetFlags(type); | 
|---|
| 115 | if (value <= TUInt64) | 
|---|
| 116 | return; | 
|---|
| 117 |  | 
|---|
| 118 | throw Exception( "Invalid type for DataTypeLowCardinality index column.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 119 | } | 
|---|
| 120 |  | 
|---|
| 121 | void serialize(WriteBuffer & buffer) const | 
|---|
| 122 | { | 
|---|
| 123 | SerializationType val = type; | 
|---|
| 124 | if (has_additional_keys) | 
|---|
| 125 | val |= HasAdditionalKeysBit; | 
|---|
| 126 | if (need_global_dictionary) | 
|---|
| 127 | val |= NeedGlobalDictionaryBit; | 
|---|
| 128 | if (need_update_dictionary) | 
|---|
| 129 | val |= NeedUpdateDictionary; | 
|---|
| 130 | writeIntBinary(val, buffer); | 
|---|
| 131 | } | 
|---|
| 132 |  | 
|---|
| 133 | void deserialize(ReadBuffer & buffer) | 
|---|
| 134 | { | 
|---|
| 135 | SerializationType val; | 
|---|
| 136 | readIntBinary(val, buffer); | 
|---|
| 137 | checkType(val); | 
|---|
| 138 | has_additional_keys = (val & HasAdditionalKeysBit) != 0; | 
|---|
| 139 | need_global_dictionary = (val & NeedGlobalDictionaryBit) != 0; | 
|---|
| 140 | need_update_dictionary = (val & NeedUpdateDictionary) != 0; | 
|---|
| 141 | type = static_cast<Type>(resetFlags(val)); | 
|---|
| 142 | } | 
|---|
| 143 |  | 
|---|
| 144 | IndexesSerializationType(const IColumn & column, | 
|---|
| 145 | bool has_additional_keys_, | 
|---|
| 146 | bool need_global_dictionary_, | 
|---|
| 147 | bool enumerate_dictionaries) | 
|---|
| 148 | : has_additional_keys(has_additional_keys_) | 
|---|
| 149 | , need_global_dictionary(need_global_dictionary_) | 
|---|
| 150 | , need_update_dictionary(enumerate_dictionaries) | 
|---|
| 151 | { | 
|---|
| 152 | if (typeid_cast<const ColumnUInt8 *>(&column)) | 
|---|
| 153 | type = TUInt8; | 
|---|
| 154 | else if (typeid_cast<const ColumnUInt16 *>(&column)) | 
|---|
| 155 | type = TUInt16; | 
|---|
| 156 | else if (typeid_cast<const ColumnUInt32 *>(&column)) | 
|---|
| 157 | type = TUInt32; | 
|---|
| 158 | else if (typeid_cast<const ColumnUInt64 *>(&column)) | 
|---|
| 159 | type = TUInt64; | 
|---|
| 160 | else | 
|---|
| 161 | throw Exception( "Invalid Indexes column for IndexesSerializationType. Expected ColumnUInt*, got " | 
|---|
| 162 | + column.getName(), ErrorCodes::LOGICAL_ERROR); | 
|---|
| 163 | } | 
|---|
| 164 |  | 
|---|
| 165 | DataTypePtr getDataType() const | 
|---|
| 166 | { | 
|---|
| 167 | if (type == TUInt8) | 
|---|
| 168 | return std::make_shared<DataTypeUInt8>(); | 
|---|
| 169 | if (type == TUInt16) | 
|---|
| 170 | return std::make_shared<DataTypeUInt16>(); | 
|---|
| 171 | if (type == TUInt32) | 
|---|
| 172 | return std::make_shared<DataTypeUInt32>(); | 
|---|
| 173 | if (type == TUInt64) | 
|---|
| 174 | return std::make_shared<DataTypeUInt64>(); | 
|---|
| 175 |  | 
|---|
| 176 | throw Exception( "Can't create DataType from IndexesSerializationType.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 177 | } | 
|---|
| 178 |  | 
|---|
| 179 | IndexesSerializationType() = default; | 
|---|
| 180 | }; | 
|---|
| 181 |  | 
|---|
| 182 | struct SerializeStateLowCardinality : public IDataType::SerializeBinaryBulkState | 
|---|
| 183 | { | 
|---|
| 184 | KeysSerializationVersion key_version; | 
|---|
| 185 | MutableColumnUniquePtr shared_dictionary; | 
|---|
| 186 |  | 
|---|
| 187 | explicit SerializeStateLowCardinality(UInt64 key_version_) : key_version(key_version_) {} | 
|---|
| 188 | }; | 
|---|
| 189 |  | 
|---|
| 190 | struct DeserializeStateLowCardinality : public IDataType::DeserializeBinaryBulkState | 
|---|
| 191 | { | 
|---|
| 192 | KeysSerializationVersion key_version; | 
|---|
| 193 | ColumnUniquePtr global_dictionary; | 
|---|
| 194 |  | 
|---|
| 195 | IndexesSerializationType index_type; | 
|---|
| 196 | ColumnPtr additional_keys; | 
|---|
| 197 | ColumnPtr null_map; | 
|---|
| 198 | UInt64 num_pending_rows = 0; | 
|---|
| 199 |  | 
|---|
| 200 | /// If dictionary should be updated. | 
|---|
| 201 | /// Can happen is some granules was skipped while reading from MergeTree. | 
|---|
| 202 | /// We should store this flag in State because | 
|---|
| 203 | ///   in case of long block of empty arrays we may not need read dictionary at first reading. | 
|---|
| 204 | bool need_update_dictionary = false; | 
|---|
| 205 |  | 
|---|
| 206 | explicit DeserializeStateLowCardinality(UInt64 key_version_) : key_version(key_version_) {} | 
|---|
| 207 | }; | 
|---|
| 208 |  | 
|---|
| 209 | static SerializeStateLowCardinality * checkAndGetLowCardinalitySerializeState( | 
|---|
| 210 | IDataType::SerializeBinaryBulkStatePtr & state) | 
|---|
| 211 | { | 
|---|
| 212 | if (!state) | 
|---|
| 213 | throw Exception( "Got empty state for DataTypeLowCardinality.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 214 |  | 
|---|
| 215 | auto * low_cardinality_state = typeid_cast<SerializeStateLowCardinality *>(state.get()); | 
|---|
| 216 | if (!low_cardinality_state) | 
|---|
| 217 | { | 
|---|
| 218 | auto & state_ref = *state; | 
|---|
| 219 | throw Exception( "Invalid SerializeBinaryBulkState for DataTypeLowCardinality. Expected: " | 
|---|
| 220 | + demangle(typeid(SerializeStateLowCardinality).name()) + ", got " | 
|---|
| 221 | + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR); | 
|---|
| 222 | } | 
|---|
| 223 |  | 
|---|
| 224 | return low_cardinality_state; | 
|---|
| 225 | } | 
|---|
| 226 |  | 
|---|
| 227 | static DeserializeStateLowCardinality * checkAndGetLowCardinalityDeserializeState( | 
|---|
| 228 | IDataType::DeserializeBinaryBulkStatePtr & state) | 
|---|
| 229 | { | 
|---|
| 230 | if (!state) | 
|---|
| 231 | throw Exception( "Got empty state for DataTypeLowCardinality.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 232 |  | 
|---|
| 233 | auto * low_cardinality_state = typeid_cast<DeserializeStateLowCardinality *>(state.get()); | 
|---|
| 234 | if (!low_cardinality_state) | 
|---|
| 235 | { | 
|---|
| 236 | auto & state_ref = *state; | 
|---|
| 237 | throw Exception( "Invalid DeserializeBinaryBulkState for DataTypeLowCardinality. Expected: " | 
|---|
| 238 | + demangle(typeid(DeserializeStateLowCardinality).name()) + ", got " | 
|---|
| 239 | + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR); | 
|---|
| 240 | } | 
|---|
| 241 |  | 
|---|
| 242 | return low_cardinality_state; | 
|---|
| 243 | } | 
|---|
| 244 |  | 
|---|
| 245 | void DataTypeLowCardinality::serializeBinaryBulkStatePrefix( | 
|---|
| 246 | SerializeBinaryBulkSettings & settings, | 
|---|
| 247 | SerializeBinaryBulkStatePtr & state) const | 
|---|
| 248 | { | 
|---|
| 249 | settings.path.push_back(Substream::DictionaryKeys); | 
|---|
| 250 | auto * stream = settings.getter(settings.path); | 
|---|
| 251 | settings.path.pop_back(); | 
|---|
| 252 |  | 
|---|
| 253 | if (!stream) | 
|---|
| 254 | throw Exception( "Got empty stream in DataTypeLowCardinality::serializeBinaryBulkStatePrefix", | 
|---|
| 255 | ErrorCodes::LOGICAL_ERROR); | 
|---|
| 256 |  | 
|---|
| 257 | /// Write version and create SerializeBinaryBulkState. | 
|---|
| 258 | UInt64 key_version = KeysSerializationVersion::SharedDictionariesWithAdditionalKeys; | 
|---|
| 259 |  | 
|---|
| 260 | writeIntBinary(key_version, *stream); | 
|---|
| 261 |  | 
|---|
| 262 | state = std::make_shared<SerializeStateLowCardinality>(key_version); | 
|---|
| 263 | } | 
|---|
| 264 |  | 
|---|
| 265 | void DataTypeLowCardinality::serializeBinaryBulkStateSuffix( | 
|---|
| 266 | SerializeBinaryBulkSettings & settings, | 
|---|
| 267 | SerializeBinaryBulkStatePtr & state) const | 
|---|
| 268 | { | 
|---|
| 269 | auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state); | 
|---|
| 270 | KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value); | 
|---|
| 271 |  | 
|---|
| 272 | if (low_cardinality_state->shared_dictionary && settings.low_cardinality_max_dictionary_size) | 
|---|
| 273 | { | 
|---|
| 274 | auto nested_column = low_cardinality_state->shared_dictionary->getNestedNotNullableColumn(); | 
|---|
| 275 |  | 
|---|
| 276 | settings.path.push_back(Substream::DictionaryKeys); | 
|---|
| 277 | auto * stream = settings.getter(settings.path); | 
|---|
| 278 | settings.path.pop_back(); | 
|---|
| 279 |  | 
|---|
| 280 | if (!stream) | 
|---|
| 281 | throw Exception( "Got empty stream in DataTypeLowCardinality::serializeBinaryBulkStateSuffix", | 
|---|
| 282 | ErrorCodes::LOGICAL_ERROR); | 
|---|
| 283 |  | 
|---|
| 284 | UInt64 num_keys = nested_column->size(); | 
|---|
| 285 | writeIntBinary(num_keys, *stream); | 
|---|
| 286 | removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *stream, 0, num_keys); | 
|---|
| 287 | low_cardinality_state->shared_dictionary = nullptr; | 
|---|
| 288 | } | 
|---|
| 289 | } | 
|---|
| 290 |  | 
|---|
| 291 | void DataTypeLowCardinality::deserializeBinaryBulkStatePrefix( | 
|---|
| 292 | DeserializeBinaryBulkSettings & settings, | 
|---|
| 293 | DeserializeBinaryBulkStatePtr & state) const | 
|---|
| 294 | { | 
|---|
| 295 | settings.path.push_back(Substream::DictionaryKeys); | 
|---|
| 296 | auto * stream = settings.getter(settings.path); | 
|---|
| 297 | settings.path.pop_back(); | 
|---|
| 298 |  | 
|---|
| 299 | if (!stream) | 
|---|
| 300 | return; | 
|---|
| 301 |  | 
|---|
| 302 | UInt64 keys_version; | 
|---|
| 303 | readIntBinary(keys_version, *stream); | 
|---|
| 304 |  | 
|---|
| 305 | state = std::make_shared<DeserializeStateLowCardinality>(keys_version); | 
|---|
| 306 | } | 
|---|
| 307 |  | 
|---|
| 308 | namespace | 
|---|
| 309 | { | 
|---|
| 310 | template <typename T> | 
|---|
| 311 | PaddedPODArray<T> * getIndexesData(IColumn & indexes) | 
|---|
| 312 | { | 
|---|
| 313 | auto * column = typeid_cast<ColumnVector<T> *>(&indexes); | 
|---|
| 314 | if (column) | 
|---|
| 315 | return &column->getData(); | 
|---|
| 316 |  | 
|---|
| 317 | return nullptr; | 
|---|
| 318 | } | 
|---|
| 319 |  | 
|---|
| 320 | struct IndexMapsWithAdditionalKeys | 
|---|
| 321 | { | 
|---|
| 322 | MutableColumnPtr dictionary_map; | 
|---|
| 323 | MutableColumnPtr additional_keys_map; | 
|---|
| 324 | }; | 
|---|
| 325 |  | 
|---|
| 326 | template <typename T> | 
|---|
| 327 | IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeysRef(PaddedPODArray<T> & index, size_t dict_size) | 
|---|
| 328 | { | 
|---|
| 329 | PaddedPODArray<T> copy(index.cbegin(), index.cend()); | 
|---|
| 330 |  | 
|---|
| 331 | HashMap<T, T> dict_map; | 
|---|
| 332 | HashMap<T, T> add_keys_map; | 
|---|
| 333 |  | 
|---|
| 334 | for (auto val : index) | 
|---|
| 335 | { | 
|---|
| 336 | if (val < dict_size) | 
|---|
| 337 | dict_map.insert({val, dict_map.size()}); | 
|---|
| 338 | else | 
|---|
| 339 | add_keys_map.insert({val, add_keys_map.size()}); | 
|---|
| 340 | } | 
|---|
| 341 |  | 
|---|
| 342 | auto dictionary_map = ColumnVector<T>::create(dict_map.size()); | 
|---|
| 343 | auto additional_keys_map = ColumnVector<T>::create(add_keys_map.size()); | 
|---|
| 344 | auto & dict_data = dictionary_map->getData(); | 
|---|
| 345 | auto & add_keys_data = additional_keys_map->getData(); | 
|---|
| 346 |  | 
|---|
| 347 | for (auto val : dict_map) | 
|---|
| 348 | dict_data[val.second] = val.first; | 
|---|
| 349 |  | 
|---|
| 350 | for (auto val : add_keys_map) | 
|---|
| 351 | add_keys_data[val.second] = val.first - dict_size; | 
|---|
| 352 |  | 
|---|
| 353 | for (auto & val : index) | 
|---|
| 354 | val = val < dict_size ? dict_map[val] | 
|---|
| 355 | : add_keys_map[val] + dict_map.size(); | 
|---|
| 356 |  | 
|---|
| 357 | for (size_t i = 0; i < index.size(); ++i) | 
|---|
| 358 | { | 
|---|
| 359 | T expected = index[i] < dict_data.size() ? dict_data[index[i]] | 
|---|
| 360 | : add_keys_data[index[i] - dict_data.size()] + dict_size; | 
|---|
| 361 | if (expected != copy[i]) | 
|---|
| 362 | throw Exception( "Expected "+ toString(expected) + ", but got "+ toString(copy[i]), ErrorCodes::LOGICAL_ERROR); | 
|---|
| 363 |  | 
|---|
| 364 | } | 
|---|
| 365 |  | 
|---|
| 366 | return {std::move(dictionary_map), std::move(additional_keys_map)}; | 
|---|
| 367 | } | 
|---|
| 368 |  | 
|---|
| 369 | template <typename T> | 
|---|
| 370 | IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(PaddedPODArray<T> & index, size_t dict_size) | 
|---|
| 371 | { | 
|---|
| 372 | T max_less_dict_size = 0; | 
|---|
| 373 | T max_value = 0; | 
|---|
| 374 |  | 
|---|
| 375 | auto size = index.size(); | 
|---|
| 376 | if (size == 0) | 
|---|
| 377 | return {ColumnVector<T>::create(), ColumnVector<T>::create()}; | 
|---|
| 378 |  | 
|---|
| 379 | for (size_t i = 0; i < size; ++i) | 
|---|
| 380 | { | 
|---|
| 381 | auto val = index[i]; | 
|---|
| 382 | if (val < dict_size) | 
|---|
| 383 | max_less_dict_size = std::max(max_less_dict_size, val); | 
|---|
| 384 |  | 
|---|
| 385 | max_value = std::max(max_value, val); | 
|---|
| 386 | } | 
|---|
| 387 |  | 
|---|
| 388 | auto map_size = UInt64(max_less_dict_size) + 1; | 
|---|
| 389 | auto overflow_map_size = max_value >= dict_size ? (UInt64(max_value - dict_size) + 1) : 0; | 
|---|
| 390 | PaddedPODArray<T> map(map_size, 0); | 
|---|
| 391 | PaddedPODArray<T> overflow_map(overflow_map_size, 0); | 
|---|
| 392 |  | 
|---|
| 393 | T zero_pos_value = 0; | 
|---|
| 394 | T zero_pos_overflowed_value = 0; | 
|---|
| 395 | UInt64 cur_pos = 0; | 
|---|
| 396 | UInt64 cur_overflowed_pos = 0; | 
|---|
| 397 |  | 
|---|
| 398 | for (size_t i = 0; i < size; ++i) | 
|---|
| 399 | { | 
|---|
| 400 | T val = index[i]; | 
|---|
| 401 | if (val < dict_size) | 
|---|
| 402 | { | 
|---|
| 403 | if (cur_pos == 0) | 
|---|
| 404 | { | 
|---|
| 405 | zero_pos_value = val; | 
|---|
| 406 | ++cur_pos; | 
|---|
| 407 | } | 
|---|
| 408 | else if (map[val] == 0 && val != zero_pos_value) | 
|---|
| 409 | { | 
|---|
| 410 | map[val] = cur_pos; | 
|---|
| 411 | ++cur_pos; | 
|---|
| 412 | } | 
|---|
| 413 | } | 
|---|
| 414 | else | 
|---|
| 415 | { | 
|---|
| 416 | T shifted_val = val - dict_size; | 
|---|
| 417 | if (cur_overflowed_pos == 0) | 
|---|
| 418 | { | 
|---|
| 419 | zero_pos_overflowed_value = shifted_val; | 
|---|
| 420 | ++cur_overflowed_pos; | 
|---|
| 421 | } | 
|---|
| 422 | else if (overflow_map[shifted_val] == 0 && shifted_val != zero_pos_overflowed_value) | 
|---|
| 423 | { | 
|---|
| 424 | overflow_map[shifted_val] = cur_overflowed_pos; | 
|---|
| 425 | ++cur_overflowed_pos; | 
|---|
| 426 | } | 
|---|
| 427 | } | 
|---|
| 428 | } | 
|---|
| 429 |  | 
|---|
| 430 | auto dictionary_map = ColumnVector<T>::create(cur_pos); | 
|---|
| 431 | auto additional_keys_map = ColumnVector<T>::create(cur_overflowed_pos); | 
|---|
| 432 | auto & dict_data = dictionary_map->getData(); | 
|---|
| 433 | auto & add_keys_data = additional_keys_map->getData(); | 
|---|
| 434 |  | 
|---|
| 435 | for (size_t i = 0; i < map_size; ++i) | 
|---|
| 436 | if (map[i]) | 
|---|
| 437 | dict_data[map[i]] = static_cast<T>(i); | 
|---|
| 438 |  | 
|---|
| 439 | for (size_t i = 0; i < overflow_map_size; ++i) | 
|---|
| 440 | if (overflow_map[i]) | 
|---|
| 441 | add_keys_data[overflow_map[i]] = static_cast<T>(i); | 
|---|
| 442 |  | 
|---|
| 443 | if (cur_pos) | 
|---|
| 444 | dict_data[0] = zero_pos_value; | 
|---|
| 445 | if (cur_overflowed_pos) | 
|---|
| 446 | add_keys_data[0] = zero_pos_overflowed_value; | 
|---|
| 447 |  | 
|---|
| 448 | for (size_t i = 0; i < size; ++i) | 
|---|
| 449 | { | 
|---|
| 450 | T & val = index[i]; | 
|---|
| 451 | if (val < dict_size) | 
|---|
| 452 | val = map[val]; | 
|---|
| 453 | else | 
|---|
| 454 | val = overflow_map[val - dict_size] + cur_pos; | 
|---|
| 455 | } | 
|---|
| 456 |  | 
|---|
| 457 | return {std::move(dictionary_map), std::move(additional_keys_map)}; | 
|---|
| 458 | } | 
|---|
| 459 |  | 
|---|
| 460 | /// Update column and return map with old indexes. | 
|---|
| 461 | /// Let N is the number of distinct values which are less than max_size; | 
|---|
| 462 | ///     old_column - column before function call; | 
|---|
| 463 | ///     new_column - column after function call: | 
|---|
| 464 | /// * if old_column[i] < max_size, than | 
|---|
| 465 | ///       dictionary_map[new_column[i]] = old_column[i] | 
|---|
| 466 | /// * else | 
|---|
| 467 | ///       additional_keys_map[new_column[i]] = old_column[i] - dict_size + N | 
|---|
| 468 | IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(IColumn & column, size_t dict_size) | 
|---|
| 469 | { | 
|---|
| 470 | if (auto * data_uint8 = getIndexesData<UInt8>(column)) | 
|---|
| 471 | return mapIndexWithAdditionalKeys(*data_uint8, dict_size); | 
|---|
| 472 | else if (auto * data_uint16 = getIndexesData<UInt16>(column)) | 
|---|
| 473 | return mapIndexWithAdditionalKeys(*data_uint16, dict_size); | 
|---|
| 474 | else if (auto * data_uint32 = getIndexesData<UInt32>(column)) | 
|---|
| 475 | return mapIndexWithAdditionalKeys(*data_uint32, dict_size); | 
|---|
| 476 | else if (auto * data_uint64 = getIndexesData<UInt64>(column)) | 
|---|
| 477 | return mapIndexWithAdditionalKeys(*data_uint64, dict_size); | 
|---|
| 478 | else | 
|---|
| 479 | throw Exception( "Indexes column for mapIndexWithAdditionalKeys must be UInt, got"+ column.getName(), | 
|---|
| 480 | ErrorCodes::LOGICAL_ERROR); | 
|---|
| 481 | } | 
|---|
| 482 | } | 
|---|
| 483 |  | 
|---|
| 484 | void DataTypeLowCardinality::serializeBinaryBulkWithMultipleStreams( | 
|---|
| 485 | const IColumn & column, | 
|---|
| 486 | size_t offset, | 
|---|
| 487 | size_t limit, | 
|---|
| 488 | SerializeBinaryBulkSettings & settings, | 
|---|
| 489 | SerializeBinaryBulkStatePtr & state) const | 
|---|
| 490 | { | 
|---|
| 491 | settings.path.push_back(Substream::DictionaryKeys); | 
|---|
| 492 | auto * keys_stream = settings.getter(settings.path); | 
|---|
| 493 | settings.path.back() = Substream::DictionaryIndexes; | 
|---|
| 494 | auto * indexes_stream = settings.getter(settings.path); | 
|---|
| 495 | settings.path.pop_back(); | 
|---|
| 496 |  | 
|---|
| 497 | if (!keys_stream && !indexes_stream) | 
|---|
| 498 | return; | 
|---|
| 499 |  | 
|---|
| 500 | if (!keys_stream) | 
|---|
| 501 | throw Exception( "Got empty stream for DataTypeLowCardinality keys.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 502 |  | 
|---|
| 503 | if (!indexes_stream) | 
|---|
| 504 | throw Exception( "Got empty stream for DataTypeLowCardinality indexes.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 505 |  | 
|---|
| 506 | const ColumnLowCardinality & low_cardinality_column = typeid_cast<const ColumnLowCardinality &>(column); | 
|---|
| 507 |  | 
|---|
| 508 | auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state); | 
|---|
| 509 | auto & global_dictionary = low_cardinality_state->shared_dictionary; | 
|---|
| 510 | KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value); | 
|---|
| 511 |  | 
|---|
| 512 | bool need_update_dictionary = global_dictionary == nullptr; | 
|---|
| 513 | if (need_update_dictionary) | 
|---|
| 514 | global_dictionary = createColumnUnique(*dictionary_type); | 
|---|
| 515 |  | 
|---|
| 516 | size_t max_limit = column.size() - offset; | 
|---|
| 517 | limit = limit ? std::min(limit, max_limit) : max_limit; | 
|---|
| 518 |  | 
|---|
| 519 | /// Do not write anything for empty column. (May happen while writing empty arrays.) | 
|---|
| 520 | if (limit == 0) | 
|---|
| 521 | return; | 
|---|
| 522 |  | 
|---|
| 523 | auto sub_column = low_cardinality_column.cutAndCompact(offset, limit); | 
|---|
| 524 | ColumnPtr positions = sub_column->getIndexesPtr(); | 
|---|
| 525 | ColumnPtr keys = sub_column->getDictionary().getNestedColumn(); | 
|---|
| 526 |  | 
|---|
| 527 | if (settings.low_cardinality_max_dictionary_size) | 
|---|
| 528 | { | 
|---|
| 529 | /// Insert used_keys into global dictionary and update sub_index. | 
|---|
| 530 | auto indexes_with_overflow = global_dictionary->uniqueInsertRangeWithOverflow(*keys, 0, keys->size(), | 
|---|
| 531 | settings.low_cardinality_max_dictionary_size); | 
|---|
| 532 | size_t max_size = settings.low_cardinality_max_dictionary_size + indexes_with_overflow.overflowed_keys->size(); | 
|---|
| 533 | ColumnLowCardinality::Index(indexes_with_overflow.indexes->getPtr()).check(max_size); | 
|---|
| 534 |  | 
|---|
| 535 | if (global_dictionary->size() > settings.low_cardinality_max_dictionary_size) | 
|---|
| 536 | throw Exception( "Got dictionary with size "+ toString(global_dictionary->size()) + | 
|---|
| 537 | " but max dictionary size is "+ toString(settings.low_cardinality_max_dictionary_size), | 
|---|
| 538 | ErrorCodes::LOGICAL_ERROR); | 
|---|
| 539 |  | 
|---|
| 540 | positions = indexes_with_overflow.indexes->index(*positions, 0); | 
|---|
| 541 | keys = std::move(indexes_with_overflow.overflowed_keys); | 
|---|
| 542 |  | 
|---|
| 543 | if (global_dictionary->size() < settings.low_cardinality_max_dictionary_size && !keys->empty()) | 
|---|
| 544 | throw Exception( "Has additional keys, but dict size is "+ toString(global_dictionary->size()) + | 
|---|
| 545 | " which is less then max dictionary size ("+ toString(settings.low_cardinality_max_dictionary_size) + ")", | 
|---|
| 546 | ErrorCodes::LOGICAL_ERROR); | 
|---|
| 547 | } | 
|---|
| 548 |  | 
|---|
| 549 | if (auto * nullable_keys = checkAndGetColumn<ColumnNullable>(*keys)) | 
|---|
| 550 | keys = nullable_keys->getNestedColumnPtr(); | 
|---|
| 551 |  | 
|---|
| 552 | bool need_additional_keys = !keys->empty(); | 
|---|
| 553 | bool need_dictionary = settings.low_cardinality_max_dictionary_size != 0; | 
|---|
| 554 | bool need_write_dictionary = !settings.low_cardinality_use_single_dictionary_for_part | 
|---|
| 555 | && global_dictionary->size() >= settings.low_cardinality_max_dictionary_size; | 
|---|
| 556 |  | 
|---|
| 557 | IndexesSerializationType index_version(*positions, need_additional_keys, need_dictionary, need_update_dictionary); | 
|---|
| 558 | index_version.serialize(*indexes_stream); | 
|---|
| 559 |  | 
|---|
| 560 | if (need_write_dictionary) | 
|---|
| 561 | { | 
|---|
| 562 | const auto & nested_column = global_dictionary->getNestedNotNullableColumn(); | 
|---|
| 563 | UInt64 num_keys = nested_column->size(); | 
|---|
| 564 | writeIntBinary(num_keys, *keys_stream); | 
|---|
| 565 | removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *keys_stream, 0, num_keys); | 
|---|
| 566 | low_cardinality_state->shared_dictionary = nullptr; | 
|---|
| 567 | } | 
|---|
| 568 |  | 
|---|
| 569 | if (need_additional_keys) | 
|---|
| 570 | { | 
|---|
| 571 | UInt64 num_keys = keys->size(); | 
|---|
| 572 | writeIntBinary(num_keys, *indexes_stream); | 
|---|
| 573 | removeNullable(dictionary_type)->serializeBinaryBulk(*keys, *indexes_stream, 0, num_keys); | 
|---|
| 574 | } | 
|---|
| 575 |  | 
|---|
| 576 | UInt64 num_rows = positions->size(); | 
|---|
| 577 | writeIntBinary(num_rows, *indexes_stream); | 
|---|
| 578 | index_version.getDataType()->serializeBinaryBulk(*positions, *indexes_stream, 0, num_rows); | 
|---|
| 579 | } | 
|---|
| 580 |  | 
|---|
| 581 | void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreams( | 
|---|
| 582 | IColumn & column, | 
|---|
| 583 | size_t limit, | 
|---|
| 584 | DeserializeBinaryBulkSettings & settings, | 
|---|
| 585 | DeserializeBinaryBulkStatePtr & state) const | 
|---|
| 586 | { | 
|---|
| 587 | ColumnLowCardinality & low_cardinality_column = typeid_cast<ColumnLowCardinality &>(column); | 
|---|
| 588 |  | 
|---|
| 589 | settings.path.push_back(Substream::DictionaryKeys); | 
|---|
| 590 | auto * keys_stream = settings.getter(settings.path); | 
|---|
| 591 | settings.path.back() = Substream::DictionaryIndexes; | 
|---|
| 592 | auto * indexes_stream = settings.getter(settings.path); | 
|---|
| 593 | settings.path.pop_back(); | 
|---|
| 594 |  | 
|---|
| 595 | if (!keys_stream && !indexes_stream) | 
|---|
| 596 | return; | 
|---|
| 597 |  | 
|---|
| 598 | if (!keys_stream) | 
|---|
| 599 | throw Exception( "Got empty stream for DataTypeLowCardinality keys.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 600 |  | 
|---|
| 601 | if (!indexes_stream) | 
|---|
| 602 | throw Exception( "Got empty stream for DataTypeLowCardinality indexes.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 603 |  | 
|---|
| 604 | auto * low_cardinality_state = checkAndGetLowCardinalityDeserializeState(state); | 
|---|
| 605 | KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value); | 
|---|
| 606 |  | 
|---|
| 607 | auto readDictionary = [this, low_cardinality_state, keys_stream]() | 
|---|
| 608 | { | 
|---|
| 609 | UInt64 num_keys; | 
|---|
| 610 | readIntBinary(num_keys, *keys_stream); | 
|---|
| 611 |  | 
|---|
| 612 | auto keys_type = removeNullable(dictionary_type); | 
|---|
| 613 | auto global_dict_keys = keys_type->createColumn(); | 
|---|
| 614 | keys_type->deserializeBinaryBulk(*global_dict_keys, *keys_stream, num_keys, 0); | 
|---|
| 615 |  | 
|---|
| 616 | auto column_unique = createColumnUnique(*dictionary_type, std::move(global_dict_keys)); | 
|---|
| 617 | low_cardinality_state->global_dictionary = std::move(column_unique); | 
|---|
| 618 | }; | 
|---|
| 619 |  | 
|---|
| 620 | auto readAdditionalKeys = [this, low_cardinality_state, indexes_stream]() | 
|---|
| 621 | { | 
|---|
| 622 | UInt64 num_keys; | 
|---|
| 623 | readIntBinary(num_keys, *indexes_stream); | 
|---|
| 624 | auto keys_type = removeNullable(dictionary_type); | 
|---|
| 625 | auto additional_keys = keys_type->createColumn(); | 
|---|
| 626 | keys_type->deserializeBinaryBulk(*additional_keys, *indexes_stream, num_keys, 0); | 
|---|
| 627 | low_cardinality_state->additional_keys = std::move(additional_keys); | 
|---|
| 628 |  | 
|---|
| 629 | if (!low_cardinality_state->index_type.need_global_dictionary && dictionary_type->isNullable()) | 
|---|
| 630 | { | 
|---|
| 631 | auto null_map = ColumnUInt8::create(num_keys, 0); | 
|---|
| 632 | if (num_keys) | 
|---|
| 633 | null_map->getElement(0) = 1; | 
|---|
| 634 |  | 
|---|
| 635 | low_cardinality_state->null_map = std::move(null_map); | 
|---|
| 636 | } | 
|---|
| 637 | }; | 
|---|
| 638 |  | 
|---|
| 639 | auto readIndexes = [this, low_cardinality_state, indexes_stream, &low_cardinality_column](UInt64 num_rows) | 
|---|
| 640 | { | 
|---|
| 641 | auto indexes_type = low_cardinality_state->index_type.getDataType(); | 
|---|
| 642 | MutableColumnPtr indexes_column = indexes_type->createColumn(); | 
|---|
| 643 | indexes_type->deserializeBinaryBulk(*indexes_column, *indexes_stream, num_rows, 0); | 
|---|
| 644 |  | 
|---|
| 645 | auto & global_dictionary = low_cardinality_state->global_dictionary; | 
|---|
| 646 | const auto & additional_keys = low_cardinality_state->additional_keys; | 
|---|
| 647 |  | 
|---|
| 648 | bool has_additional_keys = low_cardinality_state->index_type.has_additional_keys; | 
|---|
| 649 | bool column_is_empty = low_cardinality_column.empty(); | 
|---|
| 650 |  | 
|---|
| 651 | if (!low_cardinality_state->index_type.need_global_dictionary) | 
|---|
| 652 | { | 
|---|
| 653 | ColumnPtr keys_column = additional_keys; | 
|---|
| 654 | if (low_cardinality_state->null_map) | 
|---|
| 655 | keys_column = ColumnNullable::create(additional_keys, low_cardinality_state->null_map); | 
|---|
| 656 | low_cardinality_column.insertRangeFromDictionaryEncodedColumn(*keys_column, *indexes_column); | 
|---|
| 657 | } | 
|---|
| 658 | else if (!has_additional_keys) | 
|---|
| 659 | { | 
|---|
| 660 | if (column_is_empty) | 
|---|
| 661 | low_cardinality_column.setSharedDictionary(global_dictionary); | 
|---|
| 662 |  | 
|---|
| 663 | auto local_column = ColumnLowCardinality::create(global_dictionary, std::move(indexes_column)); | 
|---|
| 664 | low_cardinality_column.insertRangeFrom(*local_column, 0, num_rows); | 
|---|
| 665 | } | 
|---|
| 666 | else | 
|---|
| 667 | { | 
|---|
| 668 | auto maps = mapIndexWithAdditionalKeys(*indexes_column, global_dictionary->size()); | 
|---|
| 669 |  | 
|---|
| 670 | ColumnLowCardinality::Index(maps.additional_keys_map->getPtr()).check(additional_keys->size()); | 
|---|
| 671 |  | 
|---|
| 672 | ColumnLowCardinality::Index(indexes_column->getPtr()).check( | 
|---|
| 673 | maps.dictionary_map->size() + maps.additional_keys_map->size()); | 
|---|
| 674 |  | 
|---|
| 675 | auto used_keys = (*std::move(global_dictionary->getNestedColumn()->index(*maps.dictionary_map, 0))).mutate(); | 
|---|
| 676 |  | 
|---|
| 677 | if (!maps.additional_keys_map->empty()) | 
|---|
| 678 | { | 
|---|
| 679 | auto used_add_keys = additional_keys->index(*maps.additional_keys_map, 0); | 
|---|
| 680 |  | 
|---|
| 681 | if (dictionary_type->isNullable()) | 
|---|
| 682 | { | 
|---|
| 683 | ColumnPtr null_map = ColumnUInt8::create(used_add_keys->size(), 0); | 
|---|
| 684 | used_add_keys = ColumnNullable::create(used_add_keys, null_map); | 
|---|
| 685 | } | 
|---|
| 686 |  | 
|---|
| 687 | used_keys->insertRangeFrom(*used_add_keys, 0, used_add_keys->size()); | 
|---|
| 688 | } | 
|---|
| 689 |  | 
|---|
| 690 | low_cardinality_column.insertRangeFromDictionaryEncodedColumn(*used_keys, *indexes_column); | 
|---|
| 691 | } | 
|---|
| 692 | }; | 
|---|
| 693 |  | 
|---|
| 694 | if (!settings.continuous_reading) | 
|---|
| 695 | { | 
|---|
| 696 | low_cardinality_state->num_pending_rows = 0; | 
|---|
| 697 |  | 
|---|
| 698 | /// Remember in state that some granules were skipped and we need to update dictionary. | 
|---|
| 699 | low_cardinality_state->need_update_dictionary = true; | 
|---|
| 700 | } | 
|---|
| 701 |  | 
|---|
| 702 | while (limit) | 
|---|
| 703 | { | 
|---|
| 704 | if (low_cardinality_state->num_pending_rows == 0) | 
|---|
| 705 | { | 
|---|
| 706 | if (indexes_stream->eof()) | 
|---|
| 707 | break; | 
|---|
| 708 |  | 
|---|
| 709 | auto & index_type = low_cardinality_state->index_type; | 
|---|
| 710 | auto & global_dictionary = low_cardinality_state->global_dictionary; | 
|---|
| 711 |  | 
|---|
| 712 | index_type.deserialize(*indexes_stream); | 
|---|
| 713 |  | 
|---|
| 714 | bool need_update_dictionary = | 
|---|
| 715 | !global_dictionary || index_type.need_update_dictionary || low_cardinality_state->need_update_dictionary; | 
|---|
| 716 | if (index_type.need_global_dictionary && need_update_dictionary) | 
|---|
| 717 | { | 
|---|
| 718 | readDictionary(); | 
|---|
| 719 | low_cardinality_state->need_update_dictionary = false; | 
|---|
| 720 | } | 
|---|
| 721 |  | 
|---|
| 722 | if (low_cardinality_state->index_type.has_additional_keys) | 
|---|
| 723 | readAdditionalKeys(); | 
|---|
| 724 | else | 
|---|
| 725 | low_cardinality_state->additional_keys = nullptr; | 
|---|
| 726 |  | 
|---|
| 727 | readIntBinary(low_cardinality_state->num_pending_rows, *indexes_stream); | 
|---|
| 728 | } | 
|---|
| 729 |  | 
|---|
| 730 | size_t num_rows_to_read = std::min<UInt64>(limit, low_cardinality_state->num_pending_rows); | 
|---|
| 731 | readIndexes(num_rows_to_read); | 
|---|
| 732 | limit -= num_rows_to_read; | 
|---|
| 733 | low_cardinality_state->num_pending_rows -= num_rows_to_read; | 
|---|
| 734 | } | 
|---|
| 735 | } | 
|---|
| 736 |  | 
|---|
| 737 | void DataTypeLowCardinality::serializeBinary(const Field & field, WriteBuffer & ostr) const | 
|---|
| 738 | { | 
|---|
| 739 | dictionary_type->serializeBinary(field, ostr); | 
|---|
| 740 | } | 
|---|
| 741 | void DataTypeLowCardinality::deserializeBinary(Field & field, ReadBuffer & istr) const | 
|---|
| 742 | { | 
|---|
| 743 | dictionary_type->deserializeBinary(field, istr); | 
|---|
| 744 | } | 
|---|
| 745 |  | 
|---|
| 746 | void DataTypeLowCardinality::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const | 
|---|
| 747 | { | 
|---|
| 748 | serializeImpl(column, row_num, &IDataType::serializeBinary, ostr); | 
|---|
| 749 | } | 
|---|
| 750 | void DataTypeLowCardinality::deserializeBinary(IColumn & column, ReadBuffer & istr) const | 
|---|
| 751 | { | 
|---|
| 752 | deserializeImpl(column, &IDataType::deserializeBinary, istr); | 
|---|
| 753 | } | 
|---|
| 754 |  | 
|---|
| 755 | void DataTypeLowCardinality::serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const | 
|---|
| 756 | { | 
|---|
| 757 | serializeImpl(column, row_num, &IDataType::serializeAsTextEscaped, ostr, settings); | 
|---|
| 758 | } | 
|---|
| 759 |  | 
|---|
| 760 | void DataTypeLowCardinality::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const | 
|---|
| 761 | { | 
|---|
| 762 | deserializeImpl(column, &IDataType::deserializeAsTextEscaped, istr, settings); | 
|---|
| 763 | } | 
|---|
| 764 |  | 
|---|
| 765 | void DataTypeLowCardinality::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const | 
|---|
| 766 | { | 
|---|
| 767 | serializeImpl(column, row_num, &IDataType::serializeAsTextQuoted, ostr, settings); | 
|---|
| 768 | } | 
|---|
| 769 |  | 
|---|
| 770 | void DataTypeLowCardinality::deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const | 
|---|
| 771 | { | 
|---|
| 772 | deserializeImpl(column, &IDataType::deserializeAsTextQuoted, istr, settings); | 
|---|
| 773 | } | 
|---|
| 774 |  | 
|---|
| 775 | void DataTypeLowCardinality::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const | 
|---|
| 776 | { | 
|---|
| 777 | deserializeImpl(column, &IDataType::deserializeAsTextEscaped, istr, settings); | 
|---|
| 778 | } | 
|---|
| 779 |  | 
|---|
| 780 | void DataTypeLowCardinality::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const | 
|---|
| 781 | { | 
|---|
| 782 | serializeImpl(column, row_num, &IDataType::serializeAsTextCSV, ostr, settings); | 
|---|
| 783 | } | 
|---|
| 784 |  | 
|---|
| 785 | void DataTypeLowCardinality::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const | 
|---|
| 786 | { | 
|---|
| 787 | deserializeImpl(column, &IDataType::deserializeAsTextCSV, istr, settings); | 
|---|
| 788 | } | 
|---|
| 789 |  | 
|---|
| 790 | void DataTypeLowCardinality::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const | 
|---|
| 791 | { | 
|---|
| 792 | serializeImpl(column, row_num, &IDataType::serializeAsText, ostr, settings); | 
|---|
| 793 | } | 
|---|
| 794 |  | 
|---|
| 795 | void DataTypeLowCardinality::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const | 
|---|
| 796 | { | 
|---|
| 797 | serializeImpl(column, row_num, &IDataType::serializeAsTextJSON, ostr, settings); | 
|---|
| 798 | } | 
|---|
| 799 | void DataTypeLowCardinality::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const | 
|---|
| 800 | { | 
|---|
| 801 | deserializeImpl(column, &IDataType::deserializeAsTextJSON, istr, settings); | 
|---|
| 802 | } | 
|---|
| 803 |  | 
|---|
| 804 | void DataTypeLowCardinality::serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const | 
|---|
| 805 | { | 
|---|
| 806 | serializeImpl(column, row_num, &IDataType::serializeAsTextXML, ostr, settings); | 
|---|
| 807 | } | 
|---|
| 808 |  | 
|---|
| 809 | void DataTypeLowCardinality::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const | 
|---|
| 810 | { | 
|---|
| 811 | serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf, value_index); | 
|---|
| 812 | } | 
|---|
| 813 |  | 
|---|
| 814 | void DataTypeLowCardinality::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const | 
|---|
| 815 | { | 
|---|
| 816 | if (allow_add_row) | 
|---|
| 817 | { | 
|---|
| 818 | deserializeImpl(column, &IDataType::deserializeProtobuf, protobuf, true, row_added); | 
|---|
| 819 | return; | 
|---|
| 820 | } | 
|---|
| 821 |  | 
|---|
| 822 | row_added = false; | 
|---|
| 823 | auto & low_cardinality_column= getColumnLowCardinality(column); | 
|---|
| 824 | auto  nested_column = low_cardinality_column.getDictionary().getNestedColumn(); | 
|---|
| 825 | auto temp_column = nested_column->cloneEmpty(); | 
|---|
| 826 | size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(low_cardinality_column.size() - 1); | 
|---|
| 827 | temp_column->insertFrom(*nested_column, unique_row_number); | 
|---|
| 828 | bool dummy; | 
|---|
| 829 | dictionary_type.get()->deserializeProtobuf(*temp_column, protobuf, false, dummy); | 
|---|
| 830 | low_cardinality_column.popBack(1); | 
|---|
| 831 | low_cardinality_column.insertFromFullColumn(*temp_column, 0); | 
|---|
| 832 | } | 
|---|
| 833 |  | 
|---|
| 834 | template <typename... Params, typename... Args> | 
|---|
| 835 | void DataTypeLowCardinality::serializeImpl( | 
|---|
| 836 | const IColumn & column, size_t row_num, DataTypeLowCardinality::SerializeFunctionPtr<Params...> func, Args &&... args) const | 
|---|
| 837 | { | 
|---|
| 838 | auto & low_cardinality_column = getColumnLowCardinality(column); | 
|---|
| 839 | size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(row_num); | 
|---|
| 840 | (dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, std::forward<Args>(args)...); | 
|---|
| 841 | } | 
|---|
| 842 |  | 
|---|
| 843 | template <typename... Params, typename... Args> | 
|---|
| 844 | void DataTypeLowCardinality::deserializeImpl( | 
|---|
| 845 | IColumn & column, DataTypeLowCardinality::DeserializeFunctionPtr<Params...> func, Args &&... args) const | 
|---|
| 846 | { | 
|---|
| 847 | auto & low_cardinality_column= getColumnLowCardinality(column); | 
|---|
| 848 | auto temp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty(); | 
|---|
| 849 |  | 
|---|
| 850 | (dictionary_type.get()->*func)(*temp_column, std::forward<Args>(args)...); | 
|---|
| 851 |  | 
|---|
| 852 | low_cardinality_column.insertFromFullColumn(*temp_column, 0); | 
|---|
| 853 | } | 
|---|
| 854 |  | 
|---|
| 855 | namespace | 
|---|
| 856 | { | 
|---|
| 857 | template <typename Creator> | 
|---|
| 858 | struct CreateColumnVector | 
|---|
| 859 | { | 
|---|
| 860 | MutableColumnUniquePtr & column; | 
|---|
| 861 | const IDataType & keys_type; | 
|---|
| 862 | const Creator & creator; | 
|---|
| 863 |  | 
|---|
| 864 | CreateColumnVector(MutableColumnUniquePtr & column_, const IDataType & keys_type_, const Creator & creator_) | 
|---|
| 865 | : column(column_), keys_type(keys_type_), creator(creator_) | 
|---|
| 866 | { | 
|---|
| 867 | } | 
|---|
| 868 |  | 
|---|
| 869 | template <typename T, size_t> | 
|---|
| 870 | void operator()() | 
|---|
| 871 | { | 
|---|
| 872 | if (typeid_cast<const DataTypeNumber<T> *>(&keys_type)) | 
|---|
| 873 | column = creator(static_cast<ColumnVector<T> *>(nullptr)); | 
|---|
| 874 | } | 
|---|
| 875 | }; | 
|---|
| 876 | } | 
|---|
| 877 |  | 
|---|
| 878 | template <typename Creator> | 
|---|
| 879 | MutableColumnUniquePtr DataTypeLowCardinality::createColumnUniqueImpl(const IDataType & keys_type, | 
|---|
| 880 | const Creator & creator) | 
|---|
| 881 | { | 
|---|
| 882 | auto * type = &keys_type; | 
|---|
| 883 | if (auto * nullable_type = typeid_cast<const DataTypeNullable *>(&keys_type)) | 
|---|
| 884 | type = nullable_type->getNestedType().get(); | 
|---|
| 885 |  | 
|---|
| 886 | if (isString(type)) | 
|---|
| 887 | return creator(static_cast<ColumnString *>(nullptr)); | 
|---|
| 888 | if (isFixedString(type)) | 
|---|
| 889 | return creator(static_cast<ColumnFixedString *>(nullptr)); | 
|---|
| 890 | if (typeid_cast<const DataTypeDate *>(type)) | 
|---|
| 891 | return creator(static_cast<ColumnVector<UInt16> *>(nullptr)); | 
|---|
| 892 | if (typeid_cast<const DataTypeDateTime *>(type)) | 
|---|
| 893 | return creator(static_cast<ColumnVector<UInt32> *>(nullptr)); | 
|---|
| 894 | if (isColumnedAsNumber(type)) | 
|---|
| 895 | { | 
|---|
| 896 | MutableColumnUniquePtr column; | 
|---|
| 897 | TypeListNativeNumbers::forEach(CreateColumnVector(column, *type, creator)); | 
|---|
| 898 |  | 
|---|
| 899 | if (!column) | 
|---|
| 900 | throw Exception( "Unexpected numeric type: "+ type->getName(), ErrorCodes::LOGICAL_ERROR); | 
|---|
| 901 |  | 
|---|
| 902 | return column; | 
|---|
| 903 | } | 
|---|
| 904 |  | 
|---|
| 905 | throw Exception( "Unexpected dictionary type for DataTypeLowCardinality: "+ type->getName(), | 
|---|
| 906 | ErrorCodes::LOGICAL_ERROR); | 
|---|
| 907 | } | 
|---|
| 908 |  | 
|---|
| 909 |  | 
|---|
| 910 | MutableColumnUniquePtr DataTypeLowCardinality::createColumnUnique(const IDataType & keys_type) | 
|---|
| 911 | { | 
|---|
| 912 | auto creator = [&](auto x) | 
|---|
| 913 | { | 
|---|
| 914 | using ColumnType = typename std::remove_pointer<decltype(x)>::type; | 
|---|
| 915 | return ColumnUnique<ColumnType>::create(keys_type); | 
|---|
| 916 | }; | 
|---|
| 917 | return createColumnUniqueImpl(keys_type, creator); | 
|---|
| 918 | } | 
|---|
| 919 |  | 
|---|
| 920 | MutableColumnUniquePtr DataTypeLowCardinality::createColumnUnique(const IDataType & keys_type, MutableColumnPtr && keys) | 
|---|
| 921 | { | 
|---|
| 922 | auto creator = [&](auto x) | 
|---|
| 923 | { | 
|---|
| 924 | using ColumnType = typename std::remove_pointer<decltype(x)>::type; | 
|---|
| 925 | return ColumnUnique<ColumnType>::create(std::move(keys), keys_type.isNullable()); | 
|---|
| 926 | }; | 
|---|
| 927 | return createColumnUniqueImpl(keys_type, creator); | 
|---|
| 928 | } | 
|---|
| 929 |  | 
|---|
| 930 | MutableColumnPtr DataTypeLowCardinality::createColumn() const | 
|---|
| 931 | { | 
|---|
| 932 | MutableColumnPtr indexes = DataTypeUInt8().createColumn(); | 
|---|
| 933 | MutableColumnPtr dictionary = createColumnUnique(*dictionary_type); | 
|---|
| 934 | return ColumnLowCardinality::create(std::move(dictionary), std::move(indexes)); | 
|---|
| 935 | } | 
|---|
| 936 |  | 
|---|
| 937 | Field DataTypeLowCardinality::getDefault() const | 
|---|
| 938 | { | 
|---|
| 939 | return dictionary_type->getDefault(); | 
|---|
| 940 | } | 
|---|
| 941 |  | 
|---|
| 942 | bool DataTypeLowCardinality::equals(const IDataType & rhs) const | 
|---|
| 943 | { | 
|---|
| 944 | if (typeid(rhs) != typeid(*this)) | 
|---|
| 945 | return false; | 
|---|
| 946 |  | 
|---|
| 947 | auto & low_cardinality_rhs= static_cast<const DataTypeLowCardinality &>(rhs); | 
|---|
| 948 | return dictionary_type->equals(*low_cardinality_rhs.dictionary_type); | 
|---|
| 949 | } | 
|---|
| 950 |  | 
|---|
| 951 |  | 
|---|
| 952 | static DataTypePtr create(const String & /*type_name*/, const ASTPtr & arguments) | 
|---|
| 953 | { | 
|---|
| 954 | if (!arguments || arguments->children.size() != 1) | 
|---|
| 955 | throw Exception( "LowCardinality data type family must have single argument - type of elements", | 
|---|
| 956 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); | 
|---|
| 957 |  | 
|---|
| 958 | return std::make_shared<DataTypeLowCardinality>(DataTypeFactory::instance().get(arguments->children[0])); | 
|---|
| 959 | } | 
|---|
| 960 |  | 
|---|
| 961 | void registerDataTypeLowCardinality(DataTypeFactory & factory) | 
|---|
| 962 | { | 
|---|
| 963 | factory.registerDataType( "LowCardinality", create); | 
|---|
| 964 | } | 
|---|
| 965 |  | 
|---|
| 966 |  | 
|---|
| 967 | DataTypePtr removeLowCardinality(const DataTypePtr & type) | 
|---|
| 968 | { | 
|---|
| 969 | if (auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get())) | 
|---|
| 970 | return low_cardinality_type->getDictionaryType(); | 
|---|
| 971 | return type; | 
|---|
| 972 | } | 
|---|
| 973 |  | 
|---|
| 974 | } | 
|---|
| 975 |  | 
|---|