1 | #include "duckdb/common/bitpacking.hpp" |
2 | #include "duckdb/storage/checkpoint/write_overflow_strings_to_disk.hpp" |
3 | #include "duckdb/storage/string_uncompressed.hpp" |
4 | #include "duckdb/function/compression/compression.hpp" |
5 | #include "duckdb/storage/table/column_data_checkpointer.hpp" |
6 | #include "duckdb/main/config.hpp" |
7 | #include "duckdb/common/constants.hpp" |
8 | #include "duckdb/common/random_engine.hpp" |
9 | #include "duckdb/common/fsst.hpp" |
10 | #include "miniz_wrapper.hpp" |
11 | #include "fsst.h" |
12 | |
13 | namespace duckdb { |
14 | |
// On-disk header stored at the start of every FSST-compressed segment.
// All fields are read/written through Load/Store so the layout must remain stable.
typedef struct {
	uint32_t dict_size;                // total bytes used by the string dictionary
	uint32_t dict_end;                 // offset (from segment base) one past the dictionary
	uint32_t bitpacking_width;         // bit width used to pack the delta-encoded string lengths
	uint32_t fsst_symbol_table_offset; // offset of the serialized fsst symbol table
} fsst_compression_header_t;
21 | |
22 | // Counts and offsets used during scanning/fetching |
23 | // | ColumnSegment to be scanned / fetched from | |
24 | // | untouched | bp align | unused d-values | to scan | bp align | untouched | |
// Offsets produced by FSSTStorage::CalculateBpDeltaOffsets; the markers in the
// inline comments refer to the segment diagram directly above this struct.
typedef struct BPDeltaDecodeOffsets {
	idx_t delta_decode_start_row;      // X
	idx_t bitunpack_alignment_offset;  // <--------->
	idx_t bitunpack_start_row;         // X
	idx_t unused_delta_decoded_values; // <----------------->
	idx_t scan_offset;                 // <---------------------------->
	idx_t total_delta_decode_count;    // <-------------------------->
	idx_t total_bitunpack_count;       // <------------------------------------------------>
} bp_delta_offsets_t;
34 | |
struct FSSTStorage {
	// If a finalized segment is at least this full, we don't bother compacting it.
	static constexpr size_t COMPACTION_FLUSH_LIMIT = (size_t)Storage::BLOCK_SIZE / 5 * 4;
	// FSST is only chosen if the estimated size beats uncompressed by this factor.
	static constexpr double MINIMUM_COMPRESSION_RATIO = 1.2;
	// Fraction of input vectors sampled during the analyze phase.
	static constexpr double ANALYSIS_SAMPLE_SIZE = 0.25;

	// Analyze phase: sample strings and estimate the compressed size.
	static unique_ptr<AnalyzeState> StringInitAnalyze(ColumnData &col_data, PhysicalType type);
	static bool StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count);
	static idx_t StringFinalAnalyze(AnalyzeState &state_p);

	// Compress phase: encode strings with the symbol table built during analyze.
	static unique_ptr<CompressionState> InitCompression(ColumnDataCheckpointer &checkpointer,
	                                                    unique_ptr<AnalyzeState> analyze_state_p);
	static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count);
	static void FinalizeCompress(CompressionState &state_p);

	// Scan/fetch phase: decode strings (optionally emitting FSST_VECTORs).
	static unique_ptr<SegmentScanState> StringInitScan(ColumnSegment &segment);
	template <bool ALLOW_FSST_VECTORS = false>
	static void StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result,
	                              idx_t result_offset);
	static void StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result);
	static void StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result,
	                           idx_t result_idx);

	// Helpers for reading/writing the segment header and dictionary.
	static void SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container);
	static StringDictionaryContainer GetDictionary(ColumnSegment &segment, BufferHandle &handle);

	static char *FetchStringPointer(StringDictionaryContainer dict, data_ptr_t baseptr, int32_t dict_offset);
	static bp_delta_offsets_t CalculateBpDeltaOffsets(int64_t last_known_row, idx_t start, idx_t scan_count);
	static bool ParseFSSTSegmentHeader(data_ptr_t base_ptr, duckdb_fsst_decoder_t *decoder_out,
	                                   bitpacking_width_t *width_out);
};
65 | |
66 | //===--------------------------------------------------------------------===// |
67 | // Analyze |
68 | //===--------------------------------------------------------------------===// |
// State carried through the analyze phase: a sample of the input strings plus
// the fsst encoder trained on that sample.
struct FSSTAnalyzeState : public AnalyzeState {
	FSSTAnalyzeState() : count(0), fsst_string_total_size(0), empty_strings(0) {
	}

	~FSSTAnalyzeState() override {
		// The encoder is a raw C handle from fsst.h; it is either destroyed here
		// or ownership is transferred to the compression state in InitCompression.
		if (fsst_encoder) {
			duckdb_fsst_destroy(fsst_encoder);
		}
	}

	duckdb_fsst_encoder_t *fsst_encoder = nullptr;
	// Total number of rows seen (including rows that were not sampled).
	idx_t count;

	// Sampled strings; non-inlined strings are copied into the heap so the
	// string_t pointers stay valid after the input vector is gone.
	StringHeap fsst_string_heap;
	vector<string_t> fsst_strings;
	size_t fsst_string_total_size;

	RandomEngine random_engine;
	// Set once a non-empty string has been sampled; until then sampling is bypassed.
	bool have_valid_row = false;

	// Number of empty strings seen; they get a dictionary offset but no payload.
	idx_t empty_strings;
};
91 | |
92 | unique_ptr<AnalyzeState> FSSTStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) { |
93 | return make_uniq<FSSTAnalyzeState>(); |
94 | } |
95 | |
96 | bool FSSTStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) { |
97 | auto &state = state_p.Cast<FSSTAnalyzeState>(); |
98 | UnifiedVectorFormat vdata; |
99 | input.ToUnifiedFormat(count, data&: vdata); |
100 | |
101 | state.count += count; |
102 | auto data = UnifiedVectorFormat::GetData<string_t>(format: vdata); |
103 | |
104 | // Note that we ignore the sampling in case we have not found any valid strings yet, this solves the issue of |
105 | // not having seen any valid strings here leading to an empty fsst symbol table. |
106 | bool sample_selected = !state.have_valid_row || state.random_engine.NextRandom() < ANALYSIS_SAMPLE_SIZE; |
107 | |
108 | for (idx_t i = 0; i < count; i++) { |
109 | auto idx = vdata.sel->get_index(idx: i); |
110 | |
111 | if (!vdata.validity.RowIsValid(row_idx: idx)) { |
112 | continue; |
113 | } |
114 | |
115 | // We need to check all strings for this, otherwise we run in to trouble during compression if we miss ones |
116 | auto string_size = data[idx].GetSize(); |
117 | if (string_size >= StringUncompressed::STRING_BLOCK_LIMIT) { |
118 | return false; |
119 | } |
120 | |
121 | if (!sample_selected) { |
122 | continue; |
123 | } |
124 | |
125 | if (string_size > 0) { |
126 | state.have_valid_row = true; |
127 | if (data[idx].IsInlined()) { |
128 | state.fsst_strings.push_back(x: data[idx]); |
129 | } else { |
130 | state.fsst_strings.emplace_back(args: state.fsst_string_heap.AddBlob(data: data[idx])); |
131 | } |
132 | state.fsst_string_total_size += string_size; |
133 | } else { |
134 | state.empty_strings++; |
135 | } |
136 | } |
137 | return true; |
138 | } |
139 | |
140 | idx_t FSSTStorage::StringFinalAnalyze(AnalyzeState &state_p) { |
141 | auto &state = state_p.Cast<FSSTAnalyzeState>(); |
142 | |
143 | size_t compressed_dict_size = 0; |
144 | size_t max_compressed_string_length = 0; |
145 | |
146 | auto string_count = state.fsst_strings.size(); |
147 | |
148 | if (!string_count) { |
149 | return DConstants::INVALID_INDEX; |
150 | } |
151 | |
152 | size_t output_buffer_size = 7 + 2 * state.fsst_string_total_size; // size as specified in fsst.h |
153 | |
154 | vector<size_t> fsst_string_sizes; |
155 | vector<unsigned char *> fsst_string_ptrs; |
156 | for (auto &str : state.fsst_strings) { |
157 | fsst_string_sizes.push_back(x: str.GetSize()); |
158 | fsst_string_ptrs.push_back(x: (unsigned char *)str.GetData()); // NOLINT |
159 | } |
160 | |
161 | state.fsst_encoder = duckdb_fsst_create(n: string_count, lenIn: &fsst_string_sizes[0], strIn: &fsst_string_ptrs[0], zeroTerminated: 0); |
162 | |
163 | // TODO: do we really need to encode to get a size estimate? |
164 | auto compressed_ptrs = vector<unsigned char *>(string_count, nullptr); |
165 | auto compressed_sizes = vector<size_t>(string_count, 0); |
166 | unique_ptr<unsigned char[]> compressed_buffer(new unsigned char[output_buffer_size]); |
167 | |
168 | auto res = |
169 | duckdb_fsst_compress(encoder: state.fsst_encoder, nstrings: string_count, lenIn: &fsst_string_sizes[0], strIn: &fsst_string_ptrs[0], |
170 | outsize: output_buffer_size, output: compressed_buffer.get(), lenOut: &compressed_sizes[0], strOut: &compressed_ptrs[0]); |
171 | |
172 | if (string_count != res) { |
173 | throw std::runtime_error("FSST output buffer is too small unexpectedly" ); |
174 | } |
175 | |
176 | // Sum and and Max compressed lengths |
177 | for (auto &size : compressed_sizes) { |
178 | compressed_dict_size += size; |
179 | max_compressed_string_length = MaxValue(a: max_compressed_string_length, b: size); |
180 | } |
181 | D_ASSERT(compressed_dict_size == (compressed_ptrs[res - 1] - compressed_ptrs[0]) + compressed_sizes[res - 1]); |
182 | |
183 | auto minimum_width = BitpackingPrimitives::MinimumBitWidth(value: max_compressed_string_length); |
184 | auto bitpacked_offsets_size = |
185 | BitpackingPrimitives::GetRequiredSize(count: string_count + state.empty_strings, width: minimum_width); |
186 | |
187 | auto estimated_base_size = (bitpacked_offsets_size + compressed_dict_size) * (1 / ANALYSIS_SAMPLE_SIZE); |
188 | auto num_blocks = estimated_base_size / (Storage::BLOCK_SIZE - sizeof(duckdb_fsst_decoder_t)); |
189 | auto symtable_size = num_blocks * sizeof(duckdb_fsst_decoder_t); |
190 | |
191 | auto estimated_size = estimated_base_size + symtable_size; |
192 | |
193 | return estimated_size * MINIMUM_COMPRESSION_RATIO; |
194 | } |
195 | |
196 | //===--------------------------------------------------------------------===// |
197 | // Compress |
198 | //===--------------------------------------------------------------------===// |
199 | |
200 | class FSSTCompressionState : public CompressionState { |
201 | public: |
202 | explicit FSSTCompressionState(ColumnDataCheckpointer &checkpointer) |
203 | : checkpointer(checkpointer), function(checkpointer.GetCompressionFunction(type: CompressionType::COMPRESSION_FSST)) { |
204 | CreateEmptySegment(row_start: checkpointer.GetRowGroup().start); |
205 | } |
206 | |
207 | ~FSSTCompressionState() override { |
208 | if (fsst_encoder) { |
209 | duckdb_fsst_destroy(fsst_encoder); |
210 | } |
211 | } |
212 | |
213 | void Reset() { |
214 | index_buffer.clear(); |
215 | current_width = 0; |
216 | max_compressed_string_length = 0; |
217 | last_fitting_size = 0; |
218 | |
219 | // Reset the pointers into the current segment |
220 | auto &buffer_manager = BufferManager::GetBufferManager(db&: current_segment->db); |
221 | current_handle = buffer_manager.Pin(handle&: current_segment->block); |
222 | current_dictionary = FSSTStorage::GetDictionary(segment&: *current_segment, handle&: current_handle); |
223 | current_end_ptr = current_handle.Ptr() + current_dictionary.end; |
224 | } |
225 | |
226 | void CreateEmptySegment(idx_t row_start) { |
227 | auto &db = checkpointer.GetDatabase(); |
228 | auto &type = checkpointer.GetType(); |
229 | auto compressed_segment = ColumnSegment::CreateTransientSegment(db, type, start: row_start); |
230 | current_segment = std::move(compressed_segment); |
231 | current_segment->function = function; |
232 | Reset(); |
233 | } |
234 | |
235 | void UpdateState(string_t uncompressed_string, unsigned char *compressed_string, size_t compressed_string_len) { |
236 | if (!HasEnoughSpace(string_len: compressed_string_len)) { |
237 | Flush(); |
238 | if (!HasEnoughSpace(string_len: compressed_string_len)) { |
239 | throw InternalException("FSST string compression failed due to insufficient space in empty block" ); |
240 | }; |
241 | } |
242 | |
243 | UncompressedStringStorage::UpdateStringStats(stats&: current_segment->stats, new_value: uncompressed_string); |
244 | |
245 | // Write string into dictionary |
246 | current_dictionary.size += compressed_string_len; |
247 | auto dict_pos = current_end_ptr - current_dictionary.size; |
248 | memcpy(dest: dict_pos, src: compressed_string, n: compressed_string_len); |
249 | current_dictionary.Verify(); |
250 | |
251 | // We just push the string length to effectively delta encode the strings |
252 | index_buffer.push_back(x: compressed_string_len); |
253 | |
254 | max_compressed_string_length = MaxValue(a: max_compressed_string_length, b: compressed_string_len); |
255 | |
256 | current_width = BitpackingPrimitives::MinimumBitWidth(value: max_compressed_string_length); |
257 | current_segment->count++; |
258 | } |
259 | |
260 | void AddNull() { |
261 | if (!HasEnoughSpace(string_len: 0)) { |
262 | Flush(); |
263 | if (!HasEnoughSpace(string_len: 0)) { |
264 | throw InternalException("FSST string compression failed due to insufficient space in empty block" ); |
265 | }; |
266 | } |
267 | index_buffer.push_back(x: 0); |
268 | current_segment->count++; |
269 | } |
270 | |
271 | void AddEmptyString() { |
272 | AddNull(); |
273 | UncompressedStringStorage::UpdateStringStats(stats&: current_segment->stats, new_value: "" ); |
274 | } |
275 | |
276 | size_t GetRequiredSize(size_t string_len) { |
277 | bitpacking_width_t required_minimum_width; |
278 | if (string_len > max_compressed_string_length) { |
279 | required_minimum_width = BitpackingPrimitives::MinimumBitWidth(value: string_len); |
280 | } else { |
281 | required_minimum_width = current_width; |
282 | } |
283 | |
284 | size_t current_dict_size = current_dictionary.size; |
285 | idx_t current_string_count = index_buffer.size(); |
286 | |
287 | size_t dict_offsets_size = |
288 | BitpackingPrimitives::GetRequiredSize(count: current_string_count + 1, width: required_minimum_width); |
289 | |
290 | // TODO switch to a symbol table per RowGroup, saves a bit of space |
291 | return sizeof(fsst_compression_header_t) + current_dict_size + dict_offsets_size + string_len + |
292 | fsst_serialized_symbol_table_size; |
293 | } |
294 | |
295 | // Checks if there is enough space, if there is, sets last_fitting_size |
296 | bool HasEnoughSpace(size_t string_len) { |
297 | auto required_size = GetRequiredSize(string_len); |
298 | |
299 | if (required_size <= Storage::BLOCK_SIZE) { |
300 | last_fitting_size = required_size; |
301 | return true; |
302 | } |
303 | return false; |
304 | } |
305 | |
306 | void Flush(bool final = false) { |
307 | auto next_start = current_segment->start + current_segment->count; |
308 | |
309 | auto segment_size = Finalize(); |
310 | auto &state = checkpointer.GetCheckpointState(); |
311 | state.FlushSegment(segment: std::move(current_segment), segment_size); |
312 | |
313 | if (!final) { |
314 | CreateEmptySegment(row_start: next_start); |
315 | } |
316 | } |
317 | |
318 | idx_t Finalize() { |
319 | auto &buffer_manager = BufferManager::GetBufferManager(db&: current_segment->db); |
320 | auto handle = buffer_manager.Pin(handle&: current_segment->block); |
321 | D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE); |
322 | |
323 | // calculate sizes |
324 | auto compressed_index_buffer_size = |
325 | BitpackingPrimitives::GetRequiredSize(count: current_segment->count, width: current_width); |
326 | auto total_size = sizeof(fsst_compression_header_t) + compressed_index_buffer_size + current_dictionary.size + |
327 | fsst_serialized_symbol_table_size; |
328 | |
329 | if (total_size != last_fitting_size) { |
330 | throw InternalException("FSST string compression failed due to incorrect size calculation" ); |
331 | } |
332 | |
333 | // calculate ptr and offsets |
334 | auto base_ptr = handle.Ptr(); |
335 | auto = reinterpret_cast<fsst_compression_header_t *>(base_ptr); |
336 | auto compressed_index_buffer_offset = sizeof(fsst_compression_header_t); |
337 | auto symbol_table_offset = compressed_index_buffer_offset + compressed_index_buffer_size; |
338 | |
339 | D_ASSERT(current_segment->count == index_buffer.size()); |
340 | BitpackingPrimitives::PackBuffer<sel_t, false>(dst: base_ptr + compressed_index_buffer_offset, |
341 | src: reinterpret_cast<uint32_t *>(index_buffer.data()), |
342 | count: current_segment->count, width: current_width); |
343 | |
344 | // Write the fsst symbol table or nothing |
345 | if (fsst_encoder != nullptr) { |
346 | memcpy(dest: base_ptr + symbol_table_offset, src: &fsst_serialized_symbol_table[0], n: fsst_serialized_symbol_table_size); |
347 | } else { |
348 | memset(s: base_ptr + symbol_table_offset, c: 0, n: fsst_serialized_symbol_table_size); |
349 | } |
350 | |
351 | Store<uint32_t>(val: symbol_table_offset, ptr: data_ptr_cast(src: &header_ptr->fsst_symbol_table_offset)); |
352 | Store<uint32_t>(val: (uint32_t)current_width, ptr: data_ptr_cast(src: &header_ptr->bitpacking_width)); |
353 | |
354 | if (total_size >= FSSTStorage::COMPACTION_FLUSH_LIMIT) { |
355 | // the block is full enough, don't bother moving around the dictionary |
356 | return Storage::BLOCK_SIZE; |
357 | } |
358 | // the block has space left: figure out how much space we can save |
359 | auto move_amount = Storage::BLOCK_SIZE - total_size; |
360 | // move the dictionary so it lines up exactly with the offsets |
361 | auto new_dictionary_offset = symbol_table_offset + fsst_serialized_symbol_table_size; |
362 | memmove(dest: base_ptr + new_dictionary_offset, src: base_ptr + current_dictionary.end - current_dictionary.size, |
363 | n: current_dictionary.size); |
364 | current_dictionary.end -= move_amount; |
365 | D_ASSERT(current_dictionary.end == total_size); |
366 | // write the new dictionary (with the updated "end") |
367 | FSSTStorage::SetDictionary(segment&: *current_segment, handle, container: current_dictionary); |
368 | |
369 | return total_size; |
370 | } |
371 | |
372 | ColumnDataCheckpointer &checkpointer; |
373 | CompressionFunction &function; |
374 | |
375 | // State regarding current segment |
376 | unique_ptr<ColumnSegment> current_segment; |
377 | BufferHandle current_handle; |
378 | StringDictionaryContainer current_dictionary; |
379 | data_ptr_t current_end_ptr; |
380 | |
381 | // Buffers and map for current segment |
382 | vector<uint32_t> index_buffer; |
383 | |
384 | size_t max_compressed_string_length; |
385 | bitpacking_width_t current_width; |
386 | idx_t last_fitting_size; |
387 | |
388 | duckdb_fsst_encoder_t *fsst_encoder = nullptr; |
389 | unsigned char fsst_serialized_symbol_table[sizeof(duckdb_fsst_decoder_t)]; |
390 | size_t fsst_serialized_symbol_table_size = sizeof(duckdb_fsst_decoder_t); |
391 | }; |
392 | |
393 | unique_ptr<CompressionState> FSSTStorage::InitCompression(ColumnDataCheckpointer &checkpointer, |
394 | unique_ptr<AnalyzeState> analyze_state_p) { |
395 | auto analyze_state = static_cast<FSSTAnalyzeState *>(analyze_state_p.get()); |
396 | auto compression_state = make_uniq<FSSTCompressionState>(args&: checkpointer); |
397 | |
398 | if (analyze_state->fsst_encoder == nullptr) { |
399 | throw InternalException("No encoder found during FSST compression" ); |
400 | } |
401 | |
402 | compression_state->fsst_encoder = analyze_state->fsst_encoder; |
403 | compression_state->fsst_serialized_symbol_table_size = |
404 | duckdb_fsst_export(encoder: compression_state->fsst_encoder, buf: &compression_state->fsst_serialized_symbol_table[0]); |
405 | analyze_state->fsst_encoder = nullptr; |
406 | |
407 | return std::move(compression_state); |
408 | } |
409 | |
410 | void FSSTStorage::Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) { |
411 | auto &state = state_p.Cast<FSSTCompressionState>(); |
412 | |
413 | // Get vector data |
414 | UnifiedVectorFormat vdata; |
415 | scan_vector.ToUnifiedFormat(count, data&: vdata); |
416 | auto data = UnifiedVectorFormat::GetData<string_t>(format: vdata); |
417 | |
418 | // Collect pointers to strings to compress |
419 | vector<size_t> sizes_in; |
420 | vector<unsigned char *> strings_in; |
421 | size_t total_size = 0; |
422 | idx_t total_count = 0; |
423 | for (idx_t i = 0; i < count; i++) { |
424 | auto idx = vdata.sel->get_index(idx: i); |
425 | |
426 | // Note: we treat nulls and empty strings the same |
427 | if (!vdata.validity.RowIsValid(row_idx: idx) || data[idx].GetSize() == 0) { |
428 | continue; |
429 | } |
430 | |
431 | total_count++; |
432 | total_size += data[idx].GetSize(); |
433 | sizes_in.push_back(x: data[idx].GetSize()); |
434 | strings_in.push_back(x: (unsigned char *)data[idx].GetData()); // NOLINT |
435 | } |
436 | |
437 | // Only Nulls or empty strings in this vector, nothing to compress |
438 | if (total_count == 0) { |
439 | for (idx_t i = 0; i < count; i++) { |
440 | auto idx = vdata.sel->get_index(idx: i); |
441 | if (!vdata.validity.RowIsValid(row_idx: idx)) { |
442 | state.AddNull(); |
443 | } else if (data[idx].GetSize() == 0) { |
444 | state.AddEmptyString(); |
445 | } else { |
446 | throw FatalException("FSST: no encoder found even though there are values to encode" ); |
447 | } |
448 | } |
449 | return; |
450 | } |
451 | |
452 | // Compress buffers |
453 | size_t compress_buffer_size = MaxValue<size_t>(a: total_size * 2 + 7, b: 1); |
454 | vector<unsigned char *> strings_out(total_count, nullptr); |
455 | vector<size_t> sizes_out(total_count, 0); |
456 | vector<unsigned char> compress_buffer(compress_buffer_size, 0); |
457 | |
458 | auto res = duckdb_fsst_compress( |
459 | encoder: state.fsst_encoder, /* IN: encoder obtained from duckdb_fsst_create(). */ |
460 | nstrings: total_count, /* IN: number of strings in batch to compress. */ |
461 | lenIn: &sizes_in[0], /* IN: byte-lengths of the inputs */ |
462 | strIn: &strings_in[0], /* IN: input string start pointers. */ |
463 | outsize: compress_buffer_size, /* IN: byte-length of output buffer. */ |
464 | output: &compress_buffer[0], /* OUT: memorxy buffer to put the compressed strings in (one after the other). */ |
465 | lenOut: &sizes_out[0], /* OUT: byte-lengths of the compressed strings. */ |
466 | strOut: &strings_out[0] /* OUT: output string start pointers. Will all point into [output,output+size). */ |
467 | ); |
468 | |
469 | if (res != total_count) { |
470 | throw FatalException("FSST compression failed to compress all strings" ); |
471 | } |
472 | |
473 | // Push the compressed strings to the compression state one by one |
474 | idx_t compressed_idx = 0; |
475 | for (idx_t i = 0; i < count; i++) { |
476 | auto idx = vdata.sel->get_index(idx: i); |
477 | if (!vdata.validity.RowIsValid(row_idx: idx)) { |
478 | state.AddNull(); |
479 | } else if (data[idx].GetSize() == 0) { |
480 | state.AddEmptyString(); |
481 | } else { |
482 | state.UpdateState(uncompressed_string: data[idx], compressed_string: strings_out[compressed_idx], compressed_string_len: sizes_out[compressed_idx]); |
483 | compressed_idx++; |
484 | } |
485 | } |
486 | } |
487 | |
488 | void FSSTStorage::FinalizeCompress(CompressionState &state_p) { |
489 | auto &state = state_p.Cast<FSSTCompressionState>(); |
490 | state.Flush(final: true); |
491 | } |
492 | |
493 | //===--------------------------------------------------------------------===// |
494 | // Scan |
495 | //===--------------------------------------------------------------------===// |
// Scan state: pins the segment block, holds the imported fsst decoder, and
// caches the last delta-decoded index so sequential scans can resume decoding
// without restarting from row 0.
struct FSSTScanState : public StringScanState {
	FSSTScanState() {
		ResetStoredDelta();
	}

	// Null when the segment has no symbol table (all strings empty/NULL).
	buffer_ptr<void> duckdb_fsst_decoder;
	bitpacking_width_t current_width;

	// To speed up delta decoding we store the last index
	uint32_t last_known_index;
	int64_t last_known_row; // -1 means no row has been decoded yet

	void StoreLastDelta(uint32_t value, int64_t row) {
		last_known_index = value;
		last_known_row = row;
	}
	void ResetStoredDelta() {
		last_known_index = 0;
		last_known_row = -1;
	}
};
517 | |
518 | unique_ptr<SegmentScanState> FSSTStorage::StringInitScan(ColumnSegment &segment) { |
519 | auto state = make_uniq<FSSTScanState>(); |
520 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
521 | state->handle = buffer_manager.Pin(handle&: segment.block); |
522 | auto base_ptr = state->handle.Ptr() + segment.GetBlockOffset(); |
523 | |
524 | state->duckdb_fsst_decoder = make_buffer<duckdb_fsst_decoder_t>(); |
525 | auto retval = ParseFSSTSegmentHeader( |
526 | base_ptr, decoder_out: reinterpret_cast<duckdb_fsst_decoder_t *>(state->duckdb_fsst_decoder.get()), width_out: &state->current_width); |
527 | if (!retval) { |
528 | state->duckdb_fsst_decoder = nullptr; |
529 | } |
530 | |
531 | return std::move(state); |
532 | } |
533 | |
534 | void DeltaDecodeIndices(uint32_t *buffer_in, uint32_t *buffer_out, idx_t decode_count, uint32_t last_known_value) { |
535 | buffer_out[0] = buffer_in[0]; |
536 | buffer_out[0] += last_known_value; |
537 | for (idx_t i = 1; i < decode_count; i++) { |
538 | buffer_out[i] = buffer_in[i] + buffer_out[i - 1]; |
539 | } |
540 | } |
541 | |
542 | void BitUnpackRange(data_ptr_t src_ptr, data_ptr_t dst_ptr, idx_t count, idx_t row, bitpacking_width_t width) { |
543 | auto bitunpack_src_ptr = &src_ptr[(row * width) / 8]; |
544 | BitpackingPrimitives::UnPackBuffer<uint32_t>(dst: dst_ptr, src: bitunpack_src_ptr, count, width); |
545 | } |
546 | |
547 | //===--------------------------------------------------------------------===// |
548 | // Scan base data |
549 | //===--------------------------------------------------------------------===// |
550 | template <bool ALLOW_FSST_VECTORS> |
551 | void FSSTStorage::StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result, |
552 | idx_t result_offset) { |
553 | |
554 | auto &scan_state = state.scan_state->Cast<FSSTScanState>(); |
555 | auto start = segment.GetRelativeIndex(row_index: state.row_index); |
556 | |
557 | bool enable_fsst_vectors; |
558 | if (ALLOW_FSST_VECTORS) { |
559 | auto &config = DBConfig::GetConfig(db&: segment.db); |
560 | enable_fsst_vectors = config.options.enable_fsst_vectors; |
561 | } else { |
562 | enable_fsst_vectors = false; |
563 | } |
564 | |
565 | auto baseptr = scan_state.handle.Ptr() + segment.GetBlockOffset(); |
566 | auto dict = GetDictionary(segment, handle&: scan_state.handle); |
567 | auto base_data = data_ptr_cast(src: baseptr + sizeof(fsst_compression_header_t)); |
568 | string_t *result_data; |
569 | |
570 | if (scan_count == 0) { |
571 | return; |
572 | } |
573 | |
574 | if (enable_fsst_vectors) { |
575 | D_ASSERT(result_offset == 0); |
576 | if (scan_state.duckdb_fsst_decoder) { |
577 | D_ASSERT(result_offset == 0 || result.GetVectorType() == VectorType::FSST_VECTOR); |
578 | result.SetVectorType(VectorType::FSST_VECTOR); |
579 | FSSTVector::RegisterDecoder(vector&: result, duckdb_fsst_decoder&: scan_state.duckdb_fsst_decoder); |
580 | result_data = FSSTVector::GetCompressedData<string_t>(vector&: result); |
581 | } else { |
582 | D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR); |
583 | result_data = FlatVector::GetData<string_t>(vector&: result); |
584 | } |
585 | } else { |
586 | D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR); |
587 | result_data = FlatVector::GetData<string_t>(vector&: result); |
588 | } |
589 | |
590 | if (start == 0 || scan_state.last_known_row >= (int64_t)start) { |
591 | scan_state.ResetStoredDelta(); |
592 | } |
593 | |
594 | auto offsets = CalculateBpDeltaOffsets(last_known_row: scan_state.last_known_row, start, scan_count); |
595 | |
596 | auto bitunpack_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_bitunpack_count]); |
597 | BitUnpackRange(src_ptr: base_data, dst_ptr: data_ptr_cast(src: bitunpack_buffer.get()), count: offsets.total_bitunpack_count, |
598 | row: offsets.bitunpack_start_row, width: scan_state.current_width); |
599 | auto delta_decode_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_delta_decode_count]); |
600 | DeltaDecodeIndices(buffer_in: bitunpack_buffer.get() + offsets.bitunpack_alignment_offset, buffer_out: delta_decode_buffer.get(), |
601 | decode_count: offsets.total_delta_decode_count, last_known_value: scan_state.last_known_index); |
602 | |
603 | if (enable_fsst_vectors) { |
604 | // Lookup decompressed offsets in dict |
605 | for (idx_t i = 0; i < scan_count; i++) { |
606 | uint32_t string_length = bitunpack_buffer[i + offsets.scan_offset]; |
607 | result_data[i] = UncompressedStringStorage::FetchStringFromDict( |
608 | segment, dict, result, baseptr, dict_offset: delta_decode_buffer[i + offsets.unused_delta_decoded_values], |
609 | string_length); |
610 | FSSTVector::SetCount(vector&: result, count: scan_count); |
611 | } |
612 | } else { |
613 | // Just decompress |
614 | for (idx_t i = 0; i < scan_count; i++) { |
615 | uint32_t str_len = bitunpack_buffer[i + offsets.scan_offset]; |
616 | auto str_ptr = FSSTStorage::FetchStringPointer( |
617 | dict, baseptr, dict_offset: delta_decode_buffer[i + offsets.unused_delta_decoded_values]); |
618 | |
619 | if (str_len > 0) { |
620 | result_data[i + result_offset] = |
621 | FSSTPrimitives::DecompressValue(duckdb_fsst_decoder: scan_state.duckdb_fsst_decoder.get(), result, compressed_string: str_ptr, compressed_string_len: str_len); |
622 | } else { |
623 | result_data[i + result_offset] = string_t(nullptr, 0); |
624 | } |
625 | } |
626 | } |
627 | |
628 | scan_state.StoreLastDelta(value: delta_decode_buffer[scan_count + offsets.unused_delta_decoded_values - 1], |
629 | row: start + scan_count - 1); |
630 | } |
631 | |
632 | void FSSTStorage::StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) { |
633 | StringScanPartial<true>(segment, state, scan_count, result, result_offset: 0); |
634 | } |
635 | |
636 | //===--------------------------------------------------------------------===// |
637 | // Fetch |
638 | //===--------------------------------------------------------------------===// |
639 | void FSSTStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result, |
640 | idx_t result_idx) { |
641 | |
642 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
643 | auto handle = buffer_manager.Pin(handle&: segment.block); |
644 | auto base_ptr = handle.Ptr() + segment.GetBlockOffset(); |
645 | auto base_data = data_ptr_cast(src: base_ptr + sizeof(fsst_compression_header_t)); |
646 | auto dict = GetDictionary(segment, handle); |
647 | |
648 | duckdb_fsst_decoder_t decoder; |
649 | bitpacking_width_t width; |
650 | auto have_symbol_table = ParseFSSTSegmentHeader(base_ptr, decoder_out: &decoder, width_out: &width); |
651 | |
652 | auto result_data = FlatVector::GetData<string_t>(vector&: result); |
653 | |
654 | if (have_symbol_table) { |
655 | // We basically just do a scan of 1 which is kinda expensive as we need to repeatedly delta decode until we |
656 | // reach the row we want, we could consider a more clever caching trick if this is slow |
657 | auto offsets = CalculateBpDeltaOffsets(last_known_row: -1, start: row_id, scan_count: 1); |
658 | |
659 | auto bitunpack_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_bitunpack_count]); |
660 | BitUnpackRange(src_ptr: base_data, dst_ptr: data_ptr_cast(src: bitunpack_buffer.get()), count: offsets.total_bitunpack_count, |
661 | row: offsets.bitunpack_start_row, width); |
662 | auto delta_decode_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_delta_decode_count]); |
663 | DeltaDecodeIndices(buffer_in: bitunpack_buffer.get() + offsets.bitunpack_alignment_offset, buffer_out: delta_decode_buffer.get(), |
664 | decode_count: offsets.total_delta_decode_count, last_known_value: 0); |
665 | |
666 | uint32_t string_length = bitunpack_buffer[offsets.scan_offset]; |
667 | |
668 | string_t compressed_string = UncompressedStringStorage::FetchStringFromDict( |
669 | segment, dict, result, baseptr: base_ptr, dict_offset: delta_decode_buffer[offsets.unused_delta_decoded_values], string_length); |
670 | |
671 | result_data[result_idx] = FSSTPrimitives::DecompressValue(duckdb_fsst_decoder: (void *)&decoder, result, compressed_string: compressed_string.GetData(), |
672 | compressed_string_len: compressed_string.GetSize()); |
673 | } else { |
674 | // There's no fsst symtable, this only happens for empty strings or nulls, we can just emit an empty string |
675 | result_data[result_idx] = string_t(nullptr, 0); |
676 | } |
677 | } |
678 | |
679 | //===--------------------------------------------------------------------===// |
680 | // Get Function |
681 | //===--------------------------------------------------------------------===// |
// Assembles the FSST CompressionFunction; only VARCHAR is supported. Skip is a
// no-op because scans delta-decode from the last known row anyway.
CompressionFunction FSSTFun::GetFunction(PhysicalType data_type) {
	D_ASSERT(data_type == PhysicalType::VARCHAR);
	return CompressionFunction(
	    CompressionType::COMPRESSION_FSST, data_type, FSSTStorage::StringInitAnalyze, FSSTStorage::StringAnalyze,
	    FSSTStorage::StringFinalAnalyze, FSSTStorage::InitCompression, FSSTStorage::Compress,
	    FSSTStorage::FinalizeCompress, FSSTStorage::StringInitScan, FSSTStorage::StringScan,
	    FSSTStorage::StringScanPartial<false>, FSSTStorage::StringFetchRow, UncompressedFunctions::EmptySkip);
}
690 | |
691 | bool FSSTFun::TypeIsSupported(PhysicalType type) { |
692 | return type == PhysicalType::VARCHAR; |
693 | } |
694 | |
695 | //===--------------------------------------------------------------------===// |
696 | // Helper Functions |
697 | //===--------------------------------------------------------------------===// |
698 | void FSSTStorage::SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container) { |
699 | auto = reinterpret_cast<fsst_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset()); |
700 | Store<uint32_t>(val: container.size, ptr: data_ptr_cast(src: &header_ptr->dict_size)); |
701 | Store<uint32_t>(val: container.end, ptr: data_ptr_cast(src: &header_ptr->dict_end)); |
702 | } |
703 | |
704 | StringDictionaryContainer FSSTStorage::GetDictionary(ColumnSegment &segment, BufferHandle &handle) { |
705 | auto = reinterpret_cast<fsst_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset()); |
706 | StringDictionaryContainer container; |
707 | container.size = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->dict_size)); |
708 | container.end = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->dict_end)); |
709 | return container; |
710 | } |
711 | |
712 | char *FSSTStorage::FetchStringPointer(StringDictionaryContainer dict, data_ptr_t baseptr, int32_t dict_offset) { |
713 | if (dict_offset == 0) { |
714 | return nullptr; |
715 | } |
716 | |
717 | auto dict_end = baseptr + dict.end; |
718 | auto dict_pos = dict_end - dict_offset; |
719 | return char_ptr_cast(src: dict_pos); |
720 | } |
721 | |
722 | // Returns false if no symbol table was found. This means all strings are either empty or null |
723 | bool FSSTStorage::(data_ptr_t base_ptr, duckdb_fsst_decoder_t *decoder_out, |
724 | bitpacking_width_t *width_out) { |
725 | auto = reinterpret_cast<fsst_compression_header_t *>(base_ptr); |
726 | auto fsst_symbol_table_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->fsst_symbol_table_offset)); |
727 | *width_out = (bitpacking_width_t)(Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->bitpacking_width))); |
728 | return duckdb_fsst_import(decoder: decoder_out, buf: base_ptr + fsst_symbol_table_offset); |
729 | } |
730 | |
731 | // The calculation of offsets and counts while scanning or fetching is a bit tricky, for two reasons: |
732 | // - bitunpacking needs to be aligned to BITPACKING_ALGORITHM_GROUP_SIZE |
733 | // - delta decoding needs to decode from the last known value. |
734 | bp_delta_offsets_t FSSTStorage::CalculateBpDeltaOffsets(int64_t last_known_row, idx_t start, idx_t scan_count) { |
735 | D_ASSERT((idx_t)(last_known_row + 1) <= start); |
736 | bp_delta_offsets_t result; |
737 | |
738 | result.delta_decode_start_row = (idx_t)(last_known_row + 1); |
739 | result.bitunpack_alignment_offset = |
740 | result.delta_decode_start_row % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; |
741 | result.bitunpack_start_row = result.delta_decode_start_row - result.bitunpack_alignment_offset; |
742 | result.unused_delta_decoded_values = start - result.delta_decode_start_row; |
743 | result.scan_offset = result.bitunpack_alignment_offset + result.unused_delta_decoded_values; |
744 | result.total_delta_decode_count = scan_count + result.unused_delta_decoded_values; |
745 | result.total_bitunpack_count = |
746 | BitpackingPrimitives::RoundUpToAlgorithmGroupSize<idx_t>(num_to_round: scan_count + result.scan_offset); |
747 | |
748 | D_ASSERT(result.total_delta_decode_count + result.bitunpack_alignment_offset <= result.total_bitunpack_count); |
749 | return result; |
750 | } |
751 | |
752 | } // namespace duckdb |
753 | |