1 | #include "duckdb/common/bitpacking.hpp" |
2 | #include "duckdb/common/operator/comparison_operators.hpp" |
3 | #include "duckdb/common/string_map_set.hpp" |
4 | #include "duckdb/common/types/vector_buffer.hpp" |
5 | #include "duckdb/function/compression/compression.hpp" |
6 | #include "duckdb/function/compression_function.hpp" |
7 | #include "duckdb/main/config.hpp" |
8 | #include "duckdb/storage/segment/uncompressed.hpp" |
9 | #include "duckdb/storage/string_uncompressed.hpp" |
10 | #include "duckdb/storage/table/append_state.hpp" |
11 | #include "duckdb/storage/table/column_data_checkpointer.hpp" |
12 | |
13 | namespace duckdb { |
14 | |
15 | // Abstract class for keeping compression state either for compression or size analysis |
16 | class DictionaryCompressionState : public CompressionState { |
17 | public: |
18 | bool UpdateState(Vector &scan_vector, idx_t count) { |
19 | UnifiedVectorFormat vdata; |
20 | scan_vector.ToUnifiedFormat(count, data&: vdata); |
21 | auto data = UnifiedVectorFormat::GetData<string_t>(format: vdata); |
22 | Verify(); |
23 | |
24 | for (idx_t i = 0; i < count; i++) { |
25 | auto idx = vdata.sel->get_index(idx: i); |
26 | size_t string_size = 0; |
27 | bool new_string = false; |
28 | auto row_is_valid = vdata.validity.RowIsValid(row_idx: idx); |
29 | |
30 | if (row_is_valid) { |
31 | string_size = data[idx].GetSize(); |
32 | if (string_size >= StringUncompressed::STRING_BLOCK_LIMIT) { |
33 | // Big strings not implemented for dictionary compression |
34 | return false; |
35 | } |
36 | new_string = !LookupString(str: data[idx]); |
37 | } |
38 | |
39 | bool fits = CalculateSpaceRequirements(new_string, string_size); |
40 | if (!fits) { |
41 | Flush(); |
42 | new_string = true; |
43 | |
44 | fits = CalculateSpaceRequirements(new_string, string_size); |
45 | if (!fits) { |
46 | throw InternalException("Dictionary compression could not write to new segment" ); |
47 | } |
48 | } |
49 | |
50 | if (!row_is_valid) { |
51 | AddNull(); |
52 | } else if (new_string) { |
53 | AddNewString(str: data[idx]); |
54 | } else { |
55 | AddLastLookup(); |
56 | } |
57 | |
58 | Verify(); |
59 | } |
60 | |
61 | return true; |
62 | } |
63 | |
64 | protected: |
65 | // Should verify the State |
66 | virtual void Verify() = 0; |
67 | // Performs a lookup of str, storing the result internally |
68 | virtual bool LookupString(string_t str) = 0; |
69 | // Add the most recently looked up str to compression state |
70 | virtual void AddLastLookup() = 0; |
71 | // Add string to the state that is known to not be seen yet |
72 | virtual void AddNewString(string_t str) = 0; |
73 | // Add a null value to the compression state |
74 | virtual void AddNull() = 0; |
75 | // Needs to be called before adding a value. Will return false if a flush is required first. |
76 | virtual bool CalculateSpaceRequirements(bool new_string, size_t string_size) = 0; |
77 | // Flush the segment to disk if compressing or reset the counters if analyzing |
78 | virtual void Flush(bool final = false) = 0; |
79 | }; |
80 | |
// On-disk header stored at the start of every dictionary-compressed segment.
// All fields are loaded/stored through Load/Store to stay alignment-safe.
typedef struct {
	uint32_t dict_size;           // total bytes occupied by dictionary strings
	uint32_t dict_end;            // offset of the end of the dictionary within the block
	uint32_t index_buffer_offset; // offset of the index buffer within the block
	uint32_t index_buffer_count;  // number of entries in the index buffer
	uint32_t bitpacking_width;    // width used to bitpack the selection buffer
} dictionary_compression_header_t;
88 | |
89 | struct DictionaryCompressionStorage { |
90 | static constexpr float MINIMUM_COMPRESSION_RATIO = 1.2; |
91 | static constexpr uint16_t = sizeof(dictionary_compression_header_t); |
92 | static constexpr size_t COMPACTION_FLUSH_LIMIT = (size_t)Storage::BLOCK_SIZE / 5 * 4; |
93 | |
94 | static unique_ptr<AnalyzeState> StringInitAnalyze(ColumnData &col_data, PhysicalType type); |
95 | static bool StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count); |
96 | static idx_t StringFinalAnalyze(AnalyzeState &state_p); |
97 | |
98 | static unique_ptr<CompressionState> InitCompression(ColumnDataCheckpointer &checkpointer, |
99 | unique_ptr<AnalyzeState> state); |
100 | static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count); |
101 | static void FinalizeCompress(CompressionState &state_p); |
102 | |
103 | static unique_ptr<SegmentScanState> StringInitScan(ColumnSegment &segment); |
104 | template <bool ALLOW_DICT_VECTORS> |
105 | static void StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result, |
106 | idx_t result_offset); |
107 | static void StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result); |
108 | static void StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result, |
109 | idx_t result_idx); |
110 | |
111 | static bool HasEnoughSpace(idx_t current_count, idx_t index_count, idx_t dict_size, |
112 | bitpacking_width_t packing_width); |
113 | static idx_t RequiredSpace(idx_t current_count, idx_t index_count, idx_t dict_size, |
114 | bitpacking_width_t packing_width); |
115 | |
116 | static StringDictionaryContainer GetDictionary(ColumnSegment &segment, BufferHandle &handle); |
117 | static void SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container); |
118 | static string_t FetchStringFromDict(ColumnSegment &segment, StringDictionaryContainer dict, data_ptr_t baseptr, |
119 | int32_t dict_offset, uint16_t string_len); |
120 | static uint16_t GetStringLength(uint32_t *index_buffer_ptr, sel_t index); |
121 | }; |
122 | |
123 | // Dictionary compression uses a combination of bitpacking and a dictionary to compress string segments. The data is |
124 | // stored across three buffers: the index buffer, the selection buffer and the dictionary. Firstly the Index buffer |
125 | // contains the offsets into the dictionary which are also used to determine the string lengths. Each value in the |
126 | // dictionary gets a single unique index in the index buffer. Secondly, the selection buffer maps the tuples to an index |
127 | // in the index buffer. The selection buffer is compressed with bitpacking. Finally, the dictionary contains simply all |
// the unique strings without lengths or null termination as we can deduce the lengths from the index buffer. The
129 | // addition of the selection buffer is done for two reasons: firstly, to allow the scan to emit dictionary vectors by |
130 | // scanning the whole dictionary at once and then scanning the selection buffer for each emitted vector. Secondly, it |
131 | // allows for efficient bitpacking compression as the selection values should remain relatively small. |
132 | struct DictionaryCompressionCompressState : public DictionaryCompressionState { |
133 | explicit DictionaryCompressionCompressState(ColumnDataCheckpointer &checkpointer_p) |
134 | : checkpointer(checkpointer_p), |
135 | function(checkpointer.GetCompressionFunction(type: CompressionType::COMPRESSION_DICTIONARY)), |
136 | heap(BufferAllocator::Get(db&: checkpointer.GetDatabase())) { |
137 | CreateEmptySegment(row_start: checkpointer.GetRowGroup().start); |
138 | } |
139 | |
140 | ColumnDataCheckpointer &checkpointer; |
141 | CompressionFunction &function; |
142 | |
143 | // State regarding current segment |
144 | unique_ptr<ColumnSegment> current_segment; |
145 | BufferHandle current_handle; |
146 | StringDictionaryContainer current_dictionary; |
147 | data_ptr_t current_end_ptr; |
148 | |
149 | // Buffers and map for current segment |
150 | StringHeap heap; |
151 | string_map_t<uint32_t> current_string_map; |
152 | vector<uint32_t> index_buffer; |
153 | vector<uint32_t> selection_buffer; |
154 | |
155 | bitpacking_width_t current_width = 0; |
156 | bitpacking_width_t next_width = 0; |
157 | |
158 | // Result of latest LookupString call |
159 | uint32_t latest_lookup_result; |
160 | |
161 | public: |
162 | void CreateEmptySegment(idx_t row_start) { |
163 | auto &db = checkpointer.GetDatabase(); |
164 | auto &type = checkpointer.GetType(); |
165 | auto compressed_segment = ColumnSegment::CreateTransientSegment(db, type, start: row_start); |
166 | current_segment = std::move(compressed_segment); |
167 | |
168 | current_segment->function = function; |
169 | |
170 | // Reset the buffers and string map |
171 | current_string_map.clear(); |
172 | index_buffer.clear(); |
173 | index_buffer.push_back(x: 0); // Reserve index 0 for null strings |
174 | selection_buffer.clear(); |
175 | |
176 | current_width = 0; |
177 | next_width = 0; |
178 | |
179 | // Reset the pointers into the current segment |
180 | auto &buffer_manager = BufferManager::GetBufferManager(db&: checkpointer.GetDatabase()); |
181 | current_handle = buffer_manager.Pin(handle&: current_segment->block); |
182 | current_dictionary = DictionaryCompressionStorage::GetDictionary(segment&: *current_segment, handle&: current_handle); |
183 | current_end_ptr = current_handle.Ptr() + current_dictionary.end; |
184 | } |
185 | |
186 | void Verify() override { |
187 | current_dictionary.Verify(); |
188 | D_ASSERT(current_segment->count == selection_buffer.size()); |
189 | D_ASSERT(DictionaryCompressionStorage::HasEnoughSpace(current_segment->count.load(), index_buffer.size(), |
190 | current_dictionary.size, current_width)); |
191 | D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE); |
192 | D_ASSERT(index_buffer.size() == current_string_map.size() + 1); // +1 is for null value |
193 | } |
194 | |
195 | bool LookupString(string_t str) override { |
196 | auto search = current_string_map.find(x: str); |
197 | auto has_result = search != current_string_map.end(); |
198 | |
199 | if (has_result) { |
200 | latest_lookup_result = search->second; |
201 | } |
202 | return has_result; |
203 | } |
204 | |
205 | void AddNewString(string_t str) override { |
206 | UncompressedStringStorage::UpdateStringStats(stats&: current_segment->stats, new_value: str); |
207 | |
208 | // Copy string to dict |
209 | current_dictionary.size += str.GetSize(); |
210 | auto dict_pos = current_end_ptr - current_dictionary.size; |
211 | memcpy(dest: dict_pos, src: str.GetData(), n: str.GetSize()); |
212 | current_dictionary.Verify(); |
213 | D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE); |
214 | |
215 | // Update buffers and map |
216 | index_buffer.push_back(x: current_dictionary.size); |
217 | selection_buffer.push_back(x: index_buffer.size() - 1); |
218 | if (str.IsInlined()) { |
219 | current_string_map.insert(x: {str, index_buffer.size() - 1}); |
220 | } else { |
221 | current_string_map.insert(x: {heap.AddBlob(data: str), index_buffer.size() - 1}); |
222 | } |
223 | DictionaryCompressionStorage::SetDictionary(segment&: *current_segment, handle&: current_handle, container: current_dictionary); |
224 | |
225 | current_width = next_width; |
226 | current_segment->count++; |
227 | } |
228 | |
229 | void AddNull() override { |
230 | selection_buffer.push_back(x: 0); |
231 | current_segment->count++; |
232 | } |
233 | |
234 | void AddLastLookup() override { |
235 | selection_buffer.push_back(x: latest_lookup_result); |
236 | current_segment->count++; |
237 | } |
238 | |
239 | bool CalculateSpaceRequirements(bool new_string, size_t string_size) override { |
240 | if (new_string) { |
241 | next_width = BitpackingPrimitives::MinimumBitWidth(value: index_buffer.size() - 1 + new_string); |
242 | return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_segment->count.load() + 1, |
243 | index_count: index_buffer.size() + 1, |
244 | dict_size: current_dictionary.size + string_size, packing_width: next_width); |
245 | } else { |
246 | return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_segment->count.load() + 1, index_count: index_buffer.size(), |
247 | dict_size: current_dictionary.size, packing_width: current_width); |
248 | } |
249 | } |
250 | |
251 | void Flush(bool final = false) override { |
252 | auto next_start = current_segment->start + current_segment->count; |
253 | |
254 | auto segment_size = Finalize(); |
255 | auto &state = checkpointer.GetCheckpointState(); |
256 | state.FlushSegment(segment: std::move(current_segment), segment_size); |
257 | |
258 | if (!final) { |
259 | CreateEmptySegment(row_start: next_start); |
260 | } |
261 | } |
262 | |
263 | idx_t Finalize() { |
264 | auto &buffer_manager = BufferManager::GetBufferManager(db&: checkpointer.GetDatabase()); |
265 | auto handle = buffer_manager.Pin(handle&: current_segment->block); |
266 | D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE); |
267 | |
268 | // calculate sizes |
269 | auto compressed_selection_buffer_size = |
270 | BitpackingPrimitives::GetRequiredSize(count: current_segment->count, width: current_width); |
271 | auto index_buffer_size = index_buffer.size() * sizeof(uint32_t); |
272 | auto total_size = DictionaryCompressionStorage::DICTIONARY_HEADER_SIZE + compressed_selection_buffer_size + |
273 | index_buffer_size + current_dictionary.size; |
274 | |
275 | // calculate ptr and offsets |
276 | auto base_ptr = handle.Ptr(); |
277 | auto = reinterpret_cast<dictionary_compression_header_t *>(base_ptr); |
278 | auto compressed_selection_buffer_offset = DictionaryCompressionStorage::DICTIONARY_HEADER_SIZE; |
279 | auto index_buffer_offset = compressed_selection_buffer_offset + compressed_selection_buffer_size; |
280 | |
281 | // Write compressed selection buffer |
282 | BitpackingPrimitives::PackBuffer<sel_t, false>(dst: base_ptr + compressed_selection_buffer_offset, |
283 | src: (sel_t *)(selection_buffer.data()), count: current_segment->count, |
284 | width: current_width); |
285 | |
286 | // Write the index buffer |
287 | memcpy(dest: base_ptr + index_buffer_offset, src: index_buffer.data(), n: index_buffer_size); |
288 | |
289 | // Store sizes and offsets in segment header |
290 | Store<uint32_t>(val: index_buffer_offset, ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset)); |
291 | Store<uint32_t>(val: index_buffer.size(), ptr: data_ptr_cast(src: &header_ptr->index_buffer_count)); |
292 | Store<uint32_t>(val: (uint32_t)current_width, ptr: data_ptr_cast(src: &header_ptr->bitpacking_width)); |
293 | |
294 | D_ASSERT(current_width == BitpackingPrimitives::MinimumBitWidth(index_buffer.size() - 1)); |
295 | D_ASSERT(DictionaryCompressionStorage::HasEnoughSpace(current_segment->count, index_buffer.size(), |
296 | current_dictionary.size, current_width)); |
297 | D_ASSERT((uint64_t)*max_element(std::begin(selection_buffer), std::end(selection_buffer)) == |
298 | index_buffer.size() - 1); |
299 | |
300 | if (total_size >= DictionaryCompressionStorage::COMPACTION_FLUSH_LIMIT) { |
301 | // the block is full enough, don't bother moving around the dictionary |
302 | return Storage::BLOCK_SIZE; |
303 | } |
304 | // the block has space left: figure out how much space we can save |
305 | auto move_amount = Storage::BLOCK_SIZE - total_size; |
306 | // move the dictionary so it lines up exactly with the offsets |
307 | auto new_dictionary_offset = index_buffer_offset + index_buffer_size; |
308 | memmove(dest: base_ptr + new_dictionary_offset, src: base_ptr + current_dictionary.end - current_dictionary.size, |
309 | n: current_dictionary.size); |
310 | current_dictionary.end -= move_amount; |
311 | D_ASSERT(current_dictionary.end == total_size); |
312 | // write the new dictionary (with the updated "end") |
313 | DictionaryCompressionStorage::SetDictionary(segment&: *current_segment, handle, container: current_dictionary); |
314 | return total_size; |
315 | } |
316 | }; |
317 | |
318 | //===--------------------------------------------------------------------===// |
319 | // Analyze |
320 | //===--------------------------------------------------------------------===// |
321 | struct DictionaryAnalyzeState : public DictionaryCompressionState { |
322 | DictionaryAnalyzeState() |
323 | : segment_count(0), current_tuple_count(0), current_unique_count(0), current_dict_size(0), current_width(0), |
324 | next_width(0) { |
325 | } |
326 | |
327 | size_t segment_count; |
328 | idx_t current_tuple_count; |
329 | idx_t current_unique_count; |
330 | size_t current_dict_size; |
331 | StringHeap heap; |
332 | string_set_t current_set; |
333 | bitpacking_width_t current_width; |
334 | bitpacking_width_t next_width; |
335 | |
336 | bool LookupString(string_t str) override { |
337 | return current_set.count(x: str); |
338 | } |
339 | |
340 | void AddNewString(string_t str) override { |
341 | current_tuple_count++; |
342 | current_unique_count++; |
343 | current_dict_size += str.GetSize(); |
344 | if (str.IsInlined()) { |
345 | current_set.insert(x: str); |
346 | } else { |
347 | current_set.insert(x: heap.AddBlob(data: str)); |
348 | } |
349 | current_width = next_width; |
350 | } |
351 | |
352 | void AddLastLookup() override { |
353 | current_tuple_count++; |
354 | } |
355 | |
356 | void AddNull() override { |
357 | current_tuple_count++; |
358 | } |
359 | |
360 | bool CalculateSpaceRequirements(bool new_string, size_t string_size) override { |
361 | if (new_string) { |
362 | next_width = |
363 | BitpackingPrimitives::MinimumBitWidth(value: current_unique_count + 2); // 1 for null, one for new string |
364 | return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_tuple_count + 1, index_count: current_unique_count + 1, |
365 | dict_size: current_dict_size + string_size, packing_width: next_width); |
366 | } else { |
367 | return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_tuple_count + 1, index_count: current_unique_count, |
368 | dict_size: current_dict_size, packing_width: current_width); |
369 | } |
370 | } |
371 | |
372 | void Flush(bool final = false) override { |
373 | segment_count++; |
374 | current_tuple_count = 0; |
375 | current_unique_count = 0; |
376 | current_dict_size = 0; |
377 | current_set.clear(); |
378 | } |
379 | void Verify() override {}; |
380 | }; |
381 | |
// Thin AnalyzeState wrapper: owns the DictionaryAnalyzeState so the analyze
// phase can reuse the shared DictionaryCompressionState::UpdateState logic.
struct DictionaryCompressionAnalyzeState : public AnalyzeState {
	DictionaryCompressionAnalyzeState() : analyze_state(make_uniq<DictionaryAnalyzeState>()) {
	}

	unique_ptr<DictionaryAnalyzeState> analyze_state;
};
388 | |
389 | unique_ptr<AnalyzeState> DictionaryCompressionStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) { |
390 | return make_uniq<DictionaryCompressionAnalyzeState>(); |
391 | } |
392 | |
393 | bool DictionaryCompressionStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) { |
394 | auto &state = state_p.Cast<DictionaryCompressionAnalyzeState>(); |
395 | return state.analyze_state->UpdateState(scan_vector&: input, count); |
396 | } |
397 | |
398 | idx_t DictionaryCompressionStorage::StringFinalAnalyze(AnalyzeState &state_p) { |
399 | auto &analyze_state = state_p.Cast<DictionaryCompressionAnalyzeState>(); |
400 | auto &state = *analyze_state.analyze_state; |
401 | |
402 | auto width = BitpackingPrimitives::MinimumBitWidth(value: state.current_unique_count + 1); |
403 | auto req_space = |
404 | RequiredSpace(current_count: state.current_tuple_count, index_count: state.current_unique_count, dict_size: state.current_dict_size, packing_width: width); |
405 | |
406 | return MINIMUM_COMPRESSION_RATIO * (state.segment_count * Storage::BLOCK_SIZE + req_space); |
407 | } |
408 | |
409 | //===--------------------------------------------------------------------===// |
410 | // Compress |
411 | //===--------------------------------------------------------------------===// |
412 | unique_ptr<CompressionState> DictionaryCompressionStorage::InitCompression(ColumnDataCheckpointer &checkpointer, |
413 | unique_ptr<AnalyzeState> state) { |
414 | return make_uniq<DictionaryCompressionCompressState>(args&: checkpointer); |
415 | } |
416 | |
417 | void DictionaryCompressionStorage::Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) { |
418 | auto &state = state_p.Cast<DictionaryCompressionCompressState>(); |
419 | state.UpdateState(scan_vector, count); |
420 | } |
421 | |
422 | void DictionaryCompressionStorage::FinalizeCompress(CompressionState &state_p) { |
423 | auto &state = state_p.Cast<DictionaryCompressionCompressState>(); |
424 | state.Flush(final: true); |
425 | } |
426 | |
427 | //===--------------------------------------------------------------------===// |
428 | // Scan |
429 | //===--------------------------------------------------------------------===// |
// Per-segment scan state: keeps the block pinned and caches the decompressed
// dictionary plus a reusable selection-vector decompression buffer.
struct CompressedStringScanState : public StringScanState {
	BufferHandle handle;              // pin on the segment's block
	buffer_ptr<Vector> dictionary;    // materialized dictionary strings (built in StringInitScan)
	bitpacking_width_t current_width; // bitpacking width of the selection buffer
	buffer_ptr<SelectionVector> sel_vec; // scratch buffer for unpacked selection values
	idx_t sel_vec_size = 0;              // capacity of sel_vec
};
437 | |
438 | unique_ptr<SegmentScanState> DictionaryCompressionStorage::StringInitScan(ColumnSegment &segment) { |
439 | auto state = make_uniq<CompressedStringScanState>(); |
440 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
441 | state->handle = buffer_manager.Pin(handle&: segment.block); |
442 | |
443 | auto baseptr = state->handle.Ptr() + segment.GetBlockOffset(); |
444 | |
445 | // Load header values |
446 | auto dict = DictionaryCompressionStorage::GetDictionary(segment, handle&: state->handle); |
447 | auto = reinterpret_cast<dictionary_compression_header_t *>(baseptr); |
448 | auto index_buffer_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset)); |
449 | auto index_buffer_count = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_count)); |
450 | state->current_width = (bitpacking_width_t)(Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->bitpacking_width))); |
451 | |
452 | auto index_buffer_ptr = reinterpret_cast<uint32_t *>(baseptr + index_buffer_offset); |
453 | |
454 | state->dictionary = make_buffer<Vector>(args&: segment.type, args&: index_buffer_count); |
455 | auto dict_child_data = FlatVector::GetData<string_t>(vector&: *(state->dictionary)); |
456 | |
457 | for (uint32_t i = 0; i < index_buffer_count; i++) { |
458 | // NOTE: the passing of dict_child_vector, will not be used, its for big strings |
459 | uint16_t str_len = GetStringLength(index_buffer_ptr, index: i); |
460 | dict_child_data[i] = FetchStringFromDict(segment, dict, baseptr, dict_offset: index_buffer_ptr[i], string_len: str_len); |
461 | } |
462 | |
463 | return std::move(state); |
464 | } |
465 | |
466 | //===--------------------------------------------------------------------===// |
467 | // Scan base data |
468 | //===--------------------------------------------------------------------===// |
469 | template <bool ALLOW_DICT_VECTORS> |
470 | void DictionaryCompressionStorage::StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, |
471 | Vector &result, idx_t result_offset) { |
472 | // clear any previously locked buffers and get the primary buffer handle |
473 | auto &scan_state = state.scan_state->Cast<CompressedStringScanState>(); |
474 | auto start = segment.GetRelativeIndex(row_index: state.row_index); |
475 | |
476 | auto baseptr = scan_state.handle.Ptr() + segment.GetBlockOffset(); |
477 | auto dict = DictionaryCompressionStorage::GetDictionary(segment, handle&: scan_state.handle); |
478 | |
479 | auto = reinterpret_cast<dictionary_compression_header_t *>(baseptr); |
480 | auto index_buffer_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset)); |
481 | auto index_buffer_ptr = reinterpret_cast<uint32_t *>(baseptr + index_buffer_offset); |
482 | |
483 | auto base_data = data_ptr_cast(src: baseptr + DICTIONARY_HEADER_SIZE); |
484 | auto result_data = FlatVector::GetData<string_t>(vector&: result); |
485 | |
486 | if (!ALLOW_DICT_VECTORS || scan_count != STANDARD_VECTOR_SIZE || |
487 | start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE != 0) { |
488 | // Emit regular vector |
489 | |
490 | // Handling non-bitpacking-group-aligned start values; |
491 | idx_t start_offset = start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; |
492 | |
493 | // We will scan in blocks of BITPACKING_ALGORITHM_GROUP_SIZE, so we may scan some extra values. |
494 | idx_t decompress_count = BitpackingPrimitives::RoundUpToAlgorithmGroupSize(num_to_round: scan_count + start_offset); |
495 | |
496 | // Create a decompression buffer of sufficient size if we don't already have one. |
497 | if (!scan_state.sel_vec || scan_state.sel_vec_size < decompress_count) { |
498 | scan_state.sel_vec_size = decompress_count; |
499 | scan_state.sel_vec = make_buffer<SelectionVector>(args&: decompress_count); |
500 | } |
501 | |
502 | data_ptr_t src = &base_data[((start - start_offset) * scan_state.current_width) / 8]; |
503 | sel_t *sel_vec_ptr = scan_state.sel_vec->data(); |
504 | |
505 | BitpackingPrimitives::UnPackBuffer<sel_t>(dst: data_ptr_cast(src: sel_vec_ptr), src, count: decompress_count, |
506 | width: scan_state.current_width); |
507 | |
508 | for (idx_t i = 0; i < scan_count; i++) { |
509 | // Lookup dict offset in index buffer |
510 | auto string_number = scan_state.sel_vec->get_index(idx: i + start_offset); |
511 | auto dict_offset = index_buffer_ptr[string_number]; |
512 | uint16_t str_len = GetStringLength(index_buffer_ptr, index: string_number); |
513 | result_data[result_offset + i] = FetchStringFromDict(segment, dict, baseptr, dict_offset, string_len: str_len); |
514 | } |
515 | |
516 | } else { |
517 | D_ASSERT(start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE == 0); |
518 | D_ASSERT(scan_count == STANDARD_VECTOR_SIZE); |
519 | D_ASSERT(result_offset == 0); |
520 | |
521 | idx_t decompress_count = BitpackingPrimitives::RoundUpToAlgorithmGroupSize(num_to_round: scan_count); |
522 | |
523 | // Create a selection vector of sufficient size if we don't already have one. |
524 | if (!scan_state.sel_vec || scan_state.sel_vec_size < decompress_count) { |
525 | scan_state.sel_vec_size = decompress_count; |
526 | scan_state.sel_vec = make_buffer<SelectionVector>(args&: decompress_count); |
527 | } |
528 | |
529 | // Scanning 1024 values, emitting a dict vector |
530 | data_ptr_t dst = data_ptr_cast(src: scan_state.sel_vec->data()); |
531 | data_ptr_t src = data_ptr_cast(src: &base_data[(start * scan_state.current_width) / 8]); |
532 | |
533 | BitpackingPrimitives::UnPackBuffer<sel_t>(dst, src, count: scan_count, width: scan_state.current_width); |
534 | |
535 | result.Slice(other&: *(scan_state.dictionary), sel: *scan_state.sel_vec, count: scan_count); |
536 | } |
537 | } |
538 | |
539 | void DictionaryCompressionStorage::StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, |
540 | Vector &result) { |
541 | StringScanPartial<true>(segment, state, scan_count, result, result_offset: 0); |
542 | } |
543 | |
544 | //===--------------------------------------------------------------------===// |
545 | // Fetch |
546 | //===--------------------------------------------------------------------===// |
547 | void DictionaryCompressionStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, |
548 | Vector &result, idx_t result_idx) { |
549 | // fetch a single row from the string segment |
550 | // first pin the main buffer if it is not already pinned |
551 | auto &handle = state.GetOrInsertHandle(segment); |
552 | |
553 | auto baseptr = handle.Ptr() + segment.GetBlockOffset(); |
554 | auto = reinterpret_cast<dictionary_compression_header_t *>(baseptr); |
555 | auto dict = DictionaryCompressionStorage::GetDictionary(segment, handle); |
556 | auto index_buffer_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset)); |
557 | auto width = (bitpacking_width_t)Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->bitpacking_width)); |
558 | auto index_buffer_ptr = reinterpret_cast<uint32_t *>(baseptr + index_buffer_offset); |
559 | auto base_data = data_ptr_cast(src: baseptr + DICTIONARY_HEADER_SIZE); |
560 | auto result_data = FlatVector::GetData<string_t>(vector&: result); |
561 | |
562 | // Handling non-bitpacking-group-aligned start values; |
563 | idx_t start_offset = row_id % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; |
564 | |
565 | // Decompress part of selection buffer we need for this value. |
566 | sel_t decompression_buffer[BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE]; |
567 | data_ptr_t src = data_ptr_cast(src: &base_data[((row_id - start_offset) * width) / 8]); |
568 | BitpackingPrimitives::UnPackBuffer<sel_t>(dst: data_ptr_cast(src: decompression_buffer), src, |
569 | count: BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE, width); |
570 | |
571 | auto selection_value = decompression_buffer[start_offset]; |
572 | auto dict_offset = index_buffer_ptr[selection_value]; |
573 | uint16_t str_len = GetStringLength(index_buffer_ptr, index: selection_value); |
574 | |
575 | result_data[result_idx] = FetchStringFromDict(segment, dict, baseptr, dict_offset, string_len: str_len); |
576 | } |
577 | |
578 | //===--------------------------------------------------------------------===// |
579 | // Helper Functions |
580 | //===--------------------------------------------------------------------===// |
581 | bool DictionaryCompressionStorage::HasEnoughSpace(idx_t current_count, idx_t index_count, idx_t dict_size, |
582 | bitpacking_width_t packing_width) { |
583 | return RequiredSpace(current_count, index_count, dict_size, packing_width) <= Storage::BLOCK_SIZE; |
584 | } |
585 | |
586 | idx_t DictionaryCompressionStorage::RequiredSpace(idx_t current_count, idx_t index_count, idx_t dict_size, |
587 | bitpacking_width_t packing_width) { |
588 | idx_t base_space = DICTIONARY_HEADER_SIZE + dict_size; |
589 | idx_t string_number_space = BitpackingPrimitives::GetRequiredSize(count: current_count, width: packing_width); |
590 | idx_t index_space = index_count * sizeof(uint32_t); |
591 | |
592 | idx_t used_space = base_space + index_space + string_number_space; |
593 | |
594 | return used_space; |
595 | } |
596 | |
597 | StringDictionaryContainer DictionaryCompressionStorage::GetDictionary(ColumnSegment &segment, BufferHandle &handle) { |
598 | auto = reinterpret_cast<dictionary_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset()); |
599 | StringDictionaryContainer container; |
600 | container.size = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->dict_size)); |
601 | container.end = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->dict_end)); |
602 | return container; |
603 | } |
604 | |
605 | void DictionaryCompressionStorage::SetDictionary(ColumnSegment &segment, BufferHandle &handle, |
606 | StringDictionaryContainer container) { |
607 | auto = reinterpret_cast<dictionary_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset()); |
608 | Store<uint32_t>(val: container.size, ptr: data_ptr_cast(src: &header_ptr->dict_size)); |
609 | Store<uint32_t>(val: container.end, ptr: data_ptr_cast(src: &header_ptr->dict_end)); |
610 | } |
611 | |
612 | string_t DictionaryCompressionStorage::FetchStringFromDict(ColumnSegment &segment, StringDictionaryContainer dict, |
613 | data_ptr_t baseptr, int32_t dict_offset, |
614 | uint16_t string_len) { |
615 | D_ASSERT(dict_offset >= 0 && dict_offset <= Storage::BLOCK_SIZE); |
616 | |
617 | if (dict_offset == 0) { |
618 | return string_t(nullptr, 0); |
619 | } |
620 | // normal string: read string from this block |
621 | auto dict_end = baseptr + dict.end; |
622 | auto dict_pos = dict_end - dict_offset; |
623 | |
624 | auto str_ptr = char_ptr_cast(src: dict_pos); |
625 | return string_t(str_ptr, string_len); |
626 | } |
627 | |
628 | uint16_t DictionaryCompressionStorage::GetStringLength(uint32_t *index_buffer_ptr, sel_t index) { |
629 | if (index == 0) { |
630 | return 0; |
631 | } else { |
632 | return index_buffer_ptr[index] - index_buffer_ptr[index - 1]; |
633 | } |
634 | } |
635 | |
636 | //===--------------------------------------------------------------------===// |
637 | // Get Function |
638 | //===--------------------------------------------------------------------===// |
639 | CompressionFunction DictionaryCompressionFun::GetFunction(PhysicalType data_type) { |
640 | return CompressionFunction( |
641 | CompressionType::COMPRESSION_DICTIONARY, data_type, DictionaryCompressionStorage ::StringInitAnalyze, |
642 | DictionaryCompressionStorage::StringAnalyze, DictionaryCompressionStorage::StringFinalAnalyze, |
643 | DictionaryCompressionStorage::InitCompression, DictionaryCompressionStorage::Compress, |
644 | DictionaryCompressionStorage::FinalizeCompress, DictionaryCompressionStorage::StringInitScan, |
645 | DictionaryCompressionStorage::StringScan, DictionaryCompressionStorage::StringScanPartial<false>, |
646 | DictionaryCompressionStorage::StringFetchRow, UncompressedFunctions::EmptySkip); |
647 | } |
648 | |
649 | bool DictionaryCompressionFun::TypeIsSupported(PhysicalType type) { |
650 | return type == PhysicalType::VARCHAR; |
651 | } |
652 | } // namespace duckdb |
653 | |