1 | #include <Columns/ColumnLowCardinality.h> |
2 | #include <Columns/ColumnUnique.h> |
3 | #include <Columns/ColumnFixedString.h> |
4 | #include <Columns/ColumnsCommon.h> |
5 | #include <Common/HashTable/HashMap.h> |
6 | #include <Common/typeid_cast.h> |
7 | #include <Common/assert_cast.h> |
8 | #include <Core/Field.h> |
9 | #include <Core/TypeListNumber.h> |
10 | #include <DataTypes/DataTypeFactory.h> |
11 | #include <DataTypes/DataTypeLowCardinality.h> |
12 | #include <DataTypes/DataTypeNullable.h> |
13 | #include <DataTypes/DataTypeDate.h> |
14 | #include <DataTypes/DataTypeDateTime.h> |
15 | #include <Parsers/IAST.h> |
16 | |
17 | namespace DB |
18 | { |
19 | |
namespace ErrorCodes
{
    /// Error codes declared here and defined in another translation unit.
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
26 | |
27 | namespace |
28 | { |
29 | const ColumnLowCardinality & getColumnLowCardinality(const IColumn & column) |
30 | { |
31 | return typeid_cast<const ColumnLowCardinality &>(column); |
32 | } |
33 | |
34 | ColumnLowCardinality & getColumnLowCardinality(IColumn & column) |
35 | { |
36 | return typeid_cast<ColumnLowCardinality &>(column); |
37 | } |
38 | } |
39 | |
40 | DataTypeLowCardinality::DataTypeLowCardinality(DataTypePtr dictionary_type_) |
41 | : dictionary_type(std::move(dictionary_type_)) |
42 | { |
43 | auto inner_type = dictionary_type; |
44 | if (dictionary_type->isNullable()) |
45 | inner_type = static_cast<const DataTypeNullable &>(*dictionary_type).getNestedType(); |
46 | |
47 | if (!inner_type->canBeInsideLowCardinality()) |
48 | throw Exception("DataTypeLowCardinality is supported only for numbers, strings, Date or DateTime, but got " |
49 | + dictionary_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); |
50 | } |
51 | |
52 | void DataTypeLowCardinality::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const |
53 | { |
54 | path.push_back(Substream::DictionaryKeys); |
55 | dictionary_type->enumerateStreams(callback, path); |
56 | path.back() = Substream::DictionaryIndexes; |
57 | callback(path); |
58 | path.pop_back(); |
59 | } |
60 | |
61 | struct KeysSerializationVersion |
62 | { |
63 | enum Value |
64 | { |
65 | /// Version is written at the start of <name.dict.bin>. |
66 | /// Dictionary is written as number N and N keys after them. |
67 | /// Dictionary can be shared for continuous range of granules, so some marks may point to the same position. |
68 | /// Shared dictionary is stored in state and is read once. |
69 | SharedDictionariesWithAdditionalKeys = 1, |
70 | }; |
71 | |
72 | Value value; |
73 | |
74 | static void checkVersion(UInt64 version) |
75 | { |
76 | if (version != SharedDictionariesWithAdditionalKeys) |
77 | throw Exception("Invalid version for DataTypeLowCardinality key column." , ErrorCodes::LOGICAL_ERROR); |
78 | } |
79 | |
80 | KeysSerializationVersion(UInt64 version) : value(static_cast<Value>(version)) { checkVersion(version); } |
81 | }; |
82 | |
83 | /// Version is stored at the start of each granule. It's used to store indexes type and flags. |
84 | struct IndexesSerializationType |
85 | { |
86 | using SerializationType = UInt64; |
87 | /// Need to read dictionary if it wasn't. |
88 | static constexpr SerializationType NeedGlobalDictionaryBit = 1u << 8u; |
89 | /// Need to read additional keys. Additional keys are stored before indexes as value N and N keys after them. |
90 | static constexpr SerializationType HasAdditionalKeysBit = 1u << 9u; |
91 | /// Need to update dictionary. It means that previous granule has different dictionary. |
92 | static constexpr SerializationType NeedUpdateDictionary = 1u << 10u; |
93 | |
94 | enum Type |
95 | { |
96 | TUInt8 = 0, |
97 | TUInt16, |
98 | TUInt32, |
99 | TUInt64, |
100 | }; |
101 | |
102 | Type type; |
103 | bool has_additional_keys; |
104 | bool need_global_dictionary; |
105 | bool need_update_dictionary; |
106 | |
107 | static constexpr SerializationType resetFlags(SerializationType type) |
108 | { |
109 | return type & (~(HasAdditionalKeysBit | NeedGlobalDictionaryBit | NeedUpdateDictionary)); |
110 | } |
111 | |
112 | static void checkType(SerializationType type) |
113 | { |
114 | UInt64 value = resetFlags(type); |
115 | if (value <= TUInt64) |
116 | return; |
117 | |
118 | throw Exception("Invalid type for DataTypeLowCardinality index column." , ErrorCodes::LOGICAL_ERROR); |
119 | } |
120 | |
121 | void serialize(WriteBuffer & buffer) const |
122 | { |
123 | SerializationType val = type; |
124 | if (has_additional_keys) |
125 | val |= HasAdditionalKeysBit; |
126 | if (need_global_dictionary) |
127 | val |= NeedGlobalDictionaryBit; |
128 | if (need_update_dictionary) |
129 | val |= NeedUpdateDictionary; |
130 | writeIntBinary(val, buffer); |
131 | } |
132 | |
133 | void deserialize(ReadBuffer & buffer) |
134 | { |
135 | SerializationType val; |
136 | readIntBinary(val, buffer); |
137 | checkType(val); |
138 | has_additional_keys = (val & HasAdditionalKeysBit) != 0; |
139 | need_global_dictionary = (val & NeedGlobalDictionaryBit) != 0; |
140 | need_update_dictionary = (val & NeedUpdateDictionary) != 0; |
141 | type = static_cast<Type>(resetFlags(val)); |
142 | } |
143 | |
144 | IndexesSerializationType(const IColumn & column, |
145 | bool has_additional_keys_, |
146 | bool need_global_dictionary_, |
147 | bool enumerate_dictionaries) |
148 | : has_additional_keys(has_additional_keys_) |
149 | , need_global_dictionary(need_global_dictionary_) |
150 | , need_update_dictionary(enumerate_dictionaries) |
151 | { |
152 | if (typeid_cast<const ColumnUInt8 *>(&column)) |
153 | type = TUInt8; |
154 | else if (typeid_cast<const ColumnUInt16 *>(&column)) |
155 | type = TUInt16; |
156 | else if (typeid_cast<const ColumnUInt32 *>(&column)) |
157 | type = TUInt32; |
158 | else if (typeid_cast<const ColumnUInt64 *>(&column)) |
159 | type = TUInt64; |
160 | else |
161 | throw Exception("Invalid Indexes column for IndexesSerializationType. Expected ColumnUInt*, got " |
162 | + column.getName(), ErrorCodes::LOGICAL_ERROR); |
163 | } |
164 | |
165 | DataTypePtr getDataType() const |
166 | { |
167 | if (type == TUInt8) |
168 | return std::make_shared<DataTypeUInt8>(); |
169 | if (type == TUInt16) |
170 | return std::make_shared<DataTypeUInt16>(); |
171 | if (type == TUInt32) |
172 | return std::make_shared<DataTypeUInt32>(); |
173 | if (type == TUInt64) |
174 | return std::make_shared<DataTypeUInt64>(); |
175 | |
176 | throw Exception("Can't create DataType from IndexesSerializationType." , ErrorCodes::LOGICAL_ERROR); |
177 | } |
178 | |
179 | IndexesSerializationType() = default; |
180 | }; |
181 | |
182 | struct SerializeStateLowCardinality : public IDataType::SerializeBinaryBulkState |
183 | { |
184 | KeysSerializationVersion key_version; |
185 | MutableColumnUniquePtr shared_dictionary; |
186 | |
187 | explicit SerializeStateLowCardinality(UInt64 key_version_) : key_version(key_version_) {} |
188 | }; |
189 | |
190 | struct DeserializeStateLowCardinality : public IDataType::DeserializeBinaryBulkState |
191 | { |
192 | KeysSerializationVersion key_version; |
193 | ColumnUniquePtr global_dictionary; |
194 | |
195 | IndexesSerializationType index_type; |
196 | ColumnPtr additional_keys; |
197 | ColumnPtr null_map; |
198 | UInt64 num_pending_rows = 0; |
199 | |
200 | /// If dictionary should be updated. |
201 | /// Can happen is some granules was skipped while reading from MergeTree. |
202 | /// We should store this flag in State because |
203 | /// in case of long block of empty arrays we may not need read dictionary at first reading. |
204 | bool need_update_dictionary = false; |
205 | |
206 | explicit DeserializeStateLowCardinality(UInt64 key_version_) : key_version(key_version_) {} |
207 | }; |
208 | |
209 | static SerializeStateLowCardinality * checkAndGetLowCardinalitySerializeState( |
210 | IDataType::SerializeBinaryBulkStatePtr & state) |
211 | { |
212 | if (!state) |
213 | throw Exception("Got empty state for DataTypeLowCardinality." , ErrorCodes::LOGICAL_ERROR); |
214 | |
215 | auto * low_cardinality_state = typeid_cast<SerializeStateLowCardinality *>(state.get()); |
216 | if (!low_cardinality_state) |
217 | { |
218 | auto & state_ref = *state; |
219 | throw Exception("Invalid SerializeBinaryBulkState for DataTypeLowCardinality. Expected: " |
220 | + demangle(typeid(SerializeStateLowCardinality).name()) + ", got " |
221 | + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR); |
222 | } |
223 | |
224 | return low_cardinality_state; |
225 | } |
226 | |
227 | static DeserializeStateLowCardinality * checkAndGetLowCardinalityDeserializeState( |
228 | IDataType::DeserializeBinaryBulkStatePtr & state) |
229 | { |
230 | if (!state) |
231 | throw Exception("Got empty state for DataTypeLowCardinality." , ErrorCodes::LOGICAL_ERROR); |
232 | |
233 | auto * low_cardinality_state = typeid_cast<DeserializeStateLowCardinality *>(state.get()); |
234 | if (!low_cardinality_state) |
235 | { |
236 | auto & state_ref = *state; |
237 | throw Exception("Invalid DeserializeBinaryBulkState for DataTypeLowCardinality. Expected: " |
238 | + demangle(typeid(DeserializeStateLowCardinality).name()) + ", got " |
239 | + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR); |
240 | } |
241 | |
242 | return low_cardinality_state; |
243 | } |
244 | |
245 | void DataTypeLowCardinality::serializeBinaryBulkStatePrefix( |
246 | SerializeBinaryBulkSettings & settings, |
247 | SerializeBinaryBulkStatePtr & state) const |
248 | { |
249 | settings.path.push_back(Substream::DictionaryKeys); |
250 | auto * stream = settings.getter(settings.path); |
251 | settings.path.pop_back(); |
252 | |
253 | if (!stream) |
254 | throw Exception("Got empty stream in DataTypeLowCardinality::serializeBinaryBulkStatePrefix" , |
255 | ErrorCodes::LOGICAL_ERROR); |
256 | |
257 | /// Write version and create SerializeBinaryBulkState. |
258 | UInt64 key_version = KeysSerializationVersion::SharedDictionariesWithAdditionalKeys; |
259 | |
260 | writeIntBinary(key_version, *stream); |
261 | |
262 | state = std::make_shared<SerializeStateLowCardinality>(key_version); |
263 | } |
264 | |
265 | void DataTypeLowCardinality::serializeBinaryBulkStateSuffix( |
266 | SerializeBinaryBulkSettings & settings, |
267 | SerializeBinaryBulkStatePtr & state) const |
268 | { |
269 | auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state); |
270 | KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value); |
271 | |
272 | if (low_cardinality_state->shared_dictionary && settings.low_cardinality_max_dictionary_size) |
273 | { |
274 | auto nested_column = low_cardinality_state->shared_dictionary->getNestedNotNullableColumn(); |
275 | |
276 | settings.path.push_back(Substream::DictionaryKeys); |
277 | auto * stream = settings.getter(settings.path); |
278 | settings.path.pop_back(); |
279 | |
280 | if (!stream) |
281 | throw Exception("Got empty stream in DataTypeLowCardinality::serializeBinaryBulkStateSuffix" , |
282 | ErrorCodes::LOGICAL_ERROR); |
283 | |
284 | UInt64 num_keys = nested_column->size(); |
285 | writeIntBinary(num_keys, *stream); |
286 | removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *stream, 0, num_keys); |
287 | low_cardinality_state->shared_dictionary = nullptr; |
288 | } |
289 | } |
290 | |
291 | void DataTypeLowCardinality::deserializeBinaryBulkStatePrefix( |
292 | DeserializeBinaryBulkSettings & settings, |
293 | DeserializeBinaryBulkStatePtr & state) const |
294 | { |
295 | settings.path.push_back(Substream::DictionaryKeys); |
296 | auto * stream = settings.getter(settings.path); |
297 | settings.path.pop_back(); |
298 | |
299 | if (!stream) |
300 | return; |
301 | |
302 | UInt64 keys_version; |
303 | readIntBinary(keys_version, *stream); |
304 | |
305 | state = std::make_shared<DeserializeStateLowCardinality>(keys_version); |
306 | } |
307 | |
308 | namespace |
309 | { |
310 | template <typename T> |
311 | PaddedPODArray<T> * getIndexesData(IColumn & indexes) |
312 | { |
313 | auto * column = typeid_cast<ColumnVector<T> *>(&indexes); |
314 | if (column) |
315 | return &column->getData(); |
316 | |
317 | return nullptr; |
318 | } |
319 | |
320 | struct IndexMapsWithAdditionalKeys |
321 | { |
322 | MutableColumnPtr dictionary_map; |
323 | MutableColumnPtr additional_keys_map; |
324 | }; |
325 | |
326 | template <typename T> |
327 | IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeysRef(PaddedPODArray<T> & index, size_t dict_size) |
328 | { |
329 | PaddedPODArray<T> copy(index.cbegin(), index.cend()); |
330 | |
331 | HashMap<T, T> dict_map; |
332 | HashMap<T, T> add_keys_map; |
333 | |
334 | for (auto val : index) |
335 | { |
336 | if (val < dict_size) |
337 | dict_map.insert({val, dict_map.size()}); |
338 | else |
339 | add_keys_map.insert({val, add_keys_map.size()}); |
340 | } |
341 | |
342 | auto dictionary_map = ColumnVector<T>::create(dict_map.size()); |
343 | auto additional_keys_map = ColumnVector<T>::create(add_keys_map.size()); |
344 | auto & dict_data = dictionary_map->getData(); |
345 | auto & add_keys_data = additional_keys_map->getData(); |
346 | |
347 | for (auto val : dict_map) |
348 | dict_data[val.second] = val.first; |
349 | |
350 | for (auto val : add_keys_map) |
351 | add_keys_data[val.second] = val.first - dict_size; |
352 | |
353 | for (auto & val : index) |
354 | val = val < dict_size ? dict_map[val] |
355 | : add_keys_map[val] + dict_map.size(); |
356 | |
357 | for (size_t i = 0; i < index.size(); ++i) |
358 | { |
359 | T expected = index[i] < dict_data.size() ? dict_data[index[i]] |
360 | : add_keys_data[index[i] - dict_data.size()] + dict_size; |
361 | if (expected != copy[i]) |
362 | throw Exception("Expected " + toString(expected) + ", but got " + toString(copy[i]), ErrorCodes::LOGICAL_ERROR); |
363 | |
364 | } |
365 | |
366 | return {std::move(dictionary_map), std::move(additional_keys_map)}; |
367 | } |
368 | |
369 | template <typename T> |
370 | IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(PaddedPODArray<T> & index, size_t dict_size) |
371 | { |
372 | T max_less_dict_size = 0; |
373 | T max_value = 0; |
374 | |
375 | auto size = index.size(); |
376 | if (size == 0) |
377 | return {ColumnVector<T>::create(), ColumnVector<T>::create()}; |
378 | |
379 | for (size_t i = 0; i < size; ++i) |
380 | { |
381 | auto val = index[i]; |
382 | if (val < dict_size) |
383 | max_less_dict_size = std::max(max_less_dict_size, val); |
384 | |
385 | max_value = std::max(max_value, val); |
386 | } |
387 | |
388 | auto map_size = UInt64(max_less_dict_size) + 1; |
389 | auto overflow_map_size = max_value >= dict_size ? (UInt64(max_value - dict_size) + 1) : 0; |
390 | PaddedPODArray<T> map(map_size, 0); |
391 | PaddedPODArray<T> overflow_map(overflow_map_size, 0); |
392 | |
393 | T zero_pos_value = 0; |
394 | T zero_pos_overflowed_value = 0; |
395 | UInt64 cur_pos = 0; |
396 | UInt64 cur_overflowed_pos = 0; |
397 | |
398 | for (size_t i = 0; i < size; ++i) |
399 | { |
400 | T val = index[i]; |
401 | if (val < dict_size) |
402 | { |
403 | if (cur_pos == 0) |
404 | { |
405 | zero_pos_value = val; |
406 | ++cur_pos; |
407 | } |
408 | else if (map[val] == 0 && val != zero_pos_value) |
409 | { |
410 | map[val] = cur_pos; |
411 | ++cur_pos; |
412 | } |
413 | } |
414 | else |
415 | { |
416 | T shifted_val = val - dict_size; |
417 | if (cur_overflowed_pos == 0) |
418 | { |
419 | zero_pos_overflowed_value = shifted_val; |
420 | ++cur_overflowed_pos; |
421 | } |
422 | else if (overflow_map[shifted_val] == 0 && shifted_val != zero_pos_overflowed_value) |
423 | { |
424 | overflow_map[shifted_val] = cur_overflowed_pos; |
425 | ++cur_overflowed_pos; |
426 | } |
427 | } |
428 | } |
429 | |
430 | auto dictionary_map = ColumnVector<T>::create(cur_pos); |
431 | auto additional_keys_map = ColumnVector<T>::create(cur_overflowed_pos); |
432 | auto & dict_data = dictionary_map->getData(); |
433 | auto & add_keys_data = additional_keys_map->getData(); |
434 | |
435 | for (size_t i = 0; i < map_size; ++i) |
436 | if (map[i]) |
437 | dict_data[map[i]] = static_cast<T>(i); |
438 | |
439 | for (size_t i = 0; i < overflow_map_size; ++i) |
440 | if (overflow_map[i]) |
441 | add_keys_data[overflow_map[i]] = static_cast<T>(i); |
442 | |
443 | if (cur_pos) |
444 | dict_data[0] = zero_pos_value; |
445 | if (cur_overflowed_pos) |
446 | add_keys_data[0] = zero_pos_overflowed_value; |
447 | |
448 | for (size_t i = 0; i < size; ++i) |
449 | { |
450 | T & val = index[i]; |
451 | if (val < dict_size) |
452 | val = map[val]; |
453 | else |
454 | val = overflow_map[val - dict_size] + cur_pos; |
455 | } |
456 | |
457 | return {std::move(dictionary_map), std::move(additional_keys_map)}; |
458 | } |
459 | |
460 | /// Update column and return map with old indexes. |
461 | /// Let N is the number of distinct values which are less than max_size; |
462 | /// old_column - column before function call; |
463 | /// new_column - column after function call: |
464 | /// * if old_column[i] < max_size, than |
465 | /// dictionary_map[new_column[i]] = old_column[i] |
466 | /// * else |
467 | /// additional_keys_map[new_column[i]] = old_column[i] - dict_size + N |
468 | IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(IColumn & column, size_t dict_size) |
469 | { |
470 | if (auto * data_uint8 = getIndexesData<UInt8>(column)) |
471 | return mapIndexWithAdditionalKeys(*data_uint8, dict_size); |
472 | else if (auto * data_uint16 = getIndexesData<UInt16>(column)) |
473 | return mapIndexWithAdditionalKeys(*data_uint16, dict_size); |
474 | else if (auto * data_uint32 = getIndexesData<UInt32>(column)) |
475 | return mapIndexWithAdditionalKeys(*data_uint32, dict_size); |
476 | else if (auto * data_uint64 = getIndexesData<UInt64>(column)) |
477 | return mapIndexWithAdditionalKeys(*data_uint64, dict_size); |
478 | else |
479 | throw Exception("Indexes column for mapIndexWithAdditionalKeys must be UInt, got" + column.getName(), |
480 | ErrorCodes::LOGICAL_ERROR); |
481 | } |
482 | } |
483 | |
484 | void DataTypeLowCardinality::serializeBinaryBulkWithMultipleStreams( |
485 | const IColumn & column, |
486 | size_t offset, |
487 | size_t limit, |
488 | SerializeBinaryBulkSettings & settings, |
489 | SerializeBinaryBulkStatePtr & state) const |
490 | { |
491 | settings.path.push_back(Substream::DictionaryKeys); |
492 | auto * keys_stream = settings.getter(settings.path); |
493 | settings.path.back() = Substream::DictionaryIndexes; |
494 | auto * indexes_stream = settings.getter(settings.path); |
495 | settings.path.pop_back(); |
496 | |
497 | if (!keys_stream && !indexes_stream) |
498 | return; |
499 | |
500 | if (!keys_stream) |
501 | throw Exception("Got empty stream for DataTypeLowCardinality keys." , ErrorCodes::LOGICAL_ERROR); |
502 | |
503 | if (!indexes_stream) |
504 | throw Exception("Got empty stream for DataTypeLowCardinality indexes." , ErrorCodes::LOGICAL_ERROR); |
505 | |
506 | const ColumnLowCardinality & low_cardinality_column = typeid_cast<const ColumnLowCardinality &>(column); |
507 | |
508 | auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state); |
509 | auto & global_dictionary = low_cardinality_state->shared_dictionary; |
510 | KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value); |
511 | |
512 | bool need_update_dictionary = global_dictionary == nullptr; |
513 | if (need_update_dictionary) |
514 | global_dictionary = createColumnUnique(*dictionary_type); |
515 | |
516 | size_t max_limit = column.size() - offset; |
517 | limit = limit ? std::min(limit, max_limit) : max_limit; |
518 | |
519 | /// Do not write anything for empty column. (May happen while writing empty arrays.) |
520 | if (limit == 0) |
521 | return; |
522 | |
523 | auto sub_column = low_cardinality_column.cutAndCompact(offset, limit); |
524 | ColumnPtr positions = sub_column->getIndexesPtr(); |
525 | ColumnPtr keys = sub_column->getDictionary().getNestedColumn(); |
526 | |
527 | if (settings.low_cardinality_max_dictionary_size) |
528 | { |
529 | /// Insert used_keys into global dictionary and update sub_index. |
530 | auto indexes_with_overflow = global_dictionary->uniqueInsertRangeWithOverflow(*keys, 0, keys->size(), |
531 | settings.low_cardinality_max_dictionary_size); |
532 | size_t max_size = settings.low_cardinality_max_dictionary_size + indexes_with_overflow.overflowed_keys->size(); |
533 | ColumnLowCardinality::Index(indexes_with_overflow.indexes->getPtr()).check(max_size); |
534 | |
535 | if (global_dictionary->size() > settings.low_cardinality_max_dictionary_size) |
536 | throw Exception("Got dictionary with size " + toString(global_dictionary->size()) + |
537 | " but max dictionary size is " + toString(settings.low_cardinality_max_dictionary_size), |
538 | ErrorCodes::LOGICAL_ERROR); |
539 | |
540 | positions = indexes_with_overflow.indexes->index(*positions, 0); |
541 | keys = std::move(indexes_with_overflow.overflowed_keys); |
542 | |
543 | if (global_dictionary->size() < settings.low_cardinality_max_dictionary_size && !keys->empty()) |
544 | throw Exception("Has additional keys, but dict size is " + toString(global_dictionary->size()) + |
545 | " which is less then max dictionary size (" + toString(settings.low_cardinality_max_dictionary_size) + ")" , |
546 | ErrorCodes::LOGICAL_ERROR); |
547 | } |
548 | |
549 | if (auto * nullable_keys = checkAndGetColumn<ColumnNullable>(*keys)) |
550 | keys = nullable_keys->getNestedColumnPtr(); |
551 | |
552 | bool need_additional_keys = !keys->empty(); |
553 | bool need_dictionary = settings.low_cardinality_max_dictionary_size != 0; |
554 | bool need_write_dictionary = !settings.low_cardinality_use_single_dictionary_for_part |
555 | && global_dictionary->size() >= settings.low_cardinality_max_dictionary_size; |
556 | |
557 | IndexesSerializationType index_version(*positions, need_additional_keys, need_dictionary, need_update_dictionary); |
558 | index_version.serialize(*indexes_stream); |
559 | |
560 | if (need_write_dictionary) |
561 | { |
562 | const auto & nested_column = global_dictionary->getNestedNotNullableColumn(); |
563 | UInt64 num_keys = nested_column->size(); |
564 | writeIntBinary(num_keys, *keys_stream); |
565 | removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *keys_stream, 0, num_keys); |
566 | low_cardinality_state->shared_dictionary = nullptr; |
567 | } |
568 | |
569 | if (need_additional_keys) |
570 | { |
571 | UInt64 num_keys = keys->size(); |
572 | writeIntBinary(num_keys, *indexes_stream); |
573 | removeNullable(dictionary_type)->serializeBinaryBulk(*keys, *indexes_stream, 0, num_keys); |
574 | } |
575 | |
576 | UInt64 num_rows = positions->size(); |
577 | writeIntBinary(num_rows, *indexes_stream); |
578 | index_version.getDataType()->serializeBinaryBulk(*positions, *indexes_stream, 0, num_rows); |
579 | } |
580 | |
/// Reads up to `limit` rows of a LowCardinality column from the DictionaryKeys and DictionaryIndexes
/// substreams and appends them to `column`.
/// Each granule in the indexes stream starts with an IndexesSerializationType header, optionally
/// followed by additional keys, then the row count and the indexes themselves. A granule may be
/// consumed across several calls; the unread remainder is tracked in num_pending_rows.
void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreams(
    IColumn & column,
    size_t limit,
    DeserializeBinaryBulkSettings & settings,
    DeserializeBinaryBulkStatePtr & state) const
{
    ColumnLowCardinality & low_cardinality_column = typeid_cast<ColumnLowCardinality &>(column);

    settings.path.push_back(Substream::DictionaryKeys);
    auto * keys_stream = settings.getter(settings.path);
    settings.path.back() = Substream::DictionaryIndexes;
    auto * indexes_stream = settings.getter(settings.path);
    settings.path.pop_back();

    /// Neither substream is available: nothing to read.
    if (!keys_stream && !indexes_stream)
        return;

    if (!keys_stream)
        throw Exception("Got empty stream for DataTypeLowCardinality keys." , ErrorCodes::LOGICAL_ERROR);

    if (!indexes_stream)
        throw Exception("Got empty stream for DataTypeLowCardinality indexes." , ErrorCodes::LOGICAL_ERROR);

    auto * low_cardinality_state = checkAndGetLowCardinalityDeserializeState(state);
    KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value);

    /// Reads the next dictionary (key count, then keys) from the keys stream and makes it the global dictionary.
    auto readDictionary = [this, low_cardinality_state, keys_stream]()
    {
        UInt64 num_keys;
        readIntBinary(num_keys, *keys_stream);

        auto keys_type = removeNullable(dictionary_type);
        auto global_dict_keys = keys_type->createColumn();
        keys_type->deserializeBinaryBulk(*global_dict_keys, *keys_stream, num_keys, 0);

        auto column_unique = createColumnUnique(*dictionary_type, std::move(global_dict_keys));
        low_cardinality_state->global_dictionary = std::move(column_unique);
    };

    /// Reads the current granule's additional keys (key count, then keys) from the indexes stream.
    /// Without a global dictionary on a Nullable type, also builds a null map in which only the
    /// first additional key is marked as NULL.
    auto readAdditionalKeys = [this, low_cardinality_state, indexes_stream]()
    {
        UInt64 num_keys;
        readIntBinary(num_keys, *indexes_stream);
        auto keys_type = removeNullable(dictionary_type);
        auto additional_keys = keys_type->createColumn();
        keys_type->deserializeBinaryBulk(*additional_keys, *indexes_stream, num_keys, 0);
        low_cardinality_state->additional_keys = std::move(additional_keys);

        if (!low_cardinality_state->index_type.need_global_dictionary && dictionary_type->isNullable())
        {
            auto null_map = ColumnUInt8::create(num_keys, 0);
            if (num_keys)
                null_map->getElement(0) = 1;

            low_cardinality_state->null_map = std::move(null_map);
        }
    };

    /// Reads `num_rows` indexes of the current granule and appends the referenced values to the column.
    auto readIndexes = [this, low_cardinality_state, indexes_stream, &low_cardinality_column](UInt64 num_rows)
    {
        auto indexes_type = low_cardinality_state->index_type.getDataType();
        MutableColumnPtr indexes_column = indexes_type->createColumn();
        indexes_type->deserializeBinaryBulk(*indexes_column, *indexes_stream, num_rows, 0);

        auto & global_dictionary = low_cardinality_state->global_dictionary;
        const auto & additional_keys = low_cardinality_state->additional_keys;

        bool has_additional_keys = low_cardinality_state->index_type.has_additional_keys;
        bool column_is_empty = low_cardinality_column.empty();

        if (!low_cardinality_state->index_type.need_global_dictionary)
        {
            /// No global dictionary: indexes refer only to the additional keys
            /// (wrapped with the null map when one was built).
            ColumnPtr keys_column = additional_keys;
            if (low_cardinality_state->null_map)
                keys_column = ColumnNullable::create(additional_keys, low_cardinality_state->null_map);
            low_cardinality_column.insertRangeFromDictionaryEncodedColumn(*keys_column, *indexes_column);
        }
        else if (!has_additional_keys)
        {
            /// Indexes refer only to the global dictionary; an empty column adopts it as its shared dictionary.
            if (column_is_empty)
                low_cardinality_column.setSharedDictionary(global_dictionary);

            auto local_column = ColumnLowCardinality::create(global_dictionary, std::move(indexes_column));
            low_cardinality_column.insertRangeFrom(*local_column, 0, num_rows);
        }
        else
        {
            /// Indexes refer to both the global dictionary and the additional keys: compact the indexes,
            /// validate them, and build one key column holding only the keys actually used
            /// (dictionary keys first, then additional keys).
            auto maps = mapIndexWithAdditionalKeys(*indexes_column, global_dictionary->size());

            ColumnLowCardinality::Index(maps.additional_keys_map->getPtr()).check(additional_keys->size());

            ColumnLowCardinality::Index(indexes_column->getPtr()).check(
                    maps.dictionary_map->size() + maps.additional_keys_map->size());

            auto used_keys = (*std::move(global_dictionary->getNestedColumn()->index(*maps.dictionary_map, 0))).mutate();

            if (!maps.additional_keys_map->empty())
            {
                auto used_add_keys = additional_keys->index(*maps.additional_keys_map, 0);

                /// Additional keys are stored without a null map; wrap them (all non-NULL) to match the dictionary.
                if (dictionary_type->isNullable())
                {
                    ColumnPtr null_map = ColumnUInt8::create(used_add_keys->size(), 0);
                    used_add_keys = ColumnNullable::create(used_add_keys, null_map);
                }

                used_keys->insertRangeFrom(*used_add_keys, 0, used_add_keys->size());
            }

            low_cardinality_column.insertRangeFromDictionaryEncodedColumn(*used_keys, *indexes_column);
        }
    };

    if (!settings.continuous_reading)
    {
        /// Not continuing from where the previous call stopped: drop the partially read granule.
        low_cardinality_state->num_pending_rows = 0;

        /// Remember in state that some granules were skipped and we need to update dictionary.
        low_cardinality_state->need_update_dictionary = true;
    }

    while (limit)
    {
        if (low_cardinality_state->num_pending_rows == 0)
        {
            /// Start of a new granule (or end of data): read its header and, when required,
            /// the dictionary and the additional keys.
            if (indexes_stream->eof())
                break;

            auto & index_type = low_cardinality_state->index_type;
            auto & global_dictionary = low_cardinality_state->global_dictionary;

            index_type.deserialize(*indexes_stream);

            bool need_update_dictionary =
                !global_dictionary || index_type.need_update_dictionary || low_cardinality_state->need_update_dictionary;
            if (index_type.need_global_dictionary && need_update_dictionary)
            {
                readDictionary();
                low_cardinality_state->need_update_dictionary = false;
            }

            if (low_cardinality_state->index_type.has_additional_keys)
                readAdditionalKeys();
            else
                low_cardinality_state->additional_keys = nullptr;

            readIntBinary(low_cardinality_state->num_pending_rows, *indexes_stream);
        }

        /// Read no more than what is left of the current granule.
        size_t num_rows_to_read = std::min<UInt64>(limit, low_cardinality_state->num_pending_rows);
        readIndexes(num_rows_to_read);
        limit -= num_rows_to_read;
        low_cardinality_state->num_pending_rows -= num_rows_to_read;
    }
}
736 | |
737 | void DataTypeLowCardinality::serializeBinary(const Field & field, WriteBuffer & ostr) const |
738 | { |
739 | dictionary_type->serializeBinary(field, ostr); |
740 | } |
741 | void DataTypeLowCardinality::deserializeBinary(Field & field, ReadBuffer & istr) const |
742 | { |
743 | dictionary_type->deserializeBinary(field, istr); |
744 | } |
745 | |
746 | void DataTypeLowCardinality::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const |
747 | { |
748 | serializeImpl(column, row_num, &IDataType::serializeBinary, ostr); |
749 | } |
750 | void DataTypeLowCardinality::deserializeBinary(IColumn & column, ReadBuffer & istr) const |
751 | { |
752 | deserializeImpl(column, &IDataType::deserializeBinary, istr); |
753 | } |
754 | |
755 | void DataTypeLowCardinality::serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
756 | { |
757 | serializeImpl(column, row_num, &IDataType::serializeAsTextEscaped, ostr, settings); |
758 | } |
759 | |
760 | void DataTypeLowCardinality::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
761 | { |
762 | deserializeImpl(column, &IDataType::deserializeAsTextEscaped, istr, settings); |
763 | } |
764 | |
765 | void DataTypeLowCardinality::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
766 | { |
767 | serializeImpl(column, row_num, &IDataType::serializeAsTextQuoted, ostr, settings); |
768 | } |
769 | |
770 | void DataTypeLowCardinality::deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
771 | { |
772 | deserializeImpl(column, &IDataType::deserializeAsTextQuoted, istr, settings); |
773 | } |
774 | |
775 | void DataTypeLowCardinality::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
776 | { |
777 | deserializeImpl(column, &IDataType::deserializeAsTextEscaped, istr, settings); |
778 | } |
779 | |
780 | void DataTypeLowCardinality::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
781 | { |
782 | serializeImpl(column, row_num, &IDataType::serializeAsTextCSV, ostr, settings); |
783 | } |
784 | |
785 | void DataTypeLowCardinality::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
786 | { |
787 | deserializeImpl(column, &IDataType::deserializeAsTextCSV, istr, settings); |
788 | } |
789 | |
790 | void DataTypeLowCardinality::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
791 | { |
792 | serializeImpl(column, row_num, &IDataType::serializeAsText, ostr, settings); |
793 | } |
794 | |
795 | void DataTypeLowCardinality::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
796 | { |
797 | serializeImpl(column, row_num, &IDataType::serializeAsTextJSON, ostr, settings); |
798 | } |
799 | void DataTypeLowCardinality::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
800 | { |
801 | deserializeImpl(column, &IDataType::deserializeAsTextJSON, istr, settings); |
802 | } |
803 | |
804 | void DataTypeLowCardinality::serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
805 | { |
806 | serializeImpl(column, row_num, &IDataType::serializeAsTextXML, ostr, settings); |
807 | } |
808 | |
809 | void DataTypeLowCardinality::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const |
810 | { |
811 | serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf, value_index); |
812 | } |
813 | |
814 | void DataTypeLowCardinality::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const |
815 | { |
816 | if (allow_add_row) |
817 | { |
818 | deserializeImpl(column, &IDataType::deserializeProtobuf, protobuf, true, row_added); |
819 | return; |
820 | } |
821 | |
822 | row_added = false; |
823 | auto & low_cardinality_column= getColumnLowCardinality(column); |
824 | auto nested_column = low_cardinality_column.getDictionary().getNestedColumn(); |
825 | auto temp_column = nested_column->cloneEmpty(); |
826 | size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(low_cardinality_column.size() - 1); |
827 | temp_column->insertFrom(*nested_column, unique_row_number); |
828 | bool dummy; |
829 | dictionary_type.get()->deserializeProtobuf(*temp_column, protobuf, false, dummy); |
830 | low_cardinality_column.popBack(1); |
831 | low_cardinality_column.insertFromFullColumn(*temp_column, 0); |
832 | } |
833 | |
834 | template <typename... Params, typename... Args> |
835 | void DataTypeLowCardinality::serializeImpl( |
836 | const IColumn & column, size_t row_num, DataTypeLowCardinality::SerializeFunctionPtr<Params...> func, Args &&... args) const |
837 | { |
838 | auto & low_cardinality_column = getColumnLowCardinality(column); |
839 | size_t unique_row_number = low_cardinality_column.getIndexes().getUInt(row_num); |
840 | (dictionary_type.get()->*func)(*low_cardinality_column.getDictionary().getNestedColumn(), unique_row_number, std::forward<Args>(args)...); |
841 | } |
842 | |
843 | template <typename... Params, typename... Args> |
844 | void DataTypeLowCardinality::deserializeImpl( |
845 | IColumn & column, DataTypeLowCardinality::DeserializeFunctionPtr<Params...> func, Args &&... args) const |
846 | { |
847 | auto & low_cardinality_column= getColumnLowCardinality(column); |
848 | auto temp_column = low_cardinality_column.getDictionary().getNestedColumn()->cloneEmpty(); |
849 | |
850 | (dictionary_type.get()->*func)(*temp_column, std::forward<Args>(args)...); |
851 | |
852 | low_cardinality_column.insertFromFullColumn(*temp_column, 0); |
853 | } |
854 | |
855 | namespace |
856 | { |
857 | template <typename Creator> |
858 | struct CreateColumnVector |
859 | { |
860 | MutableColumnUniquePtr & column; |
861 | const IDataType & keys_type; |
862 | const Creator & creator; |
863 | |
864 | CreateColumnVector(MutableColumnUniquePtr & column_, const IDataType & keys_type_, const Creator & creator_) |
865 | : column(column_), keys_type(keys_type_), creator(creator_) |
866 | { |
867 | } |
868 | |
869 | template <typename T, size_t> |
870 | void operator()() |
871 | { |
872 | if (typeid_cast<const DataTypeNumber<T> *>(&keys_type)) |
873 | column = creator(static_cast<ColumnVector<T> *>(nullptr)); |
874 | } |
875 | }; |
876 | } |
877 | |
878 | template <typename Creator> |
879 | MutableColumnUniquePtr DataTypeLowCardinality::createColumnUniqueImpl(const IDataType & keys_type, |
880 | const Creator & creator) |
881 | { |
882 | auto * type = &keys_type; |
883 | if (auto * nullable_type = typeid_cast<const DataTypeNullable *>(&keys_type)) |
884 | type = nullable_type->getNestedType().get(); |
885 | |
886 | if (isString(type)) |
887 | return creator(static_cast<ColumnString *>(nullptr)); |
888 | if (isFixedString(type)) |
889 | return creator(static_cast<ColumnFixedString *>(nullptr)); |
890 | if (typeid_cast<const DataTypeDate *>(type)) |
891 | return creator(static_cast<ColumnVector<UInt16> *>(nullptr)); |
892 | if (typeid_cast<const DataTypeDateTime *>(type)) |
893 | return creator(static_cast<ColumnVector<UInt32> *>(nullptr)); |
894 | if (isColumnedAsNumber(type)) |
895 | { |
896 | MutableColumnUniquePtr column; |
897 | TypeListNativeNumbers::forEach(CreateColumnVector(column, *type, creator)); |
898 | |
899 | if (!column) |
900 | throw Exception("Unexpected numeric type: " + type->getName(), ErrorCodes::LOGICAL_ERROR); |
901 | |
902 | return column; |
903 | } |
904 | |
905 | throw Exception("Unexpected dictionary type for DataTypeLowCardinality: " + type->getName(), |
906 | ErrorCodes::LOGICAL_ERROR); |
907 | } |
908 | |
909 | |
910 | MutableColumnUniquePtr DataTypeLowCardinality::createColumnUnique(const IDataType & keys_type) |
911 | { |
912 | auto creator = [&](auto x) |
913 | { |
914 | using ColumnType = typename std::remove_pointer<decltype(x)>::type; |
915 | return ColumnUnique<ColumnType>::create(keys_type); |
916 | }; |
917 | return createColumnUniqueImpl(keys_type, creator); |
918 | } |
919 | |
920 | MutableColumnUniquePtr DataTypeLowCardinality::createColumnUnique(const IDataType & keys_type, MutableColumnPtr && keys) |
921 | { |
922 | auto creator = [&](auto x) |
923 | { |
924 | using ColumnType = typename std::remove_pointer<decltype(x)>::type; |
925 | return ColumnUnique<ColumnType>::create(std::move(keys), keys_type.isNullable()); |
926 | }; |
927 | return createColumnUniqueImpl(keys_type, creator); |
928 | } |
929 | |
930 | MutableColumnPtr DataTypeLowCardinality::createColumn() const |
931 | { |
932 | MutableColumnPtr indexes = DataTypeUInt8().createColumn(); |
933 | MutableColumnPtr dictionary = createColumnUnique(*dictionary_type); |
934 | return ColumnLowCardinality::create(std::move(dictionary), std::move(indexes)); |
935 | } |
936 | |
937 | Field DataTypeLowCardinality::getDefault() const |
938 | { |
939 | return dictionary_type->getDefault(); |
940 | } |
941 | |
942 | bool DataTypeLowCardinality::equals(const IDataType & rhs) const |
943 | { |
944 | if (typeid(rhs) != typeid(*this)) |
945 | return false; |
946 | |
947 | auto & low_cardinality_rhs= static_cast<const DataTypeLowCardinality &>(rhs); |
948 | return dictionary_type->equals(*low_cardinality_rhs.dictionary_type); |
949 | } |
950 | |
951 | |
952 | static DataTypePtr create(const String & /*type_name*/, const ASTPtr & arguments) |
953 | { |
954 | if (!arguments || arguments->children.size() != 1) |
955 | throw Exception("LowCardinality data type family must have single argument - type of elements" , |
956 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
957 | |
958 | return std::make_shared<DataTypeLowCardinality>(DataTypeFactory::instance().get(arguments->children[0])); |
959 | } |
960 | |
961 | void registerDataTypeLowCardinality(DataTypeFactory & factory) |
962 | { |
963 | factory.registerDataType("LowCardinality" , create); |
964 | } |
965 | |
966 | |
967 | DataTypePtr removeLowCardinality(const DataTypePtr & type) |
968 | { |
969 | if (auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get())) |
970 | return low_cardinality_type->getDictionaryType(); |
971 | return type; |
972 | } |
973 | |
974 | } |
975 | |