#include "duckdb/common/bitpacking.hpp"
#include "duckdb/storage/checkpoint/write_overflow_strings_to_disk.hpp"
#include "duckdb/storage/string_uncompressed.hpp"
#include "duckdb/function/compression/compression.hpp"
#include "duckdb/storage/table/column_data_checkpointer.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/common/constants.hpp"
#include "duckdb/common/random_engine.hpp"
#include "duckdb/common/fsst.hpp"
#include "miniz_wrapper.hpp"
#include "fsst.h"

namespace duckdb {

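// On-disk layout of an FSST-compressed segment, as written by FSSTCompressionState::Finalize below:
// | fsst_compression_header_t | bitpacked delta-encoded string lengths | fsst symbol table | (free space) | dictionary |
// The dictionary grows backwards from the end of the block; the header records its size and end offset,
// the bitpacking width of the length buffer and the offset of the serialized symbol table. Finalize may
// move the dictionary up against the symbol table so the segment can be truncated.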
typedef struct {
	uint32_t dict_size;
	uint32_t dict_end;
	uint32_t bitpacking_width;
	uint32_t fsst_symbol_table_offset;
} fsst_compression_header_t;

// Counts and offsets used during scanning/fetching
//                                     | ColumnSegment to be scanned / fetched from                          |
//                                     | untouched | bp align | unused d-values |  to scan  | bp align | untouched |
typedef struct BPDeltaDecodeOffsets {
	idx_t delta_decode_start_row;      //                       X
	idx_t bitunpack_alignment_offset;  //           <--------->
	idx_t bitunpack_start_row;         //           X
	idx_t unused_delta_decoded_values; //                       <----------------->
	idx_t scan_offset;                 //           <---------------------------->
	idx_t total_delta_decode_count;    //                       <-------------------------->
	idx_t total_bitunpack_count;       //           <------------------------------------------------>
} bp_delta_offsets_t;

struct FSSTStorage {
	static constexpr size_t COMPACTION_FLUSH_LIMIT = (size_t)Storage::BLOCK_SIZE / 5 * 4;
	static constexpr double MINIMUM_COMPRESSION_RATIO = 1.2;
	static constexpr double ANALYSIS_SAMPLE_SIZE = 0.25;

	static unique_ptr<AnalyzeState> StringInitAnalyze(ColumnData &col_data, PhysicalType type);
	static bool StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count);
	static idx_t StringFinalAnalyze(AnalyzeState &state_p);

	static unique_ptr<CompressionState> InitCompression(ColumnDataCheckpointer &checkpointer,
	                                                    unique_ptr<AnalyzeState> analyze_state_p);
	static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count);
	static void FinalizeCompress(CompressionState &state_p);

	static unique_ptr<SegmentScanState> StringInitScan(ColumnSegment &segment);
	template <bool ALLOW_FSST_VECTORS = false>
	static void StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result,
	                              idx_t result_offset);
	static void StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result);
	static void StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result,
	                           idx_t result_idx);

	static void SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container);
	static StringDictionaryContainer GetDictionary(ColumnSegment &segment, BufferHandle &handle);

	static char *FetchStringPointer(StringDictionaryContainer dict, data_ptr_t baseptr, int32_t dict_offset);
	static bp_delta_offsets_t CalculateBpDeltaOffsets(int64_t last_known_row, idx_t start, idx_t scan_count);
	static bool ParseFSSTSegmentHeader(data_ptr_t base_ptr, duckdb_fsst_decoder_t *decoder_out,
	                                   bitpacking_width_t *width_out);
};

//===--------------------------------------------------------------------===//
// Analyze
//===--------------------------------------------------------------------===//
struct FSSTAnalyzeState : public AnalyzeState {
	FSSTAnalyzeState() : count(0), fsst_string_total_size(0), empty_strings(0) {
	}

	~FSSTAnalyzeState() override {
		if (fsst_encoder) {
			duckdb_fsst_destroy(fsst_encoder);
		}
	}

	duckdb_fsst_encoder_t *fsst_encoder = nullptr;
	idx_t count;

	StringHeap fsst_string_heap;
	vector<string_t> fsst_strings;
	size_t fsst_string_total_size;

	RandomEngine random_engine;
	bool have_valid_row = false;

	idx_t empty_strings;
};

unique_ptr<AnalyzeState> FSSTStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) {
	return make_uniq<FSSTAnalyzeState>();
}

bool FSSTStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) {
	auto &state = state_p.Cast<FSSTAnalyzeState>();
	UnifiedVectorFormat vdata;
	input.ToUnifiedFormat(count, vdata);

	state.count += count;
	auto data = UnifiedVectorFormat::GetData<string_t>(vdata);

	// Note that we ignore the sampling as long as we have not found any valid strings yet; this prevents
	// ending up with an empty fsst symbol table when the valid strings only appear later in the column.
	bool sample_selected = !state.have_valid_row || state.random_engine.NextRandom() < ANALYSIS_SAMPLE_SIZE;

	for (idx_t i = 0; i < count; i++) {
		auto idx = vdata.sel->get_index(i);

		if (!vdata.validity.RowIsValid(idx)) {
			continue;
		}

		// We need to check all strings for this, otherwise we run into trouble during compression if we miss any
		auto string_size = data[idx].GetSize();
		if (string_size >= StringUncompressed::STRING_BLOCK_LIMIT) {
			return false;
		}

		if (!sample_selected) {
			continue;
		}

		if (string_size > 0) {
			state.have_valid_row = true;
			if (data[idx].IsInlined()) {
				state.fsst_strings.push_back(data[idx]);
			} else {
				state.fsst_strings.emplace_back(state.fsst_string_heap.AddBlob(data[idx]));
			}
			state.fsst_string_total_size += string_size;
		} else {
			state.empty_strings++;
		}
	}
	return true;
}

idx_t FSSTStorage::StringFinalAnalyze(AnalyzeState &state_p) {
	auto &state = state_p.Cast<FSSTAnalyzeState>();

	size_t compressed_dict_size = 0;
	size_t max_compressed_string_length = 0;

	auto string_count = state.fsst_strings.size();

	if (!string_count) {
		return DConstants::INVALID_INDEX;
	}

	size_t output_buffer_size = 7 + 2 * state.fsst_string_total_size; // size as specified in fsst.h

	vector<size_t> fsst_string_sizes;
	vector<unsigned char *> fsst_string_ptrs;
	for (auto &str : state.fsst_strings) {
		fsst_string_sizes.push_back(str.GetSize());
		fsst_string_ptrs.push_back((unsigned char *)str.GetData()); // NOLINT
	}

	state.fsst_encoder = duckdb_fsst_create(string_count, &fsst_string_sizes[0], &fsst_string_ptrs[0], 0);

	// TODO: do we really need to encode to get a size estimate?
	auto compressed_ptrs = vector<unsigned char *>(string_count, nullptr);
	auto compressed_sizes = vector<size_t>(string_count, 0);
	unique_ptr<unsigned char[]> compressed_buffer(new unsigned char[output_buffer_size]);

	auto res =
	    duckdb_fsst_compress(state.fsst_encoder, string_count, &fsst_string_sizes[0], &fsst_string_ptrs[0],
	                         output_buffer_size, compressed_buffer.get(), &compressed_sizes[0], &compressed_ptrs[0]);

	if (string_count != res) {
		throw std::runtime_error("FSST output buffer is too small unexpectedly");
	}

	// Sum and max of the compressed lengths
	for (auto &size : compressed_sizes) {
		compressed_dict_size += size;
		max_compressed_string_length = MaxValue(max_compressed_string_length, size);
	}
	D_ASSERT(compressed_dict_size == (compressed_ptrs[res - 1] - compressed_ptrs[0]) + compressed_sizes[res - 1]);

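	// Estimate the full segment size from the sample: the bitpacked offsets plus the compressed dictionary,
	// scaled by the inverse sample rate, plus one serialized symbol table per estimated block. The estimate
	// is then inflated by MINIMUM_COMPRESSION_RATIO to bias the compression choice away from FSST unless it
	// clearly pays off.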
	auto minimum_width = BitpackingPrimitives::MinimumBitWidth(max_compressed_string_length);
	auto bitpacked_offsets_size =
	    BitpackingPrimitives::GetRequiredSize(string_count + state.empty_strings, minimum_width);

	auto estimated_base_size = (bitpacked_offsets_size + compressed_dict_size) * (1 / ANALYSIS_SAMPLE_SIZE);
	auto num_blocks = estimated_base_size / (Storage::BLOCK_SIZE - sizeof(duckdb_fsst_decoder_t));
	auto symtable_size = num_blocks * sizeof(duckdb_fsst_decoder_t);

	auto estimated_size = estimated_base_size + symtable_size;

	return estimated_size * MINIMUM_COMPRESSION_RATIO;
}

//===--------------------------------------------------------------------===//
// Compress
//===--------------------------------------------------------------------===//
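// The symbol table is trained once during the analyze phase; InitCompression below takes ownership of the
// trained encoder and serializes its decoder into every segment, so all segments written by this compression
// run share the same symbol table.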
class FSSTCompressionState : public CompressionState {
public:
	explicit FSSTCompressionState(ColumnDataCheckpointer &checkpointer)
	    : checkpointer(checkpointer), function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_FSST)) {
		CreateEmptySegment(checkpointer.GetRowGroup().start);
	}

	~FSSTCompressionState() override {
		if (fsst_encoder) {
			duckdb_fsst_destroy(fsst_encoder);
		}
	}

	void Reset() {
		index_buffer.clear();
		current_width = 0;
		max_compressed_string_length = 0;
		last_fitting_size = 0;

		// Reset the pointers into the current segment
		auto &buffer_manager = BufferManager::GetBufferManager(current_segment->db);
		current_handle = buffer_manager.Pin(current_segment->block);
		current_dictionary = FSSTStorage::GetDictionary(*current_segment, current_handle);
		current_end_ptr = current_handle.Ptr() + current_dictionary.end;
	}

	void CreateEmptySegment(idx_t row_start) {
		auto &db = checkpointer.GetDatabase();
		auto &type = checkpointer.GetType();
		auto compressed_segment = ColumnSegment::CreateTransientSegment(db, type, row_start);
		current_segment = std::move(compressed_segment);
		current_segment->function = function;
		Reset();
	}

	void UpdateState(string_t uncompressed_string, unsigned char *compressed_string, size_t compressed_string_len) {
		if (!HasEnoughSpace(compressed_string_len)) {
			Flush();
			if (!HasEnoughSpace(compressed_string_len)) {
				throw InternalException("FSST string compression failed due to insufficient space in empty block");
			}
		}

		UncompressedStringStorage::UpdateStringStats(current_segment->stats, uncompressed_string);

		// Write string into dictionary
		current_dictionary.size += compressed_string_len;
		auto dict_pos = current_end_ptr - current_dictionary.size;
		memcpy(dict_pos, compressed_string, compressed_string_len);
		current_dictionary.Verify();

		// We just push the string length to effectively delta encode the strings
		index_buffer.push_back(compressed_string_len);

		max_compressed_string_length = MaxValue(max_compressed_string_length, compressed_string_len);

		current_width = BitpackingPrimitives::MinimumBitWidth(max_compressed_string_length);
		current_segment->count++;
	}
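	// Nulls and empty strings are both stored as a zero-length entry: no dictionary bytes are written and a
	// 0 is pushed into the index buffer, so the delta-encoded offset stays at the previous value. The validity
	// mask (stored separately) is what distinguishes nulls from empty strings on scan.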
	void AddNull() {
		if (!HasEnoughSpace(0)) {
			Flush();
			if (!HasEnoughSpace(0)) {
				throw InternalException("FSST string compression failed due to insufficient space in empty block");
			}
		}
		index_buffer.push_back(0);
		current_segment->count++;
	}

	void AddEmptyString() {
		AddNull();
		UncompressedStringStorage::UpdateStringStats(current_segment->stats, "");
	}
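	// Size the segment would need if one more string of string_len bytes were appended: header, dictionary
	// written so far, bitpacked offsets for (current count + 1) entries at the (possibly widened) bit width,
	// the new string itself, and the serialized symbol table.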
	size_t GetRequiredSize(size_t string_len) {
		bitpacking_width_t required_minimum_width;
		if (string_len > max_compressed_string_length) {
			required_minimum_width = BitpackingPrimitives::MinimumBitWidth(string_len);
		} else {
			required_minimum_width = current_width;
		}

		size_t current_dict_size = current_dictionary.size;
		idx_t current_string_count = index_buffer.size();

		size_t dict_offsets_size =
		    BitpackingPrimitives::GetRequiredSize(current_string_count + 1, required_minimum_width);

		// TODO: switch to a symbol table per RowGroup, saves a bit of space
		return sizeof(fsst_compression_header_t) + current_dict_size + dict_offsets_size + string_len +
		       fsst_serialized_symbol_table_size;
	}

	// Checks whether there is enough space; if there is, sets last_fitting_size
	bool HasEnoughSpace(size_t string_len) {
		auto required_size = GetRequiredSize(string_len);

		if (required_size <= Storage::BLOCK_SIZE) {
			last_fitting_size = required_size;
			return true;
		}
		return false;
	}

	void Flush(bool final = false) {
		auto next_start = current_segment->start + current_segment->count;

		auto segment_size = Finalize();
		auto &state = checkpointer.GetCheckpointState();
		state.FlushSegment(std::move(current_segment), segment_size);

		if (!final) {
			CreateEmptySegment(next_start);
		}
	}
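	// Writes the bitpacked index buffer and the symbol table behind the header, then either returns the full
	// block size (if the segment is nearly full) or compacts the dictionary up against the symbol table so
	// the segment can be truncated to total_size.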
	idx_t Finalize() {
		auto &buffer_manager = BufferManager::GetBufferManager(current_segment->db);
		auto handle = buffer_manager.Pin(current_segment->block);
		D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE);

		// calculate sizes
		auto compressed_index_buffer_size =
		    BitpackingPrimitives::GetRequiredSize(current_segment->count, current_width);
		auto total_size = sizeof(fsst_compression_header_t) + compressed_index_buffer_size + current_dictionary.size +
		                  fsst_serialized_symbol_table_size;

		if (total_size != last_fitting_size) {
			throw InternalException("FSST string compression failed due to incorrect size calculation");
		}

		// calculate ptr and offsets
		auto base_ptr = handle.Ptr();
		auto header_ptr = reinterpret_cast<fsst_compression_header_t *>(base_ptr);
		auto compressed_index_buffer_offset = sizeof(fsst_compression_header_t);
		auto symbol_table_offset = compressed_index_buffer_offset + compressed_index_buffer_size;

		D_ASSERT(current_segment->count == index_buffer.size());
		BitpackingPrimitives::PackBuffer<sel_t, false>(base_ptr + compressed_index_buffer_offset,
		                                               reinterpret_cast<uint32_t *>(index_buffer.data()),
		                                               current_segment->count, current_width);

		// Write the fsst symbol table or nothing
		if (fsst_encoder != nullptr) {
			memcpy(base_ptr + symbol_table_offset, &fsst_serialized_symbol_table[0], fsst_serialized_symbol_table_size);
		} else {
			memset(base_ptr + symbol_table_offset, 0, fsst_serialized_symbol_table_size);
		}

		Store<uint32_t>(symbol_table_offset, data_ptr_cast(&header_ptr->fsst_symbol_table_offset));
		Store<uint32_t>((uint32_t)current_width, data_ptr_cast(&header_ptr->bitpacking_width));

		if (total_size >= FSSTStorage::COMPACTION_FLUSH_LIMIT) {
			// the block is full enough, don't bother moving around the dictionary
			return Storage::BLOCK_SIZE;
		}
		// the block has space left: figure out how much space we can save
		auto move_amount = Storage::BLOCK_SIZE - total_size;
		// move the dictionary so it lines up exactly with the offsets
		auto new_dictionary_offset = symbol_table_offset + fsst_serialized_symbol_table_size;
		memmove(base_ptr + new_dictionary_offset, base_ptr + current_dictionary.end - current_dictionary.size,
		        current_dictionary.size);
		current_dictionary.end -= move_amount;
		D_ASSERT(current_dictionary.end == total_size);
		// write the new dictionary (with the updated "end")
		FSSTStorage::SetDictionary(*current_segment, handle, current_dictionary);

		return total_size;
	}

	ColumnDataCheckpointer &checkpointer;
	CompressionFunction &function;

	// State regarding current segment
	unique_ptr<ColumnSegment> current_segment;
	BufferHandle current_handle;
	StringDictionaryContainer current_dictionary;
	data_ptr_t current_end_ptr;

	// Buffers for the current segment
	vector<uint32_t> index_buffer;

	size_t max_compressed_string_length;
	bitpacking_width_t current_width;
	idx_t last_fitting_size;

	duckdb_fsst_encoder_t *fsst_encoder = nullptr;
	unsigned char fsst_serialized_symbol_table[sizeof(duckdb_fsst_decoder_t)];
	size_t fsst_serialized_symbol_table_size = sizeof(duckdb_fsst_decoder_t);
};

unique_ptr<CompressionState> FSSTStorage::InitCompression(ColumnDataCheckpointer &checkpointer,
                                                          unique_ptr<AnalyzeState> analyze_state_p) {
	auto analyze_state = static_cast<FSSTAnalyzeState *>(analyze_state_p.get());
	auto compression_state = make_uniq<FSSTCompressionState>(checkpointer);

	if (analyze_state->fsst_encoder == nullptr) {
		throw InternalException("No encoder found during FSST compression");
	}

	compression_state->fsst_encoder = analyze_state->fsst_encoder;
	compression_state->fsst_serialized_symbol_table_size =
	    duckdb_fsst_export(compression_state->fsst_encoder, &compression_state->fsst_serialized_symbol_table[0]);
	analyze_state->fsst_encoder = nullptr;

	return std::move(compression_state);
}

void FSSTStorage::Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) {
	auto &state = state_p.Cast<FSSTCompressionState>();

	// Get vector data
	UnifiedVectorFormat vdata;
	scan_vector.ToUnifiedFormat(count, vdata);
	auto data = UnifiedVectorFormat::GetData<string_t>(vdata);

	// Collect pointers to strings to compress
	vector<size_t> sizes_in;
	vector<unsigned char *> strings_in;
	size_t total_size = 0;
	idx_t total_count = 0;
	for (idx_t i = 0; i < count; i++) {
		auto idx = vdata.sel->get_index(i);

		// Note: we treat nulls and empty strings the same
		if (!vdata.validity.RowIsValid(idx) || data[idx].GetSize() == 0) {
			continue;
		}

		total_count++;
		total_size += data[idx].GetSize();
		sizes_in.push_back(data[idx].GetSize());
		strings_in.push_back((unsigned char *)data[idx].GetData()); // NOLINT
	}

	// Only nulls or empty strings in this vector: nothing to compress
	if (total_count == 0) {
		for (idx_t i = 0; i < count; i++) {
			auto idx = vdata.sel->get_index(i);
			if (!vdata.validity.RowIsValid(idx)) {
				state.AddNull();
			} else if (data[idx].GetSize() == 0) {
				state.AddEmptyString();
			} else {
				throw FatalException("FSST: no encoder found even though there are values to encode");
			}
		}
		return;
	}

	// Compress buffers
	size_t compress_buffer_size = MaxValue<size_t>(total_size * 2 + 7, 1);
	vector<unsigned char *> strings_out(total_count, nullptr);
	vector<size_t> sizes_out(total_count, 0);
	vector<unsigned char> compress_buffer(compress_buffer_size, 0);

	auto res = duckdb_fsst_compress(
	    state.fsst_encoder,   /* IN: encoder obtained from duckdb_fsst_create(). */
	    total_count,          /* IN: number of strings in batch to compress. */
	    &sizes_in[0],         /* IN: byte-lengths of the inputs. */
	    &strings_in[0],       /* IN: input string start pointers. */
	    compress_buffer_size, /* IN: byte-length of output buffer. */
	    &compress_buffer[0],  /* OUT: memory buffer to put the compressed strings in (one after the other). */
	    &sizes_out[0],        /* OUT: byte-lengths of the compressed strings. */
	    &strings_out[0]       /* OUT: output string start pointers. Will all point into [output,output+size). */
	);

	if (res != total_count) {
		throw FatalException("FSST compression failed to compress all strings");
	}

	// Push the compressed strings to the compression state one by one
	idx_t compressed_idx = 0;
	for (idx_t i = 0; i < count; i++) {
		auto idx = vdata.sel->get_index(i);
		if (!vdata.validity.RowIsValid(idx)) {
			state.AddNull();
		} else if (data[idx].GetSize() == 0) {
			state.AddEmptyString();
		} else {
			state.UpdateState(data[idx], strings_out[compressed_idx], sizes_out[compressed_idx]);
			compressed_idx++;
		}
	}
}

void FSSTStorage::FinalizeCompress(CompressionState &state_p) {
	auto &state = state_p.Cast<FSSTCompressionState>();
	state.Flush(true);
}

//===--------------------------------------------------------------------===//
// Scan
//===--------------------------------------------------------------------===//
struct FSSTScanState : public StringScanState {
	FSSTScanState() {
		ResetStoredDelta();
	}

	buffer_ptr<void> duckdb_fsst_decoder;
	bitpacking_width_t current_width;

	// To speed up delta decoding we store the last index
	uint32_t last_known_index;
	int64_t last_known_row;

	void StoreLastDelta(uint32_t value, int64_t row) {
		last_known_index = value;
		last_known_row = row;
	}
	void ResetStoredDelta() {
		last_known_index = 0;
		last_known_row = -1;
	}
};

unique_ptr<SegmentScanState> FSSTStorage::StringInitScan(ColumnSegment &segment) {
	auto state = make_uniq<FSSTScanState>();
	auto &buffer_manager = BufferManager::GetBufferManager(segment.db);
	state->handle = buffer_manager.Pin(segment.block);
	auto base_ptr = state->handle.Ptr() + segment.GetBlockOffset();

	state->duckdb_fsst_decoder = make_buffer<duckdb_fsst_decoder_t>();
	auto retval = ParseFSSTSegmentHeader(
	    base_ptr, reinterpret_cast<duckdb_fsst_decoder_t *>(state->duckdb_fsst_decoder.get()), &state->current_width);
	if (!retval) {
		state->duckdb_fsst_decoder = nullptr;
	}

	return std::move(state);
}
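// The bitpacked values are the compressed string lengths; delta decoding them (seeded with the last known
// dictionary offset) turns them back into offsets into the dictionary, measured backwards from the dictionary
// end to the start of each string. For example, lengths {3, 0, 5} with last_known_value 10 decode to
// offsets {13, 13, 18}.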
void DeltaDecodeIndices(uint32_t *buffer_in, uint32_t *buffer_out, idx_t decode_count, uint32_t last_known_value) {
	buffer_out[0] = buffer_in[0];
	buffer_out[0] += last_known_value;
	for (idx_t i = 1; i < decode_count; i++) {
		buffer_out[i] = buffer_in[i] + buffer_out[i - 1];
	}
}

void BitUnpackRange(data_ptr_t src_ptr, data_ptr_t dst_ptr, idx_t count, idx_t row, bitpacking_width_t width) {
	auto bitunpack_src_ptr = &src_ptr[(row * width) / 8];
	BitpackingPrimitives::UnPackBuffer<uint32_t>(dst_ptr, bitunpack_src_ptr, count, width);
}

//===--------------------------------------------------------------------===//
// Scan base data
//===--------------------------------------------------------------------===//
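// Two scan modes: with FSST vectors enabled, the compressed strings are emitted as-is into an FSST_VECTOR
// together with the decoder, deferring decompression to whoever consumes the vector; otherwise every string
// is decompressed into a flat vector right here.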
template <bool ALLOW_FSST_VECTORS>
void FSSTStorage::StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result,
                                    idx_t result_offset) {

	auto &scan_state = state.scan_state->Cast<FSSTScanState>();
	auto start = segment.GetRelativeIndex(state.row_index);

	bool enable_fsst_vectors;
	if (ALLOW_FSST_VECTORS) {
		auto &config = DBConfig::GetConfig(segment.db);
		enable_fsst_vectors = config.options.enable_fsst_vectors;
	} else {
		enable_fsst_vectors = false;
	}

	auto baseptr = scan_state.handle.Ptr() + segment.GetBlockOffset();
	auto dict = GetDictionary(segment, scan_state.handle);
	auto base_data = data_ptr_cast(baseptr + sizeof(fsst_compression_header_t));
	string_t *result_data;

	if (scan_count == 0) {
		return;
	}

	if (enable_fsst_vectors) {
		D_ASSERT(result_offset == 0);
		if (scan_state.duckdb_fsst_decoder) {
			D_ASSERT(result_offset == 0 || result.GetVectorType() == VectorType::FSST_VECTOR);
			result.SetVectorType(VectorType::FSST_VECTOR);
			FSSTVector::RegisterDecoder(result, scan_state.duckdb_fsst_decoder);
			result_data = FSSTVector::GetCompressedData<string_t>(result);
		} else {
			D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
			result_data = FlatVector::GetData<string_t>(result);
		}
	} else {
		D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
		result_data = FlatVector::GetData<string_t>(result);
	}

	if (start == 0 || scan_state.last_known_row >= (int64_t)start) {
		scan_state.ResetStoredDelta();
	}

	auto offsets = CalculateBpDeltaOffsets(scan_state.last_known_row, start, scan_count);

	auto bitunpack_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_bitunpack_count]);
	BitUnpackRange(base_data, data_ptr_cast(bitunpack_buffer.get()), offsets.total_bitunpack_count,
	               offsets.bitunpack_start_row, scan_state.current_width);
	auto delta_decode_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_delta_decode_count]);
	DeltaDecodeIndices(bitunpack_buffer.get() + offsets.bitunpack_alignment_offset, delta_decode_buffer.get(),
	                   offsets.total_delta_decode_count, scan_state.last_known_index);

	if (enable_fsst_vectors) {
		// Lookup decompressed offsets in dict
		for (idx_t i = 0; i < scan_count; i++) {
			uint32_t string_length = bitunpack_buffer[i + offsets.scan_offset];
			result_data[i] = UncompressedStringStorage::FetchStringFromDict(
			    segment, dict, result, baseptr, delta_decode_buffer[i + offsets.unused_delta_decoded_values],
			    string_length);
			FSSTVector::SetCount(result, scan_count);
		}
	} else {
		// Just decompress
		for (idx_t i = 0; i < scan_count; i++) {
			uint32_t str_len = bitunpack_buffer[i + offsets.scan_offset];
			auto str_ptr = FSSTStorage::FetchStringPointer(
			    dict, baseptr, delta_decode_buffer[i + offsets.unused_delta_decoded_values]);

			if (str_len > 0) {
				result_data[i + result_offset] =
				    FSSTPrimitives::DecompressValue(scan_state.duckdb_fsst_decoder.get(), result, str_ptr, str_len);
			} else {
				result_data[i + result_offset] = string_t(nullptr, 0);
			}
		}
	}

	scan_state.StoreLastDelta(delta_decode_buffer[scan_count + offsets.unused_delta_decoded_values - 1],
	                          start + scan_count - 1);
}

void FSSTStorage::StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) {
	StringScanPartial<true>(segment, state, scan_count, result, 0);
}

//===--------------------------------------------------------------------===//
// Fetch
//===--------------------------------------------------------------------===//
void FSSTStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result,
                                 idx_t result_idx) {

	auto &buffer_manager = BufferManager::GetBufferManager(segment.db);
	auto handle = buffer_manager.Pin(segment.block);
	auto base_ptr = handle.Ptr() + segment.GetBlockOffset();
	auto base_data = data_ptr_cast(base_ptr + sizeof(fsst_compression_header_t));
	auto dict = GetDictionary(segment, handle);

	duckdb_fsst_decoder_t decoder;
	bitpacking_width_t width;
	auto have_symbol_table = ParseFSSTSegmentHeader(base_ptr, &decoder, &width);

	auto result_data = FlatVector::GetData<string_t>(result);

	if (have_symbol_table) {
		// We basically just do a scan of 1, which is somewhat expensive as we need to repeatedly delta decode
		// until we reach the row we want; we could consider a more clever caching trick if this turns out to be slow
		auto offsets = CalculateBpDeltaOffsets(-1, row_id, 1);

		auto bitunpack_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_bitunpack_count]);
		BitUnpackRange(base_data, data_ptr_cast(bitunpack_buffer.get()), offsets.total_bitunpack_count,
		               offsets.bitunpack_start_row, width);
		auto delta_decode_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_delta_decode_count]);
		DeltaDecodeIndices(bitunpack_buffer.get() + offsets.bitunpack_alignment_offset, delta_decode_buffer.get(),
		                   offsets.total_delta_decode_count, 0);

		uint32_t string_length = bitunpack_buffer[offsets.scan_offset];

		string_t compressed_string = UncompressedStringStorage::FetchStringFromDict(
		    segment, dict, result, base_ptr, delta_decode_buffer[offsets.unused_delta_decoded_values], string_length);

		result_data[result_idx] = FSSTPrimitives::DecompressValue((void *)&decoder, result, compressed_string.GetData(),
		                                                          compressed_string.GetSize());
	} else {
		// There is no fsst symbol table; this only happens when all strings are empty or null, so we can just
		// emit an empty string
		result_data[result_idx] = string_t(nullptr, 0);
	}
}

//===--------------------------------------------------------------------===//
// Get Function
//===--------------------------------------------------------------------===//
CompressionFunction FSSTFun::GetFunction(PhysicalType data_type) {
	D_ASSERT(data_type == PhysicalType::VARCHAR);
	return CompressionFunction(
	    CompressionType::COMPRESSION_FSST, data_type, FSSTStorage::StringInitAnalyze, FSSTStorage::StringAnalyze,
	    FSSTStorage::StringFinalAnalyze, FSSTStorage::InitCompression, FSSTStorage::Compress,
	    FSSTStorage::FinalizeCompress, FSSTStorage::StringInitScan, FSSTStorage::StringScan,
	    FSSTStorage::StringScanPartial<false>, FSSTStorage::StringFetchRow, UncompressedFunctions::EmptySkip);
}

bool FSSTFun::TypeIsSupported(PhysicalType type) {
	return type == PhysicalType::VARCHAR;
}

//===--------------------------------------------------------------------===//
// Helper Functions
//===--------------------------------------------------------------------===//
void FSSTStorage::SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container) {
	auto header_ptr = reinterpret_cast<fsst_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset());
	Store<uint32_t>(container.size, data_ptr_cast(&header_ptr->dict_size));
	Store<uint32_t>(container.end, data_ptr_cast(&header_ptr->dict_end));
}

StringDictionaryContainer FSSTStorage::GetDictionary(ColumnSegment &segment, BufferHandle &handle) {
	auto header_ptr = reinterpret_cast<fsst_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset());
	StringDictionaryContainer container;
	container.size = Load<uint32_t>(data_ptr_cast(&header_ptr->dict_size));
	container.end = Load<uint32_t>(data_ptr_cast(&header_ptr->dict_end));
	return container;
}
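// dict_offset is the distance from the end of the dictionary to the start of the string; an offset of 0
// marks a zero-length entry (null or empty string), for which no pointer exists.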
char *FSSTStorage::FetchStringPointer(StringDictionaryContainer dict, data_ptr_t baseptr, int32_t dict_offset) {
	if (dict_offset == 0) {
		return nullptr;
	}

	auto dict_end = baseptr + dict.end;
	auto dict_pos = dict_end - dict_offset;
	return char_ptr_cast(dict_pos);
}

// Returns false if no symbol table was found. This means all strings are either empty or null
bool FSSTStorage::ParseFSSTSegmentHeader(data_ptr_t base_ptr, duckdb_fsst_decoder_t *decoder_out,
                                         bitpacking_width_t *width_out) {
	auto header_ptr = reinterpret_cast<fsst_compression_header_t *>(base_ptr);
	auto fsst_symbol_table_offset = Load<uint32_t>(data_ptr_cast(&header_ptr->fsst_symbol_table_offset));
	*width_out = (bitpacking_width_t)(Load<uint32_t>(data_ptr_cast(&header_ptr->bitpacking_width)));
	return duckdb_fsst_import(decoder_out, base_ptr + fsst_symbol_table_offset);
}

// The calculation of offsets and counts while scanning or fetching is a bit tricky, for two reasons:
// - bitunpacking needs to be aligned to BITPACKING_ALGORITHM_GROUP_SIZE
// - delta decoding needs to decode from the last known value.
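//
// For example (illustrative numbers, assuming BITPACKING_ALGORITHM_GROUP_SIZE = 32): with last_known_row = 40,
// start = 45 and scan_count = 10, delta decoding restarts at row 41, bitunpacking must start at the aligned
// row 32 (alignment offset 9), the first 4 decoded values (rows 41-44) are discarded, the scan reads at
// offset 13, 14 values are delta decoded and 32 values are bitunpacked in total.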
bp_delta_offsets_t FSSTStorage::CalculateBpDeltaOffsets(int64_t last_known_row, idx_t start, idx_t scan_count) {
	D_ASSERT((idx_t)(last_known_row + 1) <= start);
	bp_delta_offsets_t result;

	result.delta_decode_start_row = (idx_t)(last_known_row + 1);
	result.bitunpack_alignment_offset =
	    result.delta_decode_start_row % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE;
	result.bitunpack_start_row = result.delta_decode_start_row - result.bitunpack_alignment_offset;
	result.unused_delta_decoded_values = start - result.delta_decode_start_row;
	result.scan_offset = result.bitunpack_alignment_offset + result.unused_delta_decoded_values;
	result.total_delta_decode_count = scan_count + result.unused_delta_decoded_values;
	result.total_bitunpack_count =
	    BitpackingPrimitives::RoundUpToAlgorithmGroupSize<idx_t>(scan_count + result.scan_offset);

	D_ASSERT(result.total_delta_decode_count + result.bitunpack_alignment_offset <= result.total_bitunpack_count);
	return result;
}

} // namespace duckdb