1#include <Columns/ColumnLowCardinality.h>
2#include <Columns/ColumnUnique.h>
3#include <Columns/ColumnFixedString.h>
4#include <Columns/ColumnsCommon.h>
5#include <Common/HashTable/HashMap.h>
6#include <Common/typeid_cast.h>
7#include <Common/assert_cast.h>
8#include <Core/Field.h>
9#include <Core/TypeListNumber.h>
10#include <DataTypes/DataTypeFactory.h>
11#include <DataTypes/DataTypeLowCardinality.h>
12#include <DataTypes/DataTypeNullable.h>
13#include <DataTypes/DataTypeDate.h>
14#include <DataTypes/DataTypeDateTime.h>
15#include <Parsers/IAST.h>
16
17namespace DB
18{
19
/// Declarations of the error code constants referenced in this file.
/// They are only declared here (`extern`); the definitions live in another translation unit.
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int LOGICAL_ERROR;
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
26
27namespace
28{
29 const ColumnLowCardinality & getColumnLowCardinality(const IColumn & column)
30 {
31 return typeid_cast<const ColumnLowCardinality &>(column);
32 }
33
34 ColumnLowCardinality & getColumnLowCardinality(IColumn & column)
35 {
36 return typeid_cast<ColumnLowCardinality &>(column);
37 }
38}
39
40DataTypeLowCardinality::DataTypeLowCardinality(DataTypePtr dictionary_type_)
41 : dictionary_type(std::move(dictionary_type_))
42{
43 auto inner_type = dictionary_type;
44 if (dictionary_type->isNullable())
45 inner_type = static_cast<const DataTypeNullable &>(*dictionary_type).getNestedType();
46
47 if (!inner_type->canBeInsideLowCardinality())
48 throw Exception("DataTypeLowCardinality is supported only for numbers, strings, Date or DateTime, but got "
49 + dictionary_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
50}
51
52void DataTypeLowCardinality::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
53{
54 path.push_back(Substream::DictionaryKeys);
55 dictionary_type->enumerateStreams(callback, path);
56 path.back() = Substream::DictionaryIndexes;
57 callback(path);
58 path.pop_back();
59}
60
61struct KeysSerializationVersion
62{
63 enum Value
64 {
65 /// Version is written at the start of <name.dict.bin>.
66 /// Dictionary is written as number N and N keys after them.
67 /// Dictionary can be shared for continuous range of granules, so some marks may point to the same position.
68 /// Shared dictionary is stored in state and is read once.
69 SharedDictionariesWithAdditionalKeys = 1,
70 };
71
72 Value value;
73
74 static void checkVersion(UInt64 version)
75 {
76 if (version != SharedDictionariesWithAdditionalKeys)
77 throw Exception("Invalid version for DataTypeLowCardinality key column.", ErrorCodes::LOGICAL_ERROR);
78 }
79
80 KeysSerializationVersion(UInt64 version) : value(static_cast<Value>(version)) { checkVersion(version); }
81};
82
83/// Version is stored at the start of each granule. It's used to store indexes type and flags.
84struct IndexesSerializationType
85{
86 using SerializationType = UInt64;
87 /// Need to read dictionary if it wasn't.
88 static constexpr SerializationType NeedGlobalDictionaryBit = 1u << 8u;
89 /// Need to read additional keys. Additional keys are stored before indexes as value N and N keys after them.
90 static constexpr SerializationType HasAdditionalKeysBit = 1u << 9u;
91 /// Need to update dictionary. It means that previous granule has different dictionary.
92 static constexpr SerializationType NeedUpdateDictionary = 1u << 10u;
93
94 enum Type
95 {
96 TUInt8 = 0,
97 TUInt16,
98 TUInt32,
99 TUInt64,
100 };
101
102 Type type;
103 bool has_additional_keys;
104 bool need_global_dictionary;
105 bool need_update_dictionary;
106
107 static constexpr SerializationType resetFlags(SerializationType type)
108 {
109 return type & (~(HasAdditionalKeysBit | NeedGlobalDictionaryBit | NeedUpdateDictionary));
110 }
111
112 static void checkType(SerializationType type)
113 {
114 UInt64 value = resetFlags(type);
115 if (value <= TUInt64)
116 return;
117
118 throw Exception("Invalid type for DataTypeLowCardinality index column.", ErrorCodes::LOGICAL_ERROR);
119 }
120
121 void serialize(WriteBuffer & buffer) const
122 {
123 SerializationType val = type;
124 if (has_additional_keys)
125 val |= HasAdditionalKeysBit;
126 if (need_global_dictionary)
127 val |= NeedGlobalDictionaryBit;
128 if (need_update_dictionary)
129 val |= NeedUpdateDictionary;
130 writeIntBinary(val, buffer);
131 }
132
133 void deserialize(ReadBuffer & buffer)
134 {
135 SerializationType val;
136 readIntBinary(val, buffer);
137 checkType(val);
138 has_additional_keys = (val & HasAdditionalKeysBit) != 0;
139 need_global_dictionary = (val & NeedGlobalDictionaryBit) != 0;
140 need_update_dictionary = (val & NeedUpdateDictionary) != 0;
141 type = static_cast<Type>(resetFlags(val));
142 }
143
144 IndexesSerializationType(const IColumn & column,
145 bool has_additional_keys_,
146 bool need_global_dictionary_,
147 bool enumerate_dictionaries)
148 : has_additional_keys(has_additional_keys_)
149 , need_global_dictionary(need_global_dictionary_)
150 , need_update_dictionary(enumerate_dictionaries)
151 {
152 if (typeid_cast<const ColumnUInt8 *>(&column))
153 type = TUInt8;
154 else if (typeid_cast<const ColumnUInt16 *>(&column))
155 type = TUInt16;
156 else if (typeid_cast<const ColumnUInt32 *>(&column))
157 type = TUInt32;
158 else if (typeid_cast<const ColumnUInt64 *>(&column))
159 type = TUInt64;
160 else
161 throw Exception("Invalid Indexes column for IndexesSerializationType. Expected ColumnUInt*, got "
162 + column.getName(), ErrorCodes::LOGICAL_ERROR);
163 }
164
165 DataTypePtr getDataType() const
166 {
167 if (type == TUInt8)
168 return std::make_shared<DataTypeUInt8>();
169 if (type == TUInt16)
170 return std::make_shared<DataTypeUInt16>();
171 if (type == TUInt32)
172 return std::make_shared<DataTypeUInt32>();
173 if (type == TUInt64)
174 return std::make_shared<DataTypeUInt64>();
175
176 throw Exception("Can't create DataType from IndexesSerializationType.", ErrorCodes::LOGICAL_ERROR);
177 }
178
179 IndexesSerializationType() = default;
180};
181
182struct SerializeStateLowCardinality : public IDataType::SerializeBinaryBulkState
183{
184 KeysSerializationVersion key_version;
185 MutableColumnUniquePtr shared_dictionary;
186
187 explicit SerializeStateLowCardinality(UInt64 key_version_) : key_version(key_version_) {}
188};
189
190struct DeserializeStateLowCardinality : public IDataType::DeserializeBinaryBulkState
191{
192 KeysSerializationVersion key_version;
193 ColumnUniquePtr global_dictionary;
194
195 IndexesSerializationType index_type;
196 ColumnPtr additional_keys;
197 ColumnPtr null_map;
198 UInt64 num_pending_rows = 0;
199
200 /// If dictionary should be updated.
201 /// Can happen is some granules was skipped while reading from MergeTree.
202 /// We should store this flag in State because
203 /// in case of long block of empty arrays we may not need read dictionary at first reading.
204 bool need_update_dictionary = false;
205
206 explicit DeserializeStateLowCardinality(UInt64 key_version_) : key_version(key_version_) {}
207};
208
209static SerializeStateLowCardinality * checkAndGetLowCardinalitySerializeState(
210 IDataType::SerializeBinaryBulkStatePtr & state)
211{
212 if (!state)
213 throw Exception("Got empty state for DataTypeLowCardinality.", ErrorCodes::LOGICAL_ERROR);
214
215 auto * low_cardinality_state = typeid_cast<SerializeStateLowCardinality *>(state.get());
216 if (!low_cardinality_state)
217 {
218 auto & state_ref = *state;
219 throw Exception("Invalid SerializeBinaryBulkState for DataTypeLowCardinality. Expected: "
220 + demangle(typeid(SerializeStateLowCardinality).name()) + ", got "
221 + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR);
222 }
223
224 return low_cardinality_state;
225}
226
227static DeserializeStateLowCardinality * checkAndGetLowCardinalityDeserializeState(
228 IDataType::DeserializeBinaryBulkStatePtr & state)
229{
230 if (!state)
231 throw Exception("Got empty state for DataTypeLowCardinality.", ErrorCodes::LOGICAL_ERROR);
232
233 auto * low_cardinality_state = typeid_cast<DeserializeStateLowCardinality *>(state.get());
234 if (!low_cardinality_state)
235 {
236 auto & state_ref = *state;
237 throw Exception("Invalid DeserializeBinaryBulkState for DataTypeLowCardinality. Expected: "
238 + demangle(typeid(DeserializeStateLowCardinality).name()) + ", got "
239 + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR);
240 }
241
242 return low_cardinality_state;
243}
244
245void DataTypeLowCardinality::serializeBinaryBulkStatePrefix(
246 SerializeBinaryBulkSettings & settings,
247 SerializeBinaryBulkStatePtr & state) const
248{
249 settings.path.push_back(Substream::DictionaryKeys);
250 auto * stream = settings.getter(settings.path);
251 settings.path.pop_back();
252
253 if (!stream)
254 throw Exception("Got empty stream in DataTypeLowCardinality::serializeBinaryBulkStatePrefix",
255 ErrorCodes::LOGICAL_ERROR);
256
257 /// Write version and create SerializeBinaryBulkState.
258 UInt64 key_version = KeysSerializationVersion::SharedDictionariesWithAdditionalKeys;
259
260 writeIntBinary(key_version, *stream);
261
262 state = std::make_shared<SerializeStateLowCardinality>(key_version);
263}
264
265void DataTypeLowCardinality::serializeBinaryBulkStateSuffix(
266 SerializeBinaryBulkSettings & settings,
267 SerializeBinaryBulkStatePtr & state) const
268{
269 auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state);
270 KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value);
271
272 if (low_cardinality_state->shared_dictionary && settings.low_cardinality_max_dictionary_size)
273 {
274 auto nested_column = low_cardinality_state->shared_dictionary->getNestedNotNullableColumn();
275
276 settings.path.push_back(Substream::DictionaryKeys);
277 auto * stream = settings.getter(settings.path);
278 settings.path.pop_back();
279
280 if (!stream)
281 throw Exception("Got empty stream in DataTypeLowCardinality::serializeBinaryBulkStateSuffix",
282 ErrorCodes::LOGICAL_ERROR);
283
284 UInt64 num_keys = nested_column->size();
285 writeIntBinary(num_keys, *stream);
286 removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *stream, 0, num_keys);
287 low_cardinality_state->shared_dictionary = nullptr;
288 }
289}
290
291void DataTypeLowCardinality::deserializeBinaryBulkStatePrefix(
292 DeserializeBinaryBulkSettings & settings,
293 DeserializeBinaryBulkStatePtr & state) const
294{
295 settings.path.push_back(Substream::DictionaryKeys);
296 auto * stream = settings.getter(settings.path);
297 settings.path.pop_back();
298
299 if (!stream)
300 return;
301
302 UInt64 keys_version;
303 readIntBinary(keys_version, *stream);
304
305 state = std::make_shared<DeserializeStateLowCardinality>(keys_version);
306}
307
308namespace
309{
310 template <typename T>
311 PaddedPODArray<T> * getIndexesData(IColumn & indexes)
312 {
313 auto * column = typeid_cast<ColumnVector<T> *>(&indexes);
314 if (column)
315 return &column->getData();
316
317 return nullptr;
318 }
319
    /// Result of mapIndexWithAdditionalKeys: two columns which map compacted positions back to old values.
    struct IndexMapsWithAdditionalKeys
    {
        /// For positions that referred to the dictionary (old value < dict_size): the old values.
        MutableColumnPtr dictionary_map;
        /// For positions that referred to additional keys: the old values minus dict_size.
        MutableColumnPtr additional_keys_map;
    };
325
    /// Hash map based variant of mapIndexWithAdditionalKeys (same signature and result type).
    /// Rewrites `index` in place: values below `dict_size` and values at or above it are numbered separately
    /// (each value gets the size of the corresponding map at the moment it is inserted), and the numbers of
    /// the second group are shifted by the number of distinct values in the first group.
    /// NOTE(review): this relies on HashMap::insert keeping the existing entry for a repeated key - confirm.
    /// Before returning, the result is verified against a copy of the original index;
    /// a mismatch throws LOGICAL_ERROR.
    template <typename T>
    IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeysRef(PaddedPODArray<T> & index, size_t dict_size)
    {
        /// Keep the original values for the final self-check.
        PaddedPODArray<T> copy(index.cbegin(), index.cend());

        HashMap<T, T> dict_map;
        HashMap<T, T> add_keys_map;

        /// Assign new numbers to the distinct values of each group.
        for (auto val : index)
        {
            if (val < dict_size)
                dict_map.insert({val, dict_map.size()});
            else
                add_keys_map.insert({val, add_keys_map.size()});
        }

        auto dictionary_map = ColumnVector<T>::create(dict_map.size());
        auto additional_keys_map = ColumnVector<T>::create(add_keys_map.size());
        auto & dict_data = dictionary_map->getData();
        auto & add_keys_data = additional_keys_map->getData();

        /// Build the reverse maps: new number -> old value (shifted by dict_size for additional keys).
        for (auto val : dict_map)
            dict_data[val.second] = val.first;

        for (auto val : add_keys_map)
            add_keys_data[val.second] = val.first - dict_size;

        /// Replace old values by the new numbers; additional keys go after the dictionary ones.
        for (auto & val : index)
            val = val < dict_size ? dict_map[val]
                                  : add_keys_map[val] + dict_map.size();

        /// Self-check: mapping the new numbers back must give the original values.
        for (size_t i = 0; i < index.size(); ++i)
        {
            T expected = index[i] < dict_data.size() ? dict_data[index[i]]
                                                     : add_keys_data[index[i] - dict_data.size()] + dict_size;
            if (expected != copy[i])
                throw Exception("Expected " + toString(expected) + ", but got " + toString(copy[i]), ErrorCodes::LOGICAL_ERROR);

        }

        return {std::move(dictionary_map), std::move(additional_keys_map)};
    }
368
    /// Rewrites `index` in place so that it refers to a compact set of keys, and returns the maps
    /// from the new positions back to the old values.
    /// Values below `dict_size` are renumbered 0..N-1 in order of first appearance (N distinct values);
    /// values at or above `dict_size` are renumbered likewise and then shifted by N.
    /// Uses two lookup arrays indexed by the old value instead of hash maps.
    /// For an empty `index` two empty columns are returned.
    template <typename T>
    IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(PaddedPODArray<T> & index, size_t dict_size)
    {
        T max_less_dict_size = 0;
        T max_value = 0;

        auto size = index.size();
        if (size == 0)
            return {ColumnVector<T>::create(), ColumnVector<T>::create()};

        /// First pass: find the largest value of each group to size the lookup arrays.
        for (size_t i = 0; i < size; ++i)
        {
            auto val = index[i];
            if (val < dict_size)
                max_less_dict_size = std::max(max_less_dict_size, val);

            max_value = std::max(max_value, val);
        }

        /// map[old value] -> new position for dictionary values;
        /// overflow_map[old value - dict_size] -> new position (before shifting) for additional keys.
        auto map_size = UInt64(max_less_dict_size) + 1;
        auto overflow_map_size = max_value >= dict_size ? (UInt64(max_value - dict_size) + 1) : 0;
        PaddedPODArray<T> map(map_size, 0);
        PaddedPODArray<T> overflow_map(overflow_map_size, 0);

        /// Zero in the lookup arrays means "no position assigned yet", so the value which gets
        /// position 0 is remembered separately to tell it apart from unassigned entries.
        T zero_pos_value = 0;
        T zero_pos_overflowed_value = 0;
        UInt64 cur_pos = 0;
        UInt64 cur_overflowed_pos = 0;

        /// Second pass: assign positions in order of first appearance.
        for (size_t i = 0; i < size; ++i)
        {
            T val = index[i];
            if (val < dict_size)
            {
                if (cur_pos == 0)
                {
                    zero_pos_value = val;
                    ++cur_pos;
                }
                else if (map[val] == 0 && val != zero_pos_value)
                {
                    map[val] = cur_pos;
                    ++cur_pos;
                }
            }
            else
            {
                T shifted_val = val - dict_size;
                if (cur_overflowed_pos == 0)
                {
                    zero_pos_overflowed_value = shifted_val;
                    ++cur_overflowed_pos;
                }
                else if (overflow_map[shifted_val] == 0 && shifted_val != zero_pos_overflowed_value)
                {
                    overflow_map[shifted_val] = cur_overflowed_pos;
                    ++cur_overflowed_pos;
                }
            }
        }

        /// cur_pos / cur_overflowed_pos now hold the number of distinct values in each group.
        auto dictionary_map = ColumnVector<T>::create(cur_pos);
        auto additional_keys_map = ColumnVector<T>::create(cur_overflowed_pos);
        auto & dict_data = dictionary_map->getData();
        auto & add_keys_data = additional_keys_map->getData();

        /// Build the reverse maps (new position -> old value) for all non-zero positions...
        for (size_t i = 0; i < map_size; ++i)
            if (map[i])
                dict_data[map[i]] = static_cast<T>(i);

        for (size_t i = 0; i < overflow_map_size; ++i)
            if (overflow_map[i])
                add_keys_data[overflow_map[i]] = static_cast<T>(i);

        /// ...and fill position 0 from the separately remembered values.
        if (cur_pos)
            dict_data[0] = zero_pos_value;
        if (cur_overflowed_pos)
            add_keys_data[0] = zero_pos_overflowed_value;

        /// Third pass: replace old values by new positions; additional keys are placed after cur_pos.
        for (size_t i = 0; i < size; ++i)
        {
            T & val = index[i];
            if (val < dict_size)
                val = map[val];
            else
                val = overflow_map[val - dict_size] + cur_pos;
        }

        return {std::move(dictionary_map), std::move(additional_keys_map)};
    }
459
460 /// Update column and return map with old indexes.
461 /// Let N is the number of distinct values which are less than max_size;
462 /// old_column - column before function call;
463 /// new_column - column after function call:
464 /// * if old_column[i] < max_size, than
465 /// dictionary_map[new_column[i]] = old_column[i]
466 /// * else
467 /// additional_keys_map[new_column[i]] = old_column[i] - dict_size + N
468 IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(IColumn & column, size_t dict_size)
469 {
470 if (auto * data_uint8 = getIndexesData<UInt8>(column))
471 return mapIndexWithAdditionalKeys(*data_uint8, dict_size);
472 else if (auto * data_uint16 = getIndexesData<UInt16>(column))
473 return mapIndexWithAdditionalKeys(*data_uint16, dict_size);
474 else if (auto * data_uint32 = getIndexesData<UInt32>(column))
475 return mapIndexWithAdditionalKeys(*data_uint32, dict_size);
476 else if (auto * data_uint64 = getIndexesData<UInt64>(column))
477 return mapIndexWithAdditionalKeys(*data_uint64, dict_size);
478 else
479 throw Exception("Indexes column for mapIndexWithAdditionalKeys must be UInt, got" + column.getName(),
480 ErrorCodes::LOGICAL_ERROR);
481 }
482}
483
/// Writes rows [offset, offset + limit) of a ColumnLowCardinality into the keys and indexes streams.
/// A `limit` of 0 means "up to the end of the column".
/// Layout written to the indexes stream: header (IndexesSerializationType), optional additional keys
/// (count + keys), number of rows, positions. The shared dictionary, when it has to be written here,
/// goes to the keys stream as count + keys.
/// Does nothing if both streams are absent; throws LOGICAL_ERROR if only one of them is absent,
/// if the state is invalid, or if one of the internal consistency checks below fails.
void DataTypeLowCardinality::serializeBinaryBulkWithMultipleStreams(
    const IColumn & column,
    size_t offset,
    size_t limit,
    SerializeBinaryBulkSettings & settings,
    SerializeBinaryBulkStatePtr & state) const
{
    settings.path.push_back(Substream::DictionaryKeys);
    auto * keys_stream = settings.getter(settings.path);
    settings.path.back() = Substream::DictionaryIndexes;
    auto * indexes_stream = settings.getter(settings.path);
    settings.path.pop_back();

    if (!keys_stream && !indexes_stream)
        return;

    if (!keys_stream)
        throw Exception("Got empty stream for DataTypeLowCardinality keys.", ErrorCodes::LOGICAL_ERROR);

    if (!indexes_stream)
        throw Exception("Got empty stream for DataTypeLowCardinality indexes.", ErrorCodes::LOGICAL_ERROR);

    const ColumnLowCardinality & low_cardinality_column = typeid_cast<const ColumnLowCardinality &>(column);

    auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state);
    auto & global_dictionary = low_cardinality_state->shared_dictionary;
    KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value);

    /// A missing shared dictionary means a new one is started; this is recorded in the granule header
    /// (need_update_dictionary flag) so that the reader knows the dictionary has changed.
    bool need_update_dictionary = global_dictionary == nullptr;
    if (need_update_dictionary)
        global_dictionary = createColumnUnique(*dictionary_type);

    size_t max_limit = column.size() - offset;
    limit = limit ? std::min(limit, max_limit) : max_limit;

    /// Do not write anything for empty column. (May happen while writing empty arrays.)
    if (limit == 0)
        return;

    /// Take the requested range; `positions` index into `keys` of the cut sub-column.
    auto sub_column = low_cardinality_column.cutAndCompact(offset, limit);
    ColumnPtr positions = sub_column->getIndexesPtr();
    ColumnPtr keys = sub_column->getDictionary().getNestedColumn();

    if (settings.low_cardinality_max_dictionary_size)
    {
        /// Insert used_keys into global dictionary and update sub_index.
        auto indexes_with_overflow = global_dictionary->uniqueInsertRangeWithOverflow(*keys, 0, keys->size(),
                                                                                       settings.low_cardinality_max_dictionary_size);
        size_t max_size = settings.low_cardinality_max_dictionary_size + indexes_with_overflow.overflowed_keys->size();
        ColumnLowCardinality::Index(indexes_with_overflow.indexes->getPtr()).check(max_size);

        if (global_dictionary->size() > settings.low_cardinality_max_dictionary_size)
            throw Exception("Got dictionary with size " + toString(global_dictionary->size()) +
                            " but max dictionary size is " + toString(settings.low_cardinality_max_dictionary_size),
                            ErrorCodes::LOGICAL_ERROR);

        /// Re-express the positions through the returned indexes; only the overflowed keys
        /// remain to be written next to the positions as additional keys.
        positions = indexes_with_overflow.indexes->index(*positions, 0);
        keys = std::move(indexes_with_overflow.overflowed_keys);

        if (global_dictionary->size() < settings.low_cardinality_max_dictionary_size && !keys->empty())
            throw Exception("Has additional keys, but dict size is " + toString(global_dictionary->size()) +
                            " which is less then max dictionary size (" + toString(settings.low_cardinality_max_dictionary_size) + ")",
                            ErrorCodes::LOGICAL_ERROR);
    }

    /// Keys are always written with the non-nullable type, so unwrap a Nullable keys column.
    if (auto * nullable_keys = checkAndGetColumn<ColumnNullable>(*keys))
        keys = nullable_keys->getNestedColumnPtr();

    bool need_additional_keys = !keys->empty();
    bool need_dictionary = settings.low_cardinality_max_dictionary_size != 0;
    /// Unless a single dictionary per part is requested, the shared dictionary is written out
    /// as soon as it reaches the max size.
    bool need_write_dictionary = !settings.low_cardinality_use_single_dictionary_for_part
        && global_dictionary->size() >= settings.low_cardinality_max_dictionary_size;

    /// Granule header: indexes type and flags.
    IndexesSerializationType index_version(*positions, need_additional_keys, need_dictionary, need_update_dictionary);
    index_version.serialize(*indexes_stream);

    if (need_write_dictionary)
    {
        const auto & nested_column = global_dictionary->getNestedNotNullableColumn();
        UInt64 num_keys = nested_column->size();
        writeIntBinary(num_keys, *keys_stream);
        removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *keys_stream, 0, num_keys);
        /// Drop the written dictionary; the next call will start a new one.
        low_cardinality_state->shared_dictionary = nullptr;
    }

    if (need_additional_keys)
    {
        UInt64 num_keys = keys->size();
        writeIntBinary(num_keys, *indexes_stream);
        removeNullable(dictionary_type)->serializeBinaryBulk(*keys, *indexes_stream, 0, num_keys);
    }

    /// Finally the number of rows and the positions themselves.
    UInt64 num_rows = positions->size();
    writeIntBinary(num_rows, *indexes_stream);
    index_version.getDataType()->serializeBinaryBulk(*positions, *indexes_stream, 0, num_rows);
}
580
/// Reads up to `limit` rows from the keys and indexes streams and appends them to `column`
/// (which must be a ColumnLowCardinality).
/// The indexes stream is a sequence of granules: header, optional additional keys, number of rows, positions.
/// A granule may be consumed over several calls; the unread remainder is tracked in the state.
/// Does nothing if both streams are absent; throws LOGICAL_ERROR if only one of them is absent
/// or if the state is invalid.
void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreams(
    IColumn & column,
    size_t limit,
    DeserializeBinaryBulkSettings & settings,
    DeserializeBinaryBulkStatePtr & state) const
{
    ColumnLowCardinality & low_cardinality_column = typeid_cast<ColumnLowCardinality &>(column);

    settings.path.push_back(Substream::DictionaryKeys);
    auto * keys_stream = settings.getter(settings.path);
    settings.path.back() = Substream::DictionaryIndexes;
    auto * indexes_stream = settings.getter(settings.path);
    settings.path.pop_back();

    if (!keys_stream && !indexes_stream)
        return;

    if (!keys_stream)
        throw Exception("Got empty stream for DataTypeLowCardinality keys.", ErrorCodes::LOGICAL_ERROR);

    if (!indexes_stream)
        throw Exception("Got empty stream for DataTypeLowCardinality indexes.", ErrorCodes::LOGICAL_ERROR);

    auto * low_cardinality_state = checkAndGetLowCardinalityDeserializeState(state);
    KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value);

    /// Reads the global dictionary (count + keys) from the keys stream into the state.
    auto readDictionary = [this, low_cardinality_state, keys_stream]()
    {
        UInt64 num_keys;
        readIntBinary(num_keys, *keys_stream);

        auto keys_type = removeNullable(dictionary_type);
        auto global_dict_keys = keys_type->createColumn();
        keys_type->deserializeBinaryBulk(*global_dict_keys, *keys_stream, num_keys, 0);

        auto column_unique = createColumnUnique(*dictionary_type, std::move(global_dict_keys));
        low_cardinality_state->global_dictionary = std::move(column_unique);
    };

    /// Reads the additional keys of the current granule (count + keys) from the indexes stream into the state.
    auto readAdditionalKeys = [this, low_cardinality_state, indexes_stream]()
    {
        UInt64 num_keys;
        readIntBinary(num_keys, *indexes_stream);
        auto keys_type = removeNullable(dictionary_type);
        auto additional_keys = keys_type->createColumn();
        keys_type->deserializeBinaryBulk(*additional_keys, *indexes_stream, num_keys, 0);
        low_cardinality_state->additional_keys = std::move(additional_keys);

        /// Without a global dictionary the additional keys are the whole dictionary of the granule.
        /// For a Nullable dictionary type a null map is built which marks only the first key as NULL.
        if (!low_cardinality_state->index_type.need_global_dictionary && dictionary_type->isNullable())
        {
            auto null_map = ColumnUInt8::create(num_keys, 0);
            if (num_keys)
                null_map->getElement(0) = 1;

            low_cardinality_state->null_map = std::move(null_map);
        }
    };

    /// Reads `num_rows` positions of the current granule and appends the corresponding values to the column.
    auto readIndexes = [this, low_cardinality_state, indexes_stream, &low_cardinality_column](UInt64 num_rows)
    {
        auto indexes_type = low_cardinality_state->index_type.getDataType();
        MutableColumnPtr indexes_column = indexes_type->createColumn();
        indexes_type->deserializeBinaryBulk(*indexes_column, *indexes_stream, num_rows, 0);

        auto & global_dictionary = low_cardinality_state->global_dictionary;
        const auto & additional_keys = low_cardinality_state->additional_keys;

        bool has_additional_keys = low_cardinality_state->index_type.has_additional_keys;
        bool column_is_empty = low_cardinality_column.empty();

        if (!low_cardinality_state->index_type.need_global_dictionary)
        {
            /// Case 1: no global dictionary - positions refer to the additional keys only.
            ColumnPtr keys_column = additional_keys;
            if (low_cardinality_state->null_map)
                keys_column = ColumnNullable::create(additional_keys, low_cardinality_state->null_map);
            low_cardinality_column.insertRangeFromDictionaryEncodedColumn(*keys_column, *indexes_column);
        }
        else if (!has_additional_keys)
        {
            /// Case 2: positions refer to the global dictionary only.
            /// An empty destination column can take the global dictionary as its shared dictionary.
            if (column_is_empty)
                low_cardinality_column.setSharedDictionary(global_dictionary);

            auto local_column = ColumnLowCardinality::create(global_dictionary, std::move(indexes_column));
            low_cardinality_column.insertRangeFrom(*local_column, 0, num_rows);
        }
        else
        {
            /// Case 3: positions refer both to the global dictionary and to the additional keys.
            /// Compact the positions and collect only the keys which are actually used.
            auto maps = mapIndexWithAdditionalKeys(*indexes_column, global_dictionary->size());

            ColumnLowCardinality::Index(maps.additional_keys_map->getPtr()).check(additional_keys->size());

            ColumnLowCardinality::Index(indexes_column->getPtr()).check(
                maps.dictionary_map->size() + maps.additional_keys_map->size());

            auto used_keys = (*std::move(global_dictionary->getNestedColumn()->index(*maps.dictionary_map, 0))).mutate();

            if (!maps.additional_keys_map->empty())
            {
                auto used_add_keys = additional_keys->index(*maps.additional_keys_map, 0);

                /// Additional keys are stored as non-nullable; wrap them with an all-zero null map
                /// so that they can be appended to the keys taken from the dictionary.
                if (dictionary_type->isNullable())
                {
                    ColumnPtr null_map = ColumnUInt8::create(used_add_keys->size(), 0);
                    used_add_keys = ColumnNullable::create(used_add_keys, null_map);
                }

                used_keys->insertRangeFrom(*used_add_keys, 0, used_add_keys->size());
            }

            low_cardinality_column.insertRangeFromDictionaryEncodedColumn(*used_keys, *indexes_column);
        }
    };

    if (!settings.continuous_reading)
    {
        /// Reading does not continue from the previous position: forget the rest of the previous granule.
        low_cardinality_state->num_pending_rows = 0;

        /// Remember in state that some granules were skipped and we need to update dictionary.
        low_cardinality_state->need_update_dictionary = true;
    }

    while (limit)
    {
        /// Start of a new granule: read its header, dictionary (if required), additional keys and row count.
        if (low_cardinality_state->num_pending_rows == 0)
        {
            if (indexes_stream->eof())
                break;

            auto & index_type = low_cardinality_state->index_type;
            auto & global_dictionary = low_cardinality_state->global_dictionary;

            index_type.deserialize(*indexes_stream);

            /// The dictionary is (re)read if there is none yet, if the granule header requests it,
            /// or if granules were skipped since the last read.
            bool need_update_dictionary =
                !global_dictionary || index_type.need_update_dictionary || low_cardinality_state->need_update_dictionary;
            if (index_type.need_global_dictionary && need_update_dictionary)
            {
                readDictionary();
                low_cardinality_state->need_update_dictionary = false;
            }

            if (low_cardinality_state->index_type.has_additional_keys)
                readAdditionalKeys();
            else
                low_cardinality_state->additional_keys = nullptr;

            readIntBinary(low_cardinality_state->num_pending_rows, *indexes_stream);
        }

        /// Read as much of the current granule as the limit allows.
        size_t num_rows_to_read = std::min<UInt64>(limit, low_cardinality_state->num_pending_rows);
        readIndexes(num_rows_to_read);
        limit -= num_rows_to_read;
        low_cardinality_state->num_pending_rows -= num_rows_to_read;
    }
}
736
/// Binary serialization of a single Field: delegated to the dictionary type as is.
void DataTypeLowCardinality::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
    dictionary_type->serializeBinary(field, ostr);
}
/// Binary deserialization of a single Field: delegated to the dictionary type as is.
void DataTypeLowCardinality::deserializeBinary(Field & field, ReadBuffer & istr) const
{
    dictionary_type->deserializeBinary(field, istr);
}
745
/// Binary serialization of row `row_num`: serializeImpl resolves the row to its dictionary key
/// and calls IDataType::serializeBinary of the dictionary type on it.
void DataTypeLowCardinality::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
{
    serializeImpl(column, row_num, &IDataType::serializeBinary, ostr);
}
/// Binary deserialization of one value: deserializeImpl reads it with IDataType::deserializeBinary
/// of the dictionary type into a temporary column and inserts it into the LowCardinality column.
void DataTypeLowCardinality::deserializeBinary(IColumn & column, ReadBuffer & istr) const
{
    deserializeImpl(column, &IDataType::deserializeBinary, istr);
}
754
/// Escaped text serialization of row `row_num`, forwarded to the dictionary type through serializeImpl.
void DataTypeLowCardinality::serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
    serializeImpl(column, row_num, &IDataType::serializeAsTextEscaped, ostr, settings);
}
759
/// Escaped text deserialization of one value, forwarded to the dictionary type through deserializeImpl.
void DataTypeLowCardinality::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
    deserializeImpl(column, &IDataType::deserializeAsTextEscaped, istr, settings);
}
764
/// Quoted text serialization of row `row_num`, forwarded to the dictionary type through serializeImpl.
void DataTypeLowCardinality::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
    serializeImpl(column, row_num, &IDataType::serializeAsTextQuoted, ostr, settings);
}
769
/// Quoted text deserialization of one value, forwarded to the dictionary type through deserializeImpl.
void DataTypeLowCardinality::deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
    deserializeImpl(column, &IDataType::deserializeAsTextQuoted, istr, settings);
}
774
775void DataTypeLowCardinality::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
776{
777 deserializeImpl(column, &IDataType::deserializeAsTextEscaped, istr, settings);
778}
779
/// CSV serialization of row `row_num`, forwarded to the dictionary type through serializeImpl.
void DataTypeLowCardinality::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
    serializeImpl(column, row_num, &IDataType::serializeAsTextCSV, ostr, settings);
}
784
/// CSV deserialization of one value, forwarded to the dictionary type through deserializeImpl.
void DataTypeLowCardinality::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
    deserializeImpl(column, &IDataType::deserializeAsTextCSV, istr, settings);
}
789
/// Plain text serialization of row `row_num`, forwarded to the dictionary type through serializeImpl.
void DataTypeLowCardinality::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
    serializeImpl(column, row_num, &IDataType::serializeAsText, ostr, settings);
}
794
/// JSON serialization of row `row_num`, forwarded to the dictionary type through serializeImpl.
void DataTypeLowCardinality::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
    serializeImpl(column, row_num, &IDataType::serializeAsTextJSON, ostr, settings);
}
/// JSON deserialization of one value, forwarded to the dictionary type through deserializeImpl.
void DataTypeLowCardinality::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
    deserializeImpl(column, &IDataType::deserializeAsTextJSON, istr, settings);
}
803
/// XML serialization of row `row_num`, forwarded to the dictionary type through serializeImpl.
void DataTypeLowCardinality::serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
    serializeImpl(column, row_num, &IDataType::serializeAsTextXML, ostr, settings);
}
808
/// Protobuf serialization of row `row_num`, forwarded to the dictionary type through serializeImpl.
/// `value_index` is passed through to the dictionary type by reference.
void DataTypeLowCardinality::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
    serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf, value_index);
}
813
814void DataTypeLowCardinality::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
815{
816 if (allow_add_row)
817 {
818 deserializeImpl(column, &IDataType::deserializeProtobuf, protobuf, true, row_added);
819 return;
820 }
821
822 row_added = false;
823 auto & low_cardinality_column= getColumnLowCardinality(column);
824 auto nested_column = low_cardinality_column.getDictionary().getNestedColumn();
825 auto temp_column = nested_column->cloneEmpty();
826 size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(low_cardinality_column.size() - 1);
827 temp_column->insertFrom(*nested_column, unique_row_number);
828 bool dummy;
829 dictionary_type.get()->deserializeProtobuf(*temp_column, protobuf, false, dummy);
830 low_cardinality_column.popBack(1);
831 low_cardinality_column.insertFromFullColumn(*temp_column, 0);
832}
833
834template <typename... Params, typename... Args>
835void DataTypeLowCardinality::serializeImpl(
836 const IColumn & column, size_t row_num, DataTypeLowCardinality::SerializeFunctionPtr<Params...> func, Args &&... args) const
837{
838 auto & low_cardinality_column = getColumnLowCardinality(column);
839 size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(row_num);
840 (dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, std::forward<Args>(args)...);
841}
842
843template <typename... Params, typename... Args>
844void DataTypeLowCardinality::deserializeImpl(
845 IColumn & column, DataTypeLowCardinality::DeserializeFunctionPtr<Params...> func, Args &&... args) const
846{
847 auto & low_cardinality_column= getColumnLowCardinality(column);
848 auto temp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty();
849
850 (dictionary_type.get()->*func)(*temp_column, std::forward<Args>(args)...);
851
852 low_cardinality_column.insertFromFullColumn(*temp_column, 0);
853}
854
855namespace
856{
857 template <typename Creator>
858 struct CreateColumnVector
859 {
860 MutableColumnUniquePtr & column;
861 const IDataType & keys_type;
862 const Creator & creator;
863
864 CreateColumnVector(MutableColumnUniquePtr & column_, const IDataType & keys_type_, const Creator & creator_)
865 : column(column_), keys_type(keys_type_), creator(creator_)
866 {
867 }
868
869 template <typename T, size_t>
870 void operator()()
871 {
872 if (typeid_cast<const DataTypeNumber<T> *>(&keys_type))
873 column = creator(static_cast<ColumnVector<T> *>(nullptr));
874 }
875 };
876}
877
878template <typename Creator>
879MutableColumnUniquePtr DataTypeLowCardinality::createColumnUniqueImpl(const IDataType & keys_type,
880 const Creator & creator)
881{
882 auto * type = &keys_type;
883 if (auto * nullable_type = typeid_cast<const DataTypeNullable *>(&keys_type))
884 type = nullable_type->getNestedType().get();
885
886 if (isString(type))
887 return creator(static_cast<ColumnString *>(nullptr));
888 if (isFixedString(type))
889 return creator(static_cast<ColumnFixedString *>(nullptr));
890 if (typeid_cast<const DataTypeDate *>(type))
891 return creator(static_cast<ColumnVector<UInt16> *>(nullptr));
892 if (typeid_cast<const DataTypeDateTime *>(type))
893 return creator(static_cast<ColumnVector<UInt32> *>(nullptr));
894 if (isColumnedAsNumber(type))
895 {
896 MutableColumnUniquePtr column;
897 TypeListNativeNumbers::forEach(CreateColumnVector(column, *type, creator));
898
899 if (!column)
900 throw Exception("Unexpected numeric type: " + type->getName(), ErrorCodes::LOGICAL_ERROR);
901
902 return column;
903 }
904
905 throw Exception("Unexpected dictionary type for DataTypeLowCardinality: " + type->getName(),
906 ErrorCodes::LOGICAL_ERROR);
907}
908
909
910MutableColumnUniquePtr DataTypeLowCardinality::createColumnUnique(const IDataType & keys_type)
911{
912 auto creator = [&](auto x)
913 {
914 using ColumnType = typename std::remove_pointer<decltype(x)>::type;
915 return ColumnUnique<ColumnType>::create(keys_type);
916 };
917 return createColumnUniqueImpl(keys_type, creator);
918}
919
920MutableColumnUniquePtr DataTypeLowCardinality::createColumnUnique(const IDataType & keys_type, MutableColumnPtr && keys)
921{
922 auto creator = [&](auto x)
923 {
924 using ColumnType = typename std::remove_pointer<decltype(x)>::type;
925 return ColumnUnique<ColumnType>::create(std::move(keys), keys_type.isNullable());
926 };
927 return createColumnUniqueImpl(keys_type, creator);
928}
929
930MutableColumnPtr DataTypeLowCardinality::createColumn() const
931{
932 MutableColumnPtr indexes = DataTypeUInt8().createColumn();
933 MutableColumnPtr dictionary = createColumnUnique(*dictionary_type);
934 return ColumnLowCardinality::create(std::move(dictionary), std::move(indexes));
935}
936
937Field DataTypeLowCardinality::getDefault() const
938{
939 return dictionary_type->getDefault();
940}
941
942bool DataTypeLowCardinality::equals(const IDataType & rhs) const
943{
944 if (typeid(rhs) != typeid(*this))
945 return false;
946
947 auto & low_cardinality_rhs= static_cast<const DataTypeLowCardinality &>(rhs);
948 return dictionary_type->equals(*low_cardinality_rhs.dictionary_type);
949}
950
951
952static DataTypePtr create(const String & /*type_name*/, const ASTPtr & arguments)
953{
954 if (!arguments || arguments->children.size() != 1)
955 throw Exception("LowCardinality data type family must have single argument - type of elements",
956 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
957
958 return std::make_shared<DataTypeLowCardinality>(DataTypeFactory::instance().get(arguments->children[0]));
959}
960
961void registerDataTypeLowCardinality(DataTypeFactory & factory)
962{
963 factory.registerDataType("LowCardinality", create);
964}
965
966
967DataTypePtr removeLowCardinality(const DataTypePtr & type)
968{
969 if (auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get()))
970 return low_cardinality_type->getDictionaryType();
971 return type;
972}
973
974}
975