1#include "duckdb/common/bitpacking.hpp"
2#include "duckdb/common/operator/comparison_operators.hpp"
3#include "duckdb/common/string_map_set.hpp"
4#include "duckdb/common/types/vector_buffer.hpp"
5#include "duckdb/function/compression/compression.hpp"
6#include "duckdb/function/compression_function.hpp"
7#include "duckdb/main/config.hpp"
8#include "duckdb/storage/segment/uncompressed.hpp"
9#include "duckdb/storage/string_uncompressed.hpp"
10#include "duckdb/storage/table/append_state.hpp"
11#include "duckdb/storage/table/column_data_checkpointer.hpp"
12
13namespace duckdb {
14
15// Abstract class for keeping compression state either for compression or size analysis
16class DictionaryCompressionState : public CompressionState {
17public:
18 bool UpdateState(Vector &scan_vector, idx_t count) {
19 UnifiedVectorFormat vdata;
20 scan_vector.ToUnifiedFormat(count, data&: vdata);
21 auto data = UnifiedVectorFormat::GetData<string_t>(format: vdata);
22 Verify();
23
24 for (idx_t i = 0; i < count; i++) {
25 auto idx = vdata.sel->get_index(idx: i);
26 size_t string_size = 0;
27 bool new_string = false;
28 auto row_is_valid = vdata.validity.RowIsValid(row_idx: idx);
29
30 if (row_is_valid) {
31 string_size = data[idx].GetSize();
32 if (string_size >= StringUncompressed::STRING_BLOCK_LIMIT) {
33 // Big strings not implemented for dictionary compression
34 return false;
35 }
36 new_string = !LookupString(str: data[idx]);
37 }
38
39 bool fits = CalculateSpaceRequirements(new_string, string_size);
40 if (!fits) {
41 Flush();
42 new_string = true;
43
44 fits = CalculateSpaceRequirements(new_string, string_size);
45 if (!fits) {
46 throw InternalException("Dictionary compression could not write to new segment");
47 }
48 }
49
50 if (!row_is_valid) {
51 AddNull();
52 } else if (new_string) {
53 AddNewString(str: data[idx]);
54 } else {
55 AddLastLookup();
56 }
57
58 Verify();
59 }
60
61 return true;
62 }
63
64protected:
65 // Should verify the State
66 virtual void Verify() = 0;
67 // Performs a lookup of str, storing the result internally
68 virtual bool LookupString(string_t str) = 0;
69 // Add the most recently looked up str to compression state
70 virtual void AddLastLookup() = 0;
71 // Add string to the state that is known to not be seen yet
72 virtual void AddNewString(string_t str) = 0;
73 // Add a null value to the compression state
74 virtual void AddNull() = 0;
75 // Needs to be called before adding a value. Will return false if a flush is required first.
76 virtual bool CalculateSpaceRequirements(bool new_string, size_t string_size) = 0;
77 // Flush the segment to disk if compressing or reset the counters if analyzing
78 virtual void Flush(bool final = false) = 0;
79};
80
//! On-disk header of a dictionary-compressed segment, stored at the start of the block.
//! All fields are loaded/stored via Load/Store, so the layout defines the storage format.
struct dictionary_compression_header_t {
	uint32_t dict_size;           //! Size (in bytes) of the string dictionary
	uint32_t dict_end;            //! Offset of the end of the dictionary within the block
	uint32_t index_buffer_offset; //! Offset of the index buffer within the block
	uint32_t index_buffer_count;  //! Number of entries in the index buffer
	uint32_t bitpacking_width;    //! Bit width used to bitpack the selection buffer
};
88
89struct DictionaryCompressionStorage {
90 static constexpr float MINIMUM_COMPRESSION_RATIO = 1.2;
91 static constexpr uint16_t DICTIONARY_HEADER_SIZE = sizeof(dictionary_compression_header_t);
92 static constexpr size_t COMPACTION_FLUSH_LIMIT = (size_t)Storage::BLOCK_SIZE / 5 * 4;
93
94 static unique_ptr<AnalyzeState> StringInitAnalyze(ColumnData &col_data, PhysicalType type);
95 static bool StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count);
96 static idx_t StringFinalAnalyze(AnalyzeState &state_p);
97
98 static unique_ptr<CompressionState> InitCompression(ColumnDataCheckpointer &checkpointer,
99 unique_ptr<AnalyzeState> state);
100 static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count);
101 static void FinalizeCompress(CompressionState &state_p);
102
103 static unique_ptr<SegmentScanState> StringInitScan(ColumnSegment &segment);
104 template <bool ALLOW_DICT_VECTORS>
105 static void StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result,
106 idx_t result_offset);
107 static void StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result);
108 static void StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result,
109 idx_t result_idx);
110
111 static bool HasEnoughSpace(idx_t current_count, idx_t index_count, idx_t dict_size,
112 bitpacking_width_t packing_width);
113 static idx_t RequiredSpace(idx_t current_count, idx_t index_count, idx_t dict_size,
114 bitpacking_width_t packing_width);
115
116 static StringDictionaryContainer GetDictionary(ColumnSegment &segment, BufferHandle &handle);
117 static void SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container);
118 static string_t FetchStringFromDict(ColumnSegment &segment, StringDictionaryContainer dict, data_ptr_t baseptr,
119 int32_t dict_offset, uint16_t string_len);
120 static uint16_t GetStringLength(uint32_t *index_buffer_ptr, sel_t index);
121};
122
// Dictionary compression uses a combination of bitpacking and a dictionary to compress string segments. The data is
// stored across three buffers: the index buffer, the selection buffer and the dictionary. Firstly, the index buffer
// contains the offsets into the dictionary which are also used to determine the string lengths. Each value in the
// dictionary gets a single unique index in the index buffer. Secondly, the selection buffer maps the tuples to an index
// in the index buffer. The selection buffer is compressed with bitpacking. Finally, the dictionary contains simply all
// the unique strings without lengths or null termination as we can deduce the lengths from the index buffer. The
// addition of the selection buffer is done for two reasons: firstly, to allow the scan to emit dictionary vectors by
// scanning the whole dictionary at once and then scanning the selection buffer for each emitted vector. Secondly, it
// allows for efficient bitpacking compression as the selection values should remain relatively small.
132struct DictionaryCompressionCompressState : public DictionaryCompressionState {
133 explicit DictionaryCompressionCompressState(ColumnDataCheckpointer &checkpointer_p)
134 : checkpointer(checkpointer_p),
135 function(checkpointer.GetCompressionFunction(type: CompressionType::COMPRESSION_DICTIONARY)),
136 heap(BufferAllocator::Get(db&: checkpointer.GetDatabase())) {
137 CreateEmptySegment(row_start: checkpointer.GetRowGroup().start);
138 }
139
140 ColumnDataCheckpointer &checkpointer;
141 CompressionFunction &function;
142
143 // State regarding current segment
144 unique_ptr<ColumnSegment> current_segment;
145 BufferHandle current_handle;
146 StringDictionaryContainer current_dictionary;
147 data_ptr_t current_end_ptr;
148
149 // Buffers and map for current segment
150 StringHeap heap;
151 string_map_t<uint32_t> current_string_map;
152 vector<uint32_t> index_buffer;
153 vector<uint32_t> selection_buffer;
154
155 bitpacking_width_t current_width = 0;
156 bitpacking_width_t next_width = 0;
157
158 // Result of latest LookupString call
159 uint32_t latest_lookup_result;
160
161public:
162 void CreateEmptySegment(idx_t row_start) {
163 auto &db = checkpointer.GetDatabase();
164 auto &type = checkpointer.GetType();
165 auto compressed_segment = ColumnSegment::CreateTransientSegment(db, type, start: row_start);
166 current_segment = std::move(compressed_segment);
167
168 current_segment->function = function;
169
170 // Reset the buffers and string map
171 current_string_map.clear();
172 index_buffer.clear();
173 index_buffer.push_back(x: 0); // Reserve index 0 for null strings
174 selection_buffer.clear();
175
176 current_width = 0;
177 next_width = 0;
178
179 // Reset the pointers into the current segment
180 auto &buffer_manager = BufferManager::GetBufferManager(db&: checkpointer.GetDatabase());
181 current_handle = buffer_manager.Pin(handle&: current_segment->block);
182 current_dictionary = DictionaryCompressionStorage::GetDictionary(segment&: *current_segment, handle&: current_handle);
183 current_end_ptr = current_handle.Ptr() + current_dictionary.end;
184 }
185
186 void Verify() override {
187 current_dictionary.Verify();
188 D_ASSERT(current_segment->count == selection_buffer.size());
189 D_ASSERT(DictionaryCompressionStorage::HasEnoughSpace(current_segment->count.load(), index_buffer.size(),
190 current_dictionary.size, current_width));
191 D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE);
192 D_ASSERT(index_buffer.size() == current_string_map.size() + 1); // +1 is for null value
193 }
194
195 bool LookupString(string_t str) override {
196 auto search = current_string_map.find(x: str);
197 auto has_result = search != current_string_map.end();
198
199 if (has_result) {
200 latest_lookup_result = search->second;
201 }
202 return has_result;
203 }
204
205 void AddNewString(string_t str) override {
206 UncompressedStringStorage::UpdateStringStats(stats&: current_segment->stats, new_value: str);
207
208 // Copy string to dict
209 current_dictionary.size += str.GetSize();
210 auto dict_pos = current_end_ptr - current_dictionary.size;
211 memcpy(dest: dict_pos, src: str.GetData(), n: str.GetSize());
212 current_dictionary.Verify();
213 D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE);
214
215 // Update buffers and map
216 index_buffer.push_back(x: current_dictionary.size);
217 selection_buffer.push_back(x: index_buffer.size() - 1);
218 if (str.IsInlined()) {
219 current_string_map.insert(x: {str, index_buffer.size() - 1});
220 } else {
221 current_string_map.insert(x: {heap.AddBlob(data: str), index_buffer.size() - 1});
222 }
223 DictionaryCompressionStorage::SetDictionary(segment&: *current_segment, handle&: current_handle, container: current_dictionary);
224
225 current_width = next_width;
226 current_segment->count++;
227 }
228
229 void AddNull() override {
230 selection_buffer.push_back(x: 0);
231 current_segment->count++;
232 }
233
234 void AddLastLookup() override {
235 selection_buffer.push_back(x: latest_lookup_result);
236 current_segment->count++;
237 }
238
239 bool CalculateSpaceRequirements(bool new_string, size_t string_size) override {
240 if (new_string) {
241 next_width = BitpackingPrimitives::MinimumBitWidth(value: index_buffer.size() - 1 + new_string);
242 return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_segment->count.load() + 1,
243 index_count: index_buffer.size() + 1,
244 dict_size: current_dictionary.size + string_size, packing_width: next_width);
245 } else {
246 return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_segment->count.load() + 1, index_count: index_buffer.size(),
247 dict_size: current_dictionary.size, packing_width: current_width);
248 }
249 }
250
251 void Flush(bool final = false) override {
252 auto next_start = current_segment->start + current_segment->count;
253
254 auto segment_size = Finalize();
255 auto &state = checkpointer.GetCheckpointState();
256 state.FlushSegment(segment: std::move(current_segment), segment_size);
257
258 if (!final) {
259 CreateEmptySegment(row_start: next_start);
260 }
261 }
262
263 idx_t Finalize() {
264 auto &buffer_manager = BufferManager::GetBufferManager(db&: checkpointer.GetDatabase());
265 auto handle = buffer_manager.Pin(handle&: current_segment->block);
266 D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE);
267
268 // calculate sizes
269 auto compressed_selection_buffer_size =
270 BitpackingPrimitives::GetRequiredSize(count: current_segment->count, width: current_width);
271 auto index_buffer_size = index_buffer.size() * sizeof(uint32_t);
272 auto total_size = DictionaryCompressionStorage::DICTIONARY_HEADER_SIZE + compressed_selection_buffer_size +
273 index_buffer_size + current_dictionary.size;
274
275 // calculate ptr and offsets
276 auto base_ptr = handle.Ptr();
277 auto header_ptr = reinterpret_cast<dictionary_compression_header_t *>(base_ptr);
278 auto compressed_selection_buffer_offset = DictionaryCompressionStorage::DICTIONARY_HEADER_SIZE;
279 auto index_buffer_offset = compressed_selection_buffer_offset + compressed_selection_buffer_size;
280
281 // Write compressed selection buffer
282 BitpackingPrimitives::PackBuffer<sel_t, false>(dst: base_ptr + compressed_selection_buffer_offset,
283 src: (sel_t *)(selection_buffer.data()), count: current_segment->count,
284 width: current_width);
285
286 // Write the index buffer
287 memcpy(dest: base_ptr + index_buffer_offset, src: index_buffer.data(), n: index_buffer_size);
288
289 // Store sizes and offsets in segment header
290 Store<uint32_t>(val: index_buffer_offset, ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset));
291 Store<uint32_t>(val: index_buffer.size(), ptr: data_ptr_cast(src: &header_ptr->index_buffer_count));
292 Store<uint32_t>(val: (uint32_t)current_width, ptr: data_ptr_cast(src: &header_ptr->bitpacking_width));
293
294 D_ASSERT(current_width == BitpackingPrimitives::MinimumBitWidth(index_buffer.size() - 1));
295 D_ASSERT(DictionaryCompressionStorage::HasEnoughSpace(current_segment->count, index_buffer.size(),
296 current_dictionary.size, current_width));
297 D_ASSERT((uint64_t)*max_element(std::begin(selection_buffer), std::end(selection_buffer)) ==
298 index_buffer.size() - 1);
299
300 if (total_size >= DictionaryCompressionStorage::COMPACTION_FLUSH_LIMIT) {
301 // the block is full enough, don't bother moving around the dictionary
302 return Storage::BLOCK_SIZE;
303 }
304 // the block has space left: figure out how much space we can save
305 auto move_amount = Storage::BLOCK_SIZE - total_size;
306 // move the dictionary so it lines up exactly with the offsets
307 auto new_dictionary_offset = index_buffer_offset + index_buffer_size;
308 memmove(dest: base_ptr + new_dictionary_offset, src: base_ptr + current_dictionary.end - current_dictionary.size,
309 n: current_dictionary.size);
310 current_dictionary.end -= move_amount;
311 D_ASSERT(current_dictionary.end == total_size);
312 // write the new dictionary (with the updated "end")
313 DictionaryCompressionStorage::SetDictionary(segment&: *current_segment, handle, container: current_dictionary);
314 return total_size;
315 }
316};
317
318//===--------------------------------------------------------------------===//
319// Analyze
320//===--------------------------------------------------------------------===//
321struct DictionaryAnalyzeState : public DictionaryCompressionState {
322 DictionaryAnalyzeState()
323 : segment_count(0), current_tuple_count(0), current_unique_count(0), current_dict_size(0), current_width(0),
324 next_width(0) {
325 }
326
327 size_t segment_count;
328 idx_t current_tuple_count;
329 idx_t current_unique_count;
330 size_t current_dict_size;
331 StringHeap heap;
332 string_set_t current_set;
333 bitpacking_width_t current_width;
334 bitpacking_width_t next_width;
335
336 bool LookupString(string_t str) override {
337 return current_set.count(x: str);
338 }
339
340 void AddNewString(string_t str) override {
341 current_tuple_count++;
342 current_unique_count++;
343 current_dict_size += str.GetSize();
344 if (str.IsInlined()) {
345 current_set.insert(x: str);
346 } else {
347 current_set.insert(x: heap.AddBlob(data: str));
348 }
349 current_width = next_width;
350 }
351
352 void AddLastLookup() override {
353 current_tuple_count++;
354 }
355
356 void AddNull() override {
357 current_tuple_count++;
358 }
359
360 bool CalculateSpaceRequirements(bool new_string, size_t string_size) override {
361 if (new_string) {
362 next_width =
363 BitpackingPrimitives::MinimumBitWidth(value: current_unique_count + 2); // 1 for null, one for new string
364 return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_tuple_count + 1, index_count: current_unique_count + 1,
365 dict_size: current_dict_size + string_size, packing_width: next_width);
366 } else {
367 return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_tuple_count + 1, index_count: current_unique_count,
368 dict_size: current_dict_size, packing_width: current_width);
369 }
370 }
371
372 void Flush(bool final = false) override {
373 segment_count++;
374 current_tuple_count = 0;
375 current_unique_count = 0;
376 current_dict_size = 0;
377 current_set.clear();
378 }
379 void Verify() override {};
380};
381
382struct DictionaryCompressionAnalyzeState : public AnalyzeState {
383 DictionaryCompressionAnalyzeState() : analyze_state(make_uniq<DictionaryAnalyzeState>()) {
384 }
385
386 unique_ptr<DictionaryAnalyzeState> analyze_state;
387};
388
389unique_ptr<AnalyzeState> DictionaryCompressionStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) {
390 return make_uniq<DictionaryCompressionAnalyzeState>();
391}
392
393bool DictionaryCompressionStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) {
394 auto &state = state_p.Cast<DictionaryCompressionAnalyzeState>();
395 return state.analyze_state->UpdateState(scan_vector&: input, count);
396}
397
398idx_t DictionaryCompressionStorage::StringFinalAnalyze(AnalyzeState &state_p) {
399 auto &analyze_state = state_p.Cast<DictionaryCompressionAnalyzeState>();
400 auto &state = *analyze_state.analyze_state;
401
402 auto width = BitpackingPrimitives::MinimumBitWidth(value: state.current_unique_count + 1);
403 auto req_space =
404 RequiredSpace(current_count: state.current_tuple_count, index_count: state.current_unique_count, dict_size: state.current_dict_size, packing_width: width);
405
406 return MINIMUM_COMPRESSION_RATIO * (state.segment_count * Storage::BLOCK_SIZE + req_space);
407}
408
409//===--------------------------------------------------------------------===//
410// Compress
411//===--------------------------------------------------------------------===//
412unique_ptr<CompressionState> DictionaryCompressionStorage::InitCompression(ColumnDataCheckpointer &checkpointer,
413 unique_ptr<AnalyzeState> state) {
414 return make_uniq<DictionaryCompressionCompressState>(args&: checkpointer);
415}
416
417void DictionaryCompressionStorage::Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) {
418 auto &state = state_p.Cast<DictionaryCompressionCompressState>();
419 state.UpdateState(scan_vector, count);
420}
421
422void DictionaryCompressionStorage::FinalizeCompress(CompressionState &state_p) {
423 auto &state = state_p.Cast<DictionaryCompressionCompressState>();
424 state.Flush(final: true);
425}
426
427//===--------------------------------------------------------------------===//
428// Scan
429//===--------------------------------------------------------------------===//
430struct CompressedStringScanState : public StringScanState {
431 BufferHandle handle;
432 buffer_ptr<Vector> dictionary;
433 bitpacking_width_t current_width;
434 buffer_ptr<SelectionVector> sel_vec;
435 idx_t sel_vec_size = 0;
436};
437
438unique_ptr<SegmentScanState> DictionaryCompressionStorage::StringInitScan(ColumnSegment &segment) {
439 auto state = make_uniq<CompressedStringScanState>();
440 auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db);
441 state->handle = buffer_manager.Pin(handle&: segment.block);
442
443 auto baseptr = state->handle.Ptr() + segment.GetBlockOffset();
444
445 // Load header values
446 auto dict = DictionaryCompressionStorage::GetDictionary(segment, handle&: state->handle);
447 auto header_ptr = reinterpret_cast<dictionary_compression_header_t *>(baseptr);
448 auto index_buffer_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset));
449 auto index_buffer_count = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_count));
450 state->current_width = (bitpacking_width_t)(Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->bitpacking_width)));
451
452 auto index_buffer_ptr = reinterpret_cast<uint32_t *>(baseptr + index_buffer_offset);
453
454 state->dictionary = make_buffer<Vector>(args&: segment.type, args&: index_buffer_count);
455 auto dict_child_data = FlatVector::GetData<string_t>(vector&: *(state->dictionary));
456
457 for (uint32_t i = 0; i < index_buffer_count; i++) {
458 // NOTE: the passing of dict_child_vector, will not be used, its for big strings
459 uint16_t str_len = GetStringLength(index_buffer_ptr, index: i);
460 dict_child_data[i] = FetchStringFromDict(segment, dict, baseptr, dict_offset: index_buffer_ptr[i], string_len: str_len);
461 }
462
463 return std::move(state);
464}
465
466//===--------------------------------------------------------------------===//
467// Scan base data
468//===--------------------------------------------------------------------===//
469template <bool ALLOW_DICT_VECTORS>
470void DictionaryCompressionStorage::StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count,
471 Vector &result, idx_t result_offset) {
472 // clear any previously locked buffers and get the primary buffer handle
473 auto &scan_state = state.scan_state->Cast<CompressedStringScanState>();
474 auto start = segment.GetRelativeIndex(row_index: state.row_index);
475
476 auto baseptr = scan_state.handle.Ptr() + segment.GetBlockOffset();
477 auto dict = DictionaryCompressionStorage::GetDictionary(segment, handle&: scan_state.handle);
478
479 auto header_ptr = reinterpret_cast<dictionary_compression_header_t *>(baseptr);
480 auto index_buffer_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset));
481 auto index_buffer_ptr = reinterpret_cast<uint32_t *>(baseptr + index_buffer_offset);
482
483 auto base_data = data_ptr_cast(src: baseptr + DICTIONARY_HEADER_SIZE);
484 auto result_data = FlatVector::GetData<string_t>(vector&: result);
485
486 if (!ALLOW_DICT_VECTORS || scan_count != STANDARD_VECTOR_SIZE ||
487 start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE != 0) {
488 // Emit regular vector
489
490 // Handling non-bitpacking-group-aligned start values;
491 idx_t start_offset = start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE;
492
493 // We will scan in blocks of BITPACKING_ALGORITHM_GROUP_SIZE, so we may scan some extra values.
494 idx_t decompress_count = BitpackingPrimitives::RoundUpToAlgorithmGroupSize(num_to_round: scan_count + start_offset);
495
496 // Create a decompression buffer of sufficient size if we don't already have one.
497 if (!scan_state.sel_vec || scan_state.sel_vec_size < decompress_count) {
498 scan_state.sel_vec_size = decompress_count;
499 scan_state.sel_vec = make_buffer<SelectionVector>(args&: decompress_count);
500 }
501
502 data_ptr_t src = &base_data[((start - start_offset) * scan_state.current_width) / 8];
503 sel_t *sel_vec_ptr = scan_state.sel_vec->data();
504
505 BitpackingPrimitives::UnPackBuffer<sel_t>(dst: data_ptr_cast(src: sel_vec_ptr), src, count: decompress_count,
506 width: scan_state.current_width);
507
508 for (idx_t i = 0; i < scan_count; i++) {
509 // Lookup dict offset in index buffer
510 auto string_number = scan_state.sel_vec->get_index(idx: i + start_offset);
511 auto dict_offset = index_buffer_ptr[string_number];
512 uint16_t str_len = GetStringLength(index_buffer_ptr, index: string_number);
513 result_data[result_offset + i] = FetchStringFromDict(segment, dict, baseptr, dict_offset, string_len: str_len);
514 }
515
516 } else {
517 D_ASSERT(start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE == 0);
518 D_ASSERT(scan_count == STANDARD_VECTOR_SIZE);
519 D_ASSERT(result_offset == 0);
520
521 idx_t decompress_count = BitpackingPrimitives::RoundUpToAlgorithmGroupSize(num_to_round: scan_count);
522
523 // Create a selection vector of sufficient size if we don't already have one.
524 if (!scan_state.sel_vec || scan_state.sel_vec_size < decompress_count) {
525 scan_state.sel_vec_size = decompress_count;
526 scan_state.sel_vec = make_buffer<SelectionVector>(args&: decompress_count);
527 }
528
529 // Scanning 1024 values, emitting a dict vector
530 data_ptr_t dst = data_ptr_cast(src: scan_state.sel_vec->data());
531 data_ptr_t src = data_ptr_cast(src: &base_data[(start * scan_state.current_width) / 8]);
532
533 BitpackingPrimitives::UnPackBuffer<sel_t>(dst, src, count: scan_count, width: scan_state.current_width);
534
535 result.Slice(other&: *(scan_state.dictionary), sel: *scan_state.sel_vec, count: scan_count);
536 }
537}
538
539void DictionaryCompressionStorage::StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count,
540 Vector &result) {
541 StringScanPartial<true>(segment, state, scan_count, result, result_offset: 0);
542}
543
544//===--------------------------------------------------------------------===//
545// Fetch
546//===--------------------------------------------------------------------===//
547void DictionaryCompressionStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id,
548 Vector &result, idx_t result_idx) {
549 // fetch a single row from the string segment
550 // first pin the main buffer if it is not already pinned
551 auto &handle = state.GetOrInsertHandle(segment);
552
553 auto baseptr = handle.Ptr() + segment.GetBlockOffset();
554 auto header_ptr = reinterpret_cast<dictionary_compression_header_t *>(baseptr);
555 auto dict = DictionaryCompressionStorage::GetDictionary(segment, handle);
556 auto index_buffer_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset));
557 auto width = (bitpacking_width_t)Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->bitpacking_width));
558 auto index_buffer_ptr = reinterpret_cast<uint32_t *>(baseptr + index_buffer_offset);
559 auto base_data = data_ptr_cast(src: baseptr + DICTIONARY_HEADER_SIZE);
560 auto result_data = FlatVector::GetData<string_t>(vector&: result);
561
562 // Handling non-bitpacking-group-aligned start values;
563 idx_t start_offset = row_id % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE;
564
565 // Decompress part of selection buffer we need for this value.
566 sel_t decompression_buffer[BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE];
567 data_ptr_t src = data_ptr_cast(src: &base_data[((row_id - start_offset) * width) / 8]);
568 BitpackingPrimitives::UnPackBuffer<sel_t>(dst: data_ptr_cast(src: decompression_buffer), src,
569 count: BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE, width);
570
571 auto selection_value = decompression_buffer[start_offset];
572 auto dict_offset = index_buffer_ptr[selection_value];
573 uint16_t str_len = GetStringLength(index_buffer_ptr, index: selection_value);
574
575 result_data[result_idx] = FetchStringFromDict(segment, dict, baseptr, dict_offset, string_len: str_len);
576}
577
578//===--------------------------------------------------------------------===//
579// Helper Functions
580//===--------------------------------------------------------------------===//
581bool DictionaryCompressionStorage::HasEnoughSpace(idx_t current_count, idx_t index_count, idx_t dict_size,
582 bitpacking_width_t packing_width) {
583 return RequiredSpace(current_count, index_count, dict_size, packing_width) <= Storage::BLOCK_SIZE;
584}
585
586idx_t DictionaryCompressionStorage::RequiredSpace(idx_t current_count, idx_t index_count, idx_t dict_size,
587 bitpacking_width_t packing_width) {
588 idx_t base_space = DICTIONARY_HEADER_SIZE + dict_size;
589 idx_t string_number_space = BitpackingPrimitives::GetRequiredSize(count: current_count, width: packing_width);
590 idx_t index_space = index_count * sizeof(uint32_t);
591
592 idx_t used_space = base_space + index_space + string_number_space;
593
594 return used_space;
595}
596
597StringDictionaryContainer DictionaryCompressionStorage::GetDictionary(ColumnSegment &segment, BufferHandle &handle) {
598 auto header_ptr = reinterpret_cast<dictionary_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset());
599 StringDictionaryContainer container;
600 container.size = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->dict_size));
601 container.end = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->dict_end));
602 return container;
603}
604
605void DictionaryCompressionStorage::SetDictionary(ColumnSegment &segment, BufferHandle &handle,
606 StringDictionaryContainer container) {
607 auto header_ptr = reinterpret_cast<dictionary_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset());
608 Store<uint32_t>(val: container.size, ptr: data_ptr_cast(src: &header_ptr->dict_size));
609 Store<uint32_t>(val: container.end, ptr: data_ptr_cast(src: &header_ptr->dict_end));
610}
611
612string_t DictionaryCompressionStorage::FetchStringFromDict(ColumnSegment &segment, StringDictionaryContainer dict,
613 data_ptr_t baseptr, int32_t dict_offset,
614 uint16_t string_len) {
615 D_ASSERT(dict_offset >= 0 && dict_offset <= Storage::BLOCK_SIZE);
616
617 if (dict_offset == 0) {
618 return string_t(nullptr, 0);
619 }
620 // normal string: read string from this block
621 auto dict_end = baseptr + dict.end;
622 auto dict_pos = dict_end - dict_offset;
623
624 auto str_ptr = char_ptr_cast(src: dict_pos);
625 return string_t(str_ptr, string_len);
626}
627
628uint16_t DictionaryCompressionStorage::GetStringLength(uint32_t *index_buffer_ptr, sel_t index) {
629 if (index == 0) {
630 return 0;
631 } else {
632 return index_buffer_ptr[index] - index_buffer_ptr[index - 1];
633 }
634}
635
636//===--------------------------------------------------------------------===//
637// Get Function
638//===--------------------------------------------------------------------===//
639CompressionFunction DictionaryCompressionFun::GetFunction(PhysicalType data_type) {
640 return CompressionFunction(
641 CompressionType::COMPRESSION_DICTIONARY, data_type, DictionaryCompressionStorage ::StringInitAnalyze,
642 DictionaryCompressionStorage::StringAnalyze, DictionaryCompressionStorage::StringFinalAnalyze,
643 DictionaryCompressionStorage::InitCompression, DictionaryCompressionStorage::Compress,
644 DictionaryCompressionStorage::FinalizeCompress, DictionaryCompressionStorage::StringInitScan,
645 DictionaryCompressionStorage::StringScan, DictionaryCompressionStorage::StringScanPartial<false>,
646 DictionaryCompressionStorage::StringFetchRow, UncompressedFunctions::EmptySkip);
647}
648
649bool DictionaryCompressionFun::TypeIsSupported(PhysicalType type) {
650 return type == PhysicalType::VARCHAR;
651}
652} // namespace duckdb
653