| 1 | #include "duckdb/common/bitpacking.hpp" |
| 2 | #include "duckdb/storage/checkpoint/write_overflow_strings_to_disk.hpp" |
| 3 | #include "duckdb/storage/string_uncompressed.hpp" |
| 4 | #include "duckdb/function/compression/compression.hpp" |
| 5 | #include "duckdb/storage/table/column_data_checkpointer.hpp" |
| 6 | #include "duckdb/main/config.hpp" |
| 7 | #include "duckdb/common/constants.hpp" |
| 8 | #include "duckdb/common/random_engine.hpp" |
| 9 | #include "duckdb/common/fsst.hpp" |
| 10 | #include "miniz_wrapper.hpp" |
| 11 | #include "fsst.h" |
| 12 | |
| 13 | namespace duckdb { |
| 14 | |
// Header stored at the start of every FSST-compressed segment.
// Layout (all little-endian uint32, written via Store<uint32_t>):
//   dict_size                - number of bytes used by the string dictionary
//   dict_end                 - offset of the end of the dictionary within the block
//   bitpacking_width         - bit width used to pack the delta-encoded string lengths
//   fsst_symbol_table_offset - offset of the serialized FSST symbol table
// NOTE: the typedef name was missing in the original ("} ;"); it is referenced
// throughout this file as fsst_compression_header_t (e.g. in Finalize and the scan path).
typedef struct {
	uint32_t dict_size;
	uint32_t dict_end;
	uint32_t bitpacking_width;
	uint32_t fsst_symbol_table_offset;
} fsst_compression_header_t;
| 21 | |
| 22 | // Counts and offsets used during scanning/fetching |
| 23 | // | ColumnSegment to be scanned / fetched from | |
| 24 | // | untouched | bp align | unused d-values | to scan | bp align | untouched | |
// Counts and offsets used during scanning/fetching.
// Bitpacked values can only be unpacked at algorithm-group granularity and the string
// offsets are delta-encoded, so a scan must start unpacking/decoding *before* the first
// requested row and discard the surplus decoded values. The diagram maps each field onto
// the region of the segment it describes:
// | ColumnSegment to be scanned / fetched from |
// | untouched | bp align | unused d-values | to scan | bp align | untouched |
typedef struct BPDeltaDecodeOffsets {
	idx_t delta_decode_start_row; // X  (first row whose delta is decoded)
	idx_t bitunpack_alignment_offset; // <---------> (rows added to align to a bitpacking group)
	idx_t bitunpack_start_row; // X  (first row that is bit-unpacked)
	idx_t unused_delta_decoded_values; // <-----------------> (decoded but not returned)
	idx_t scan_offset; // <----------------------------> (index of first returned value)
	idx_t total_delta_decode_count; // <-------------------------->
	idx_t total_bitunpack_count; // <------------------------------------------------>
} bp_delta_offsets_t;
| 34 | |
// Static entry points for the FSST compression method, wired into a CompressionFunction
// in FSSTFun::GetFunction below. FSST compresses strings with a per-segment symbol table,
// stores compressed strings in a dictionary growing backwards from the end of the block,
// and bitpacks the delta-encoded string lengths after the header.
struct FSSTStorage {
	// Flush threshold: if a finalized segment uses at least 4/5 of a block, keep the full block.
	static constexpr size_t COMPACTION_FLUSH_LIMIT = (size_t)Storage::BLOCK_SIZE / 5 * 4;
	// FSST is only chosen if it is estimated to be at least this much smaller than uncompressed.
	static constexpr double MINIMUM_COMPRESSION_RATIO = 1.2;
	// Fraction of rows sampled during analysis.
	static constexpr double ANALYSIS_SAMPLE_SIZE = 0.25;

	// Analysis phase: sample strings and estimate the compressed size.
	static unique_ptr<AnalyzeState> StringInitAnalyze(ColumnData &col_data, PhysicalType type);
	static bool StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count);
	static idx_t StringFinalAnalyze(AnalyzeState &state_p);

	// Compression phase: encode strings and write out segments.
	static unique_ptr<CompressionState> InitCompression(ColumnDataCheckpointer &checkpointer,
	                                                    unique_ptr<AnalyzeState> analyze_state_p);
	static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count);
	static void FinalizeCompress(CompressionState &state_p);

	// Scan/fetch phase. ALLOW_FSST_VECTORS controls whether the scan may emit FSST_VECTORs
	// (deferring decompression to the consumer) instead of eagerly decompressing.
	static unique_ptr<SegmentScanState> StringInitScan(ColumnSegment &segment);
	template <bool ALLOW_FSST_VECTORS = false>
	static void StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result,
	                              idx_t result_offset);
	static void StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result);
	static void StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result,
	                           idx_t result_idx);

	// Dictionary header accessors (defined at the bottom of this file).
	static void SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container);
	static StringDictionaryContainer GetDictionary(ColumnSegment &segment, BufferHandle &handle);

	static char *FetchStringPointer(StringDictionaryContainer dict, data_ptr_t baseptr, int32_t dict_offset);
	static bp_delta_offsets_t CalculateBpDeltaOffsets(int64_t last_known_row, idx_t start, idx_t scan_count);
	// Reads the segment header; returns false when no symbol table was written
	// (segment contains only nulls/empty strings).
	static bool ParseFSSTSegmentHeader(data_ptr_t base_ptr, duckdb_fsst_decoder_t *decoder_out,
	                                   bitpacking_width_t *width_out);
};
| 65 | |
| 66 | //===--------------------------------------------------------------------===// |
| 67 | // Analyze |
| 68 | //===--------------------------------------------------------------------===// |
// State accumulated while analyzing a column for FSST suitability: a sample of the
// strings (owned by fsst_string_heap when not inlined) and the trained encoder.
struct FSSTAnalyzeState : public AnalyzeState {
	FSSTAnalyzeState() : count(0), fsst_string_total_size(0), empty_strings(0) {
	}

	~FSSTAnalyzeState() override {
		// The encoder is a C allocation; it is only non-null here if InitCompression
		// did not take ownership of it (i.e. FSST was not selected).
		if (fsst_encoder) {
			duckdb_fsst_destroy(fsst_encoder);
		}
	}

	duckdb_fsst_encoder_t *fsst_encoder = nullptr;
	idx_t count; // total rows seen (sampled or not)

	StringHeap fsst_string_heap;    // backing storage for sampled non-inlined strings
	vector<string_t> fsst_strings;  // the sampled strings
	size_t fsst_string_total_size;  // sum of sampled string lengths in bytes

	RandomEngine random_engine;
	// Set once a non-empty string has been sampled; until then sampling is bypassed
	// so the symbol table cannot end up empty (see StringAnalyze).
	bool have_valid_row = false;

	idx_t empty_strings; // sampled zero-length strings (stored as 0-length entries)
};
| 91 | |
| 92 | unique_ptr<AnalyzeState> FSSTStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) { |
| 93 | return make_uniq<FSSTAnalyzeState>(); |
| 94 | } |
| 95 | |
| 96 | bool FSSTStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) { |
| 97 | auto &state = state_p.Cast<FSSTAnalyzeState>(); |
| 98 | UnifiedVectorFormat vdata; |
| 99 | input.ToUnifiedFormat(count, data&: vdata); |
| 100 | |
| 101 | state.count += count; |
| 102 | auto data = UnifiedVectorFormat::GetData<string_t>(format: vdata); |
| 103 | |
| 104 | // Note that we ignore the sampling in case we have not found any valid strings yet, this solves the issue of |
| 105 | // not having seen any valid strings here leading to an empty fsst symbol table. |
| 106 | bool sample_selected = !state.have_valid_row || state.random_engine.NextRandom() < ANALYSIS_SAMPLE_SIZE; |
| 107 | |
| 108 | for (idx_t i = 0; i < count; i++) { |
| 109 | auto idx = vdata.sel->get_index(idx: i); |
| 110 | |
| 111 | if (!vdata.validity.RowIsValid(row_idx: idx)) { |
| 112 | continue; |
| 113 | } |
| 114 | |
| 115 | // We need to check all strings for this, otherwise we run in to trouble during compression if we miss ones |
| 116 | auto string_size = data[idx].GetSize(); |
| 117 | if (string_size >= StringUncompressed::STRING_BLOCK_LIMIT) { |
| 118 | return false; |
| 119 | } |
| 120 | |
| 121 | if (!sample_selected) { |
| 122 | continue; |
| 123 | } |
| 124 | |
| 125 | if (string_size > 0) { |
| 126 | state.have_valid_row = true; |
| 127 | if (data[idx].IsInlined()) { |
| 128 | state.fsst_strings.push_back(x: data[idx]); |
| 129 | } else { |
| 130 | state.fsst_strings.emplace_back(args: state.fsst_string_heap.AddBlob(data: data[idx])); |
| 131 | } |
| 132 | state.fsst_string_total_size += string_size; |
| 133 | } else { |
| 134 | state.empty_strings++; |
| 135 | } |
| 136 | } |
| 137 | return true; |
| 138 | } |
| 139 | |
// Finish analysis: train an FSST encoder on the sample, compress the sample with it,
// and return an estimated total compressed size (INVALID_INDEX disqualifies FSST).
idx_t FSSTStorage::StringFinalAnalyze(AnalyzeState &state_p) {
	auto &state = state_p.Cast<FSSTAnalyzeState>();

	size_t compressed_dict_size = 0;
	size_t max_compressed_string_length = 0;

	auto string_count = state.fsst_strings.size();

	// No non-empty strings sampled: FSST cannot be used for this column.
	if (!string_count) {
		return DConstants::INVALID_INDEX;
	}

	size_t output_buffer_size = 7 + 2 * state.fsst_string_total_size; // size as specified in fsst.h

	// Build the parallel length/pointer arrays the C FSST API expects.
	vector<size_t> fsst_string_sizes;
	vector<unsigned char *> fsst_string_ptrs;
	for (auto &str : state.fsst_strings) {
		fsst_string_sizes.push_back(x: str.GetSize());
		fsst_string_ptrs.push_back(x: (unsigned char *)str.GetData()); // NOLINT
	}

	// Train the encoder on the sample; ownership stays with the state (freed in its
	// destructor unless InitCompression takes it over).
	state.fsst_encoder = duckdb_fsst_create(n: string_count, lenIn: &fsst_string_sizes[0], strIn: &fsst_string_ptrs[0], zeroTerminated: 0);

	// TODO: do we really need to encode to get a size estimate?
	auto compressed_ptrs = vector<unsigned char *>(string_count, nullptr);
	auto compressed_sizes = vector<size_t>(string_count, 0);
	unique_ptr<unsigned char[]> compressed_buffer(new unsigned char[output_buffer_size]);

	auto res =
	    duckdb_fsst_compress(encoder: state.fsst_encoder, nstrings: string_count, lenIn: &fsst_string_sizes[0], strIn: &fsst_string_ptrs[0],
	                         outsize: output_buffer_size, output: compressed_buffer.get(), lenOut: &compressed_sizes[0], strOut: &compressed_ptrs[0]);

	// duckdb_fsst_compress returns the number of strings that fit the output buffer;
	// the buffer was sized per fsst.h's worst case, so all must fit.
	if (string_count != res) {
		throw std::runtime_error("FSST output buffer is too small unexpectedly" );
	}

	// Sum and and Max compressed lengths
	for (auto &size : compressed_sizes) {
		compressed_dict_size += size;
		max_compressed_string_length = MaxValue(a: max_compressed_string_length, b: size);
	}
	D_ASSERT(compressed_dict_size == (compressed_ptrs[res - 1] - compressed_ptrs[0]) + compressed_sizes[res - 1]);

	// Estimate the bitpacked-offsets size; empty strings become 0-length entries.
	auto minimum_width = BitpackingPrimitives::MinimumBitWidth(value: max_compressed_string_length);
	auto bitpacked_offsets_size =
	    BitpackingPrimitives::GetRequiredSize(count: string_count + state.empty_strings, width: minimum_width);

	// Scale the sampled size back up to the full column, then add one serialized symbol
	// table per estimated block.
	auto estimated_base_size = (bitpacked_offsets_size + compressed_dict_size) * (1 / ANALYSIS_SAMPLE_SIZE);
	auto num_blocks = estimated_base_size / (Storage::BLOCK_SIZE - sizeof(duckdb_fsst_decoder_t));
	auto symtable_size = num_blocks * sizeof(duckdb_fsst_decoder_t);

	auto estimated_size = estimated_base_size + symtable_size;

	// Penalize by the minimum ratio so FSST is only chosen when clearly beneficial.
	return estimated_size * MINIMUM_COMPRESSION_RATIO;
}
| 195 | |
| 196 | //===--------------------------------------------------------------------===// |
| 197 | // Compress |
| 198 | //===--------------------------------------------------------------------===// |
| 199 | |
| 200 | class FSSTCompressionState : public CompressionState { |
| 201 | public: |
| 202 | explicit FSSTCompressionState(ColumnDataCheckpointer &checkpointer) |
| 203 | : checkpointer(checkpointer), function(checkpointer.GetCompressionFunction(type: CompressionType::COMPRESSION_FSST)) { |
| 204 | CreateEmptySegment(row_start: checkpointer.GetRowGroup().start); |
| 205 | } |
| 206 | |
| 207 | ~FSSTCompressionState() override { |
| 208 | if (fsst_encoder) { |
| 209 | duckdb_fsst_destroy(fsst_encoder); |
| 210 | } |
| 211 | } |
| 212 | |
| 213 | void Reset() { |
| 214 | index_buffer.clear(); |
| 215 | current_width = 0; |
| 216 | max_compressed_string_length = 0; |
| 217 | last_fitting_size = 0; |
| 218 | |
| 219 | // Reset the pointers into the current segment |
| 220 | auto &buffer_manager = BufferManager::GetBufferManager(db&: current_segment->db); |
| 221 | current_handle = buffer_manager.Pin(handle&: current_segment->block); |
| 222 | current_dictionary = FSSTStorage::GetDictionary(segment&: *current_segment, handle&: current_handle); |
| 223 | current_end_ptr = current_handle.Ptr() + current_dictionary.end; |
| 224 | } |
| 225 | |
| 226 | void CreateEmptySegment(idx_t row_start) { |
| 227 | auto &db = checkpointer.GetDatabase(); |
| 228 | auto &type = checkpointer.GetType(); |
| 229 | auto compressed_segment = ColumnSegment::CreateTransientSegment(db, type, start: row_start); |
| 230 | current_segment = std::move(compressed_segment); |
| 231 | current_segment->function = function; |
| 232 | Reset(); |
| 233 | } |
| 234 | |
| 235 | void UpdateState(string_t uncompressed_string, unsigned char *compressed_string, size_t compressed_string_len) { |
| 236 | if (!HasEnoughSpace(string_len: compressed_string_len)) { |
| 237 | Flush(); |
| 238 | if (!HasEnoughSpace(string_len: compressed_string_len)) { |
| 239 | throw InternalException("FSST string compression failed due to insufficient space in empty block" ); |
| 240 | }; |
| 241 | } |
| 242 | |
| 243 | UncompressedStringStorage::UpdateStringStats(stats&: current_segment->stats, new_value: uncompressed_string); |
| 244 | |
| 245 | // Write string into dictionary |
| 246 | current_dictionary.size += compressed_string_len; |
| 247 | auto dict_pos = current_end_ptr - current_dictionary.size; |
| 248 | memcpy(dest: dict_pos, src: compressed_string, n: compressed_string_len); |
| 249 | current_dictionary.Verify(); |
| 250 | |
| 251 | // We just push the string length to effectively delta encode the strings |
| 252 | index_buffer.push_back(x: compressed_string_len); |
| 253 | |
| 254 | max_compressed_string_length = MaxValue(a: max_compressed_string_length, b: compressed_string_len); |
| 255 | |
| 256 | current_width = BitpackingPrimitives::MinimumBitWidth(value: max_compressed_string_length); |
| 257 | current_segment->count++; |
| 258 | } |
| 259 | |
| 260 | void AddNull() { |
| 261 | if (!HasEnoughSpace(string_len: 0)) { |
| 262 | Flush(); |
| 263 | if (!HasEnoughSpace(string_len: 0)) { |
| 264 | throw InternalException("FSST string compression failed due to insufficient space in empty block" ); |
| 265 | }; |
| 266 | } |
| 267 | index_buffer.push_back(x: 0); |
| 268 | current_segment->count++; |
| 269 | } |
| 270 | |
| 271 | void AddEmptyString() { |
| 272 | AddNull(); |
| 273 | UncompressedStringStorage::UpdateStringStats(stats&: current_segment->stats, new_value: "" ); |
| 274 | } |
| 275 | |
| 276 | size_t GetRequiredSize(size_t string_len) { |
| 277 | bitpacking_width_t required_minimum_width; |
| 278 | if (string_len > max_compressed_string_length) { |
| 279 | required_minimum_width = BitpackingPrimitives::MinimumBitWidth(value: string_len); |
| 280 | } else { |
| 281 | required_minimum_width = current_width; |
| 282 | } |
| 283 | |
| 284 | size_t current_dict_size = current_dictionary.size; |
| 285 | idx_t current_string_count = index_buffer.size(); |
| 286 | |
| 287 | size_t dict_offsets_size = |
| 288 | BitpackingPrimitives::GetRequiredSize(count: current_string_count + 1, width: required_minimum_width); |
| 289 | |
| 290 | // TODO switch to a symbol table per RowGroup, saves a bit of space |
| 291 | return sizeof(fsst_compression_header_t) + current_dict_size + dict_offsets_size + string_len + |
| 292 | fsst_serialized_symbol_table_size; |
| 293 | } |
| 294 | |
| 295 | // Checks if there is enough space, if there is, sets last_fitting_size |
| 296 | bool HasEnoughSpace(size_t string_len) { |
| 297 | auto required_size = GetRequiredSize(string_len); |
| 298 | |
| 299 | if (required_size <= Storage::BLOCK_SIZE) { |
| 300 | last_fitting_size = required_size; |
| 301 | return true; |
| 302 | } |
| 303 | return false; |
| 304 | } |
| 305 | |
| 306 | void Flush(bool final = false) { |
| 307 | auto next_start = current_segment->start + current_segment->count; |
| 308 | |
| 309 | auto segment_size = Finalize(); |
| 310 | auto &state = checkpointer.GetCheckpointState(); |
| 311 | state.FlushSegment(segment: std::move(current_segment), segment_size); |
| 312 | |
| 313 | if (!final) { |
| 314 | CreateEmptySegment(row_start: next_start); |
| 315 | } |
| 316 | } |
| 317 | |
| 318 | idx_t Finalize() { |
| 319 | auto &buffer_manager = BufferManager::GetBufferManager(db&: current_segment->db); |
| 320 | auto handle = buffer_manager.Pin(handle&: current_segment->block); |
| 321 | D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE); |
| 322 | |
| 323 | // calculate sizes |
| 324 | auto compressed_index_buffer_size = |
| 325 | BitpackingPrimitives::GetRequiredSize(count: current_segment->count, width: current_width); |
| 326 | auto total_size = sizeof(fsst_compression_header_t) + compressed_index_buffer_size + current_dictionary.size + |
| 327 | fsst_serialized_symbol_table_size; |
| 328 | |
| 329 | if (total_size != last_fitting_size) { |
| 330 | throw InternalException("FSST string compression failed due to incorrect size calculation" ); |
| 331 | } |
| 332 | |
| 333 | // calculate ptr and offsets |
| 334 | auto base_ptr = handle.Ptr(); |
| 335 | auto = reinterpret_cast<fsst_compression_header_t *>(base_ptr); |
| 336 | auto compressed_index_buffer_offset = sizeof(fsst_compression_header_t); |
| 337 | auto symbol_table_offset = compressed_index_buffer_offset + compressed_index_buffer_size; |
| 338 | |
| 339 | D_ASSERT(current_segment->count == index_buffer.size()); |
| 340 | BitpackingPrimitives::PackBuffer<sel_t, false>(dst: base_ptr + compressed_index_buffer_offset, |
| 341 | src: reinterpret_cast<uint32_t *>(index_buffer.data()), |
| 342 | count: current_segment->count, width: current_width); |
| 343 | |
| 344 | // Write the fsst symbol table or nothing |
| 345 | if (fsst_encoder != nullptr) { |
| 346 | memcpy(dest: base_ptr + symbol_table_offset, src: &fsst_serialized_symbol_table[0], n: fsst_serialized_symbol_table_size); |
| 347 | } else { |
| 348 | memset(s: base_ptr + symbol_table_offset, c: 0, n: fsst_serialized_symbol_table_size); |
| 349 | } |
| 350 | |
| 351 | Store<uint32_t>(val: symbol_table_offset, ptr: data_ptr_cast(src: &header_ptr->fsst_symbol_table_offset)); |
| 352 | Store<uint32_t>(val: (uint32_t)current_width, ptr: data_ptr_cast(src: &header_ptr->bitpacking_width)); |
| 353 | |
| 354 | if (total_size >= FSSTStorage::COMPACTION_FLUSH_LIMIT) { |
| 355 | // the block is full enough, don't bother moving around the dictionary |
| 356 | return Storage::BLOCK_SIZE; |
| 357 | } |
| 358 | // the block has space left: figure out how much space we can save |
| 359 | auto move_amount = Storage::BLOCK_SIZE - total_size; |
| 360 | // move the dictionary so it lines up exactly with the offsets |
| 361 | auto new_dictionary_offset = symbol_table_offset + fsst_serialized_symbol_table_size; |
| 362 | memmove(dest: base_ptr + new_dictionary_offset, src: base_ptr + current_dictionary.end - current_dictionary.size, |
| 363 | n: current_dictionary.size); |
| 364 | current_dictionary.end -= move_amount; |
| 365 | D_ASSERT(current_dictionary.end == total_size); |
| 366 | // write the new dictionary (with the updated "end") |
| 367 | FSSTStorage::SetDictionary(segment&: *current_segment, handle, container: current_dictionary); |
| 368 | |
| 369 | return total_size; |
| 370 | } |
| 371 | |
| 372 | ColumnDataCheckpointer &checkpointer; |
| 373 | CompressionFunction &function; |
| 374 | |
| 375 | // State regarding current segment |
| 376 | unique_ptr<ColumnSegment> current_segment; |
| 377 | BufferHandle current_handle; |
| 378 | StringDictionaryContainer current_dictionary; |
| 379 | data_ptr_t current_end_ptr; |
| 380 | |
| 381 | // Buffers and map for current segment |
| 382 | vector<uint32_t> index_buffer; |
| 383 | |
| 384 | size_t max_compressed_string_length; |
| 385 | bitpacking_width_t current_width; |
| 386 | idx_t last_fitting_size; |
| 387 | |
| 388 | duckdb_fsst_encoder_t *fsst_encoder = nullptr; |
| 389 | unsigned char fsst_serialized_symbol_table[sizeof(duckdb_fsst_decoder_t)]; |
| 390 | size_t fsst_serialized_symbol_table_size = sizeof(duckdb_fsst_decoder_t); |
| 391 | }; |
| 392 | |
| 393 | unique_ptr<CompressionState> FSSTStorage::InitCompression(ColumnDataCheckpointer &checkpointer, |
| 394 | unique_ptr<AnalyzeState> analyze_state_p) { |
| 395 | auto analyze_state = static_cast<FSSTAnalyzeState *>(analyze_state_p.get()); |
| 396 | auto compression_state = make_uniq<FSSTCompressionState>(args&: checkpointer); |
| 397 | |
| 398 | if (analyze_state->fsst_encoder == nullptr) { |
| 399 | throw InternalException("No encoder found during FSST compression" ); |
| 400 | } |
| 401 | |
| 402 | compression_state->fsst_encoder = analyze_state->fsst_encoder; |
| 403 | compression_state->fsst_serialized_symbol_table_size = |
| 404 | duckdb_fsst_export(encoder: compression_state->fsst_encoder, buf: &compression_state->fsst_serialized_symbol_table[0]); |
| 405 | analyze_state->fsst_encoder = nullptr; |
| 406 | |
| 407 | return std::move(compression_state); |
| 408 | } |
| 409 | |
// Compress one vector of strings: batch-compress all non-empty, non-null strings with
// the trained encoder, then append rows to the state in original order (nulls and empty
// strings become 0-length entries).
void FSSTStorage::Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) {
	auto &state = state_p.Cast<FSSTCompressionState>();

	// Get vector data
	UnifiedVectorFormat vdata;
	scan_vector.ToUnifiedFormat(count, data&: vdata);
	auto data = UnifiedVectorFormat::GetData<string_t>(format: vdata);

	// Collect pointers to strings to compress
	vector<size_t> sizes_in;
	vector<unsigned char *> strings_in;
	size_t total_size = 0;
	idx_t total_count = 0;
	for (idx_t i = 0; i < count; i++) {
		auto idx = vdata.sel->get_index(idx: i);

		// Note: we treat nulls and empty strings the same
		if (!vdata.validity.RowIsValid(row_idx: idx) || data[idx].GetSize() == 0) {
			continue;
		}

		total_count++;
		total_size += data[idx].GetSize();
		sizes_in.push_back(x: data[idx].GetSize());
		strings_in.push_back(x: (unsigned char *)data[idx].GetData()); // NOLINT
	}

	// Only Nulls or empty strings in this vector, nothing to compress
	if (total_count == 0) {
		for (idx_t i = 0; i < count; i++) {
			auto idx = vdata.sel->get_index(idx: i);
			if (!vdata.validity.RowIsValid(row_idx: idx)) {
				state.AddNull();
			} else if (data[idx].GetSize() == 0) {
				state.AddEmptyString();
			} else {
				throw FatalException("FSST: no encoder found even though there are values to encode" );
			}
		}
		return;
	}

	// Compress buffers; 2*size+7 is the worst-case output size per fsst.h, and the
	// MaxValue guards against a zero-sized allocation.
	size_t compress_buffer_size = MaxValue<size_t>(a: total_size * 2 + 7, b: 1);
	vector<unsigned char *> strings_out(total_count, nullptr);
	vector<size_t> sizes_out(total_count, 0);
	vector<unsigned char> compress_buffer(compress_buffer_size, 0);

	auto res = duckdb_fsst_compress(
	    encoder: state.fsst_encoder, /* IN: encoder obtained from duckdb_fsst_create(). */
	    nstrings: total_count, /* IN: number of strings in batch to compress. */
	    lenIn: &sizes_in[0], /* IN: byte-lengths of the inputs */
	    strIn: &strings_in[0], /* IN: input string start pointers. */
	    outsize: compress_buffer_size, /* IN: byte-length of output buffer. */
	    output: &compress_buffer[0], /* OUT: memorxy buffer to put the compressed strings in (one after the other). */
	    lenOut: &sizes_out[0], /* OUT: byte-lengths of the compressed strings. */
	    strOut: &strings_out[0] /* OUT: output string start pointers. Will all point into [output,output+size). */
	);

	if (res != total_count) {
		throw FatalException("FSST compression failed to compress all strings" );
	}

	// Push the compressed strings to the compression state one by one; compressed_idx
	// tracks the position in the (gap-free) compressed output while i walks all rows.
	idx_t compressed_idx = 0;
	for (idx_t i = 0; i < count; i++) {
		auto idx = vdata.sel->get_index(idx: i);
		if (!vdata.validity.RowIsValid(row_idx: idx)) {
			state.AddNull();
		} else if (data[idx].GetSize() == 0) {
			state.AddEmptyString();
		} else {
			state.UpdateState(uncompressed_string: data[idx], compressed_string: strings_out[compressed_idx], compressed_string_len: sizes_out[compressed_idx]);
			compressed_idx++;
		}
	}
}
| 487 | |
| 488 | void FSSTStorage::FinalizeCompress(CompressionState &state_p) { |
| 489 | auto &state = state_p.Cast<FSSTCompressionState>(); |
| 490 | state.Flush(final: true); |
| 491 | } |
| 492 | |
| 493 | //===--------------------------------------------------------------------===// |
| 494 | // Scan |
| 495 | //===--------------------------------------------------------------------===// |
// Scan state for FSST segments: the pinned block handle (inherited from StringScanState),
// the deserialized decoder, and a cache of the last delta-decoded index so sequential
// scans do not have to re-decode from the start of the segment.
struct FSSTScanState : public StringScanState {
	FSSTScanState() {
		ResetStoredDelta();
	}

	// Null when the segment carries no symbol table (only nulls/empty strings);
	// stored as buffer_ptr<void> so it can be shared with FSST vectors.
	buffer_ptr<void> duckdb_fsst_decoder;
	bitpacking_width_t current_width;

	// To speed up delta decoding we store the last index
	uint32_t last_known_index; // dictionary offset decoded for last_known_row
	int64_t last_known_row;    // -1 when no row has been decoded yet

	void StoreLastDelta(uint32_t value, int64_t row) {
		last_known_index = value;
		last_known_row = row;
	}
	void ResetStoredDelta() {
		last_known_index = 0;
		last_known_row = -1;
	}
};
| 517 | |
// Initialize a scan over an FSST segment: pin the block and parse the header to obtain
// the decoder and bitpacking width.
unique_ptr<SegmentScanState> FSSTStorage::StringInitScan(ColumnSegment &segment) {
	auto state = make_uniq<FSSTScanState>();
	auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db);
	state->handle = buffer_manager.Pin(handle&: segment.block);
	auto base_ptr = state->handle.Ptr() + segment.GetBlockOffset();

	state->duckdb_fsst_decoder = make_buffer<duckdb_fsst_decoder_t>();
	auto retval = ParseFSSTSegmentHeader(
	    base_ptr, decoder_out: reinterpret_cast<duckdb_fsst_decoder_t *>(state->duckdb_fsst_decoder.get()), width_out: &state->current_width);
	if (!retval) {
		// No symbol table in this segment (only nulls/empty strings): drop the decoder
		// so the scan path knows to emit empty strings.
		state->duckdb_fsst_decoder = nullptr;
	}

	return std::move(state);
}
| 533 | |
| 534 | void DeltaDecodeIndices(uint32_t *buffer_in, uint32_t *buffer_out, idx_t decode_count, uint32_t last_known_value) { |
| 535 | buffer_out[0] = buffer_in[0]; |
| 536 | buffer_out[0] += last_known_value; |
| 537 | for (idx_t i = 1; i < decode_count; i++) { |
| 538 | buffer_out[i] = buffer_in[i] + buffer_out[i - 1]; |
| 539 | } |
| 540 | } |
| 541 | |
| 542 | void BitUnpackRange(data_ptr_t src_ptr, data_ptr_t dst_ptr, idx_t count, idx_t row, bitpacking_width_t width) { |
| 543 | auto bitunpack_src_ptr = &src_ptr[(row * width) / 8]; |
| 544 | BitpackingPrimitives::UnPackBuffer<uint32_t>(dst: dst_ptr, src: bitunpack_src_ptr, count, width); |
| 545 | } |
| 546 | |
| 547 | //===--------------------------------------------------------------------===// |
| 548 | // Scan base data |
| 549 | //===--------------------------------------------------------------------===// |
// Scan `scan_count` rows starting at the state's current row into `result` at
// `result_offset`. When ALLOW_FSST_VECTORS and the config option are enabled, the result
// is emitted as an FSST_VECTOR holding still-compressed strings plus the decoder;
// otherwise each string is decompressed into a flat vector.
template <bool ALLOW_FSST_VECTORS>
void FSSTStorage::StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result,
                                    idx_t result_offset) {

	auto &scan_state = state.scan_state->Cast<FSSTScanState>();
	auto start = segment.GetRelativeIndex(row_index: state.row_index);

	bool enable_fsst_vectors;
	if (ALLOW_FSST_VECTORS) {
		auto &config = DBConfig::GetConfig(db&: segment.db);
		enable_fsst_vectors = config.options.enable_fsst_vectors;
	} else {
		enable_fsst_vectors = false;
	}

	auto baseptr = scan_state.handle.Ptr() + segment.GetBlockOffset();
	auto dict = GetDictionary(segment, handle&: scan_state.handle);
	auto base_data = data_ptr_cast(src: baseptr + sizeof(fsst_compression_header_t));
	string_t *result_data;

	if (scan_count == 0) {
		return;
	}

	if (enable_fsst_vectors) {
		D_ASSERT(result_offset == 0);
		if (scan_state.duckdb_fsst_decoder) {
			D_ASSERT(result_offset == 0 || result.GetVectorType() == VectorType::FSST_VECTOR);
			result.SetVectorType(VectorType::FSST_VECTOR);
			FSSTVector::RegisterDecoder(vector&: result, duckdb_fsst_decoder&: scan_state.duckdb_fsst_decoder);
			result_data = FSSTVector::GetCompressedData<string_t>(vector&: result);
		} else {
			// No decoder means the segment holds only empty strings/nulls; flat output is fine.
			D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
			result_data = FlatVector::GetData<string_t>(vector&: result);
		}
	} else {
		D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
		result_data = FlatVector::GetData<string_t>(vector&: result);
	}

	// The cached delta is only usable for a forward-moving scan; rewinds (or a fresh
	// scan from row 0) must restart delta decoding from the segment start.
	if (start == 0 || scan_state.last_known_row >= (int64_t)start) {
		scan_state.ResetStoredDelta();
	}

	auto offsets = CalculateBpDeltaOffsets(last_known_row: scan_state.last_known_row, start, scan_count);

	// First bit-unpack the (aligned) range of string lengths, then prefix-sum them into
	// absolute dictionary offsets.
	auto bitunpack_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_bitunpack_count]);
	BitUnpackRange(src_ptr: base_data, dst_ptr: data_ptr_cast(src: bitunpack_buffer.get()), count: offsets.total_bitunpack_count,
	               row: offsets.bitunpack_start_row, width: scan_state.current_width);
	auto delta_decode_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_delta_decode_count]);
	DeltaDecodeIndices(buffer_in: bitunpack_buffer.get() + offsets.bitunpack_alignment_offset, buffer_out: delta_decode_buffer.get(),
	                   decode_count: offsets.total_delta_decode_count, last_known_value: scan_state.last_known_index);

	if (enable_fsst_vectors) {
		// Lookup decompressed offsets in dict
		for (idx_t i = 0; i < scan_count; i++) {
			uint32_t string_length = bitunpack_buffer[i + offsets.scan_offset];
			result_data[i] = UncompressedStringStorage::FetchStringFromDict(
			    segment, dict, result, baseptr, dict_offset: delta_decode_buffer[i + offsets.unused_delta_decoded_values],
			    string_length);
			// NOTE(review): SetCount is invoked once per iteration with the same value;
			// it looks like it could be hoisted out of the loop — confirm before changing.
			FSSTVector::SetCount(vector&: result, count: scan_count);
		}
	} else {
		// Just decompress
		for (idx_t i = 0; i < scan_count; i++) {
			uint32_t str_len = bitunpack_buffer[i + offsets.scan_offset];
			auto str_ptr = FSSTStorage::FetchStringPointer(
			    dict, baseptr, dict_offset: delta_decode_buffer[i + offsets.unused_delta_decoded_values]);

			if (str_len > 0) {
				result_data[i + result_offset] =
				    FSSTPrimitives::DecompressValue(duckdb_fsst_decoder: scan_state.duckdb_fsst_decoder.get(), result, compressed_string: str_ptr, compressed_string_len: str_len);
			} else {
				result_data[i + result_offset] = string_t(nullptr, 0);
			}
		}
	}

	// Remember the last decoded offset so the next forward scan can resume from here.
	scan_state.StoreLastDelta(value: delta_decode_buffer[scan_count + offsets.unused_delta_decoded_values - 1],
	                          row: start + scan_count - 1);
}
| 631 | |
| 632 | void FSSTStorage::StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) { |
| 633 | StringScanPartial<true>(segment, state, scan_count, result, result_offset: 0); |
| 634 | } |
| 635 | |
| 636 | //===--------------------------------------------------------------------===// |
| 637 | // Fetch |
| 638 | //===--------------------------------------------------------------------===// |
// Fetch a single row by id into `result` at `result_idx`. Implemented as a scan of one
// value with no cached delta state, so it must delta-decode from an aligned start.
void FSSTStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result,
                                 idx_t result_idx) {

	auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db);
	auto handle = buffer_manager.Pin(handle&: segment.block);
	auto base_ptr = handle.Ptr() + segment.GetBlockOffset();
	auto base_data = data_ptr_cast(src: base_ptr + sizeof(fsst_compression_header_t));
	auto dict = GetDictionary(segment, handle);

	// The decoder lives on the stack here (unlike the scan path) since it is used once.
	duckdb_fsst_decoder_t decoder;
	bitpacking_width_t width;
	auto have_symbol_table = ParseFSSTSegmentHeader(base_ptr, decoder_out: &decoder, width_out: &width);

	auto result_data = FlatVector::GetData<string_t>(vector&: result);

	if (have_symbol_table) {
		// We basically just do a scan of 1 which is kinda expensive as we need to repeatedly delta decode until we
		// reach the row we want, we could consider a more clever caching trick if this is slow
		auto offsets = CalculateBpDeltaOffsets(last_known_row: -1, start: row_id, scan_count: 1);

		auto bitunpack_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_bitunpack_count]);
		BitUnpackRange(src_ptr: base_data, dst_ptr: data_ptr_cast(src: bitunpack_buffer.get()), count: offsets.total_bitunpack_count,
		               row: offsets.bitunpack_start_row, width);
		auto delta_decode_buffer = unique_ptr<uint32_t[]>(new uint32_t[offsets.total_delta_decode_count]);
		DeltaDecodeIndices(buffer_in: bitunpack_buffer.get() + offsets.bitunpack_alignment_offset, buffer_out: delta_decode_buffer.get(),
		                   decode_count: offsets.total_delta_decode_count, last_known_value: 0);

		uint32_t string_length = bitunpack_buffer[offsets.scan_offset];

		string_t compressed_string = UncompressedStringStorage::FetchStringFromDict(
		    segment, dict, result, baseptr: base_ptr, dict_offset: delta_decode_buffer[offsets.unused_delta_decoded_values], string_length);

		result_data[result_idx] = FSSTPrimitives::DecompressValue(duckdb_fsst_decoder: (void *)&decoder, result, compressed_string: compressed_string.GetData(),
		                                                          compressed_string_len: compressed_string.GetSize());
	} else {
		// There's no fsst symtable, this only happens for empty strings or nulls, we can just emit an empty string
		result_data[result_idx] = string_t(nullptr, 0);
	}
}
| 678 | |
| 679 | //===--------------------------------------------------------------------===// |
| 680 | // Get Function |
| 681 | //===--------------------------------------------------------------------===// |
| 682 | CompressionFunction FSSTFun::GetFunction(PhysicalType data_type) { |
| 683 | D_ASSERT(data_type == PhysicalType::VARCHAR); |
| 684 | return CompressionFunction( |
| 685 | CompressionType::COMPRESSION_FSST, data_type, FSSTStorage::StringInitAnalyze, FSSTStorage::StringAnalyze, |
| 686 | FSSTStorage::StringFinalAnalyze, FSSTStorage::InitCompression, FSSTStorage::Compress, |
| 687 | FSSTStorage::FinalizeCompress, FSSTStorage::StringInitScan, FSSTStorage::StringScan, |
| 688 | FSSTStorage::StringScanPartial<false>, FSSTStorage::StringFetchRow, UncompressedFunctions::EmptySkip); |
| 689 | } |
| 690 | |
| 691 | bool FSSTFun::TypeIsSupported(PhysicalType type) { |
| 692 | return type == PhysicalType::VARCHAR; |
| 693 | } |
| 694 | |
| 695 | //===--------------------------------------------------------------------===// |
| 696 | // Helper Functions |
| 697 | //===--------------------------------------------------------------------===// |
| 698 | void FSSTStorage::SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container) { |
| 699 | auto = reinterpret_cast<fsst_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset()); |
| 700 | Store<uint32_t>(val: container.size, ptr: data_ptr_cast(src: &header_ptr->dict_size)); |
| 701 | Store<uint32_t>(val: container.end, ptr: data_ptr_cast(src: &header_ptr->dict_end)); |
| 702 | } |
| 703 | |
| 704 | StringDictionaryContainer FSSTStorage::GetDictionary(ColumnSegment &segment, BufferHandle &handle) { |
| 705 | auto = reinterpret_cast<fsst_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset()); |
| 706 | StringDictionaryContainer container; |
| 707 | container.size = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->dict_size)); |
| 708 | container.end = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->dict_end)); |
| 709 | return container; |
| 710 | } |
| 711 | |
| 712 | char *FSSTStorage::FetchStringPointer(StringDictionaryContainer dict, data_ptr_t baseptr, int32_t dict_offset) { |
| 713 | if (dict_offset == 0) { |
| 714 | return nullptr; |
| 715 | } |
| 716 | |
| 717 | auto dict_end = baseptr + dict.end; |
| 718 | auto dict_pos = dict_end - dict_offset; |
| 719 | return char_ptr_cast(src: dict_pos); |
| 720 | } |
| 721 | |
| 722 | // Returns false if no symbol table was found. This means all strings are either empty or null |
| 723 | bool FSSTStorage::(data_ptr_t base_ptr, duckdb_fsst_decoder_t *decoder_out, |
| 724 | bitpacking_width_t *width_out) { |
| 725 | auto = reinterpret_cast<fsst_compression_header_t *>(base_ptr); |
| 726 | auto fsst_symbol_table_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->fsst_symbol_table_offset)); |
| 727 | *width_out = (bitpacking_width_t)(Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->bitpacking_width))); |
| 728 | return duckdb_fsst_import(decoder: decoder_out, buf: base_ptr + fsst_symbol_table_offset); |
| 729 | } |
| 730 | |
| 731 | // The calculation of offsets and counts while scanning or fetching is a bit tricky, for two reasons: |
| 732 | // - bitunpacking needs to be aligned to BITPACKING_ALGORITHM_GROUP_SIZE |
| 733 | // - delta decoding needs to decode from the last known value. |
| 734 | bp_delta_offsets_t FSSTStorage::CalculateBpDeltaOffsets(int64_t last_known_row, idx_t start, idx_t scan_count) { |
| 735 | D_ASSERT((idx_t)(last_known_row + 1) <= start); |
| 736 | bp_delta_offsets_t result; |
| 737 | |
| 738 | result.delta_decode_start_row = (idx_t)(last_known_row + 1); |
| 739 | result.bitunpack_alignment_offset = |
| 740 | result.delta_decode_start_row % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; |
| 741 | result.bitunpack_start_row = result.delta_decode_start_row - result.bitunpack_alignment_offset; |
| 742 | result.unused_delta_decoded_values = start - result.delta_decode_start_row; |
| 743 | result.scan_offset = result.bitunpack_alignment_offset + result.unused_delta_decoded_values; |
| 744 | result.total_delta_decode_count = scan_count + result.unused_delta_decoded_values; |
| 745 | result.total_bitunpack_count = |
| 746 | BitpackingPrimitives::RoundUpToAlgorithmGroupSize<idx_t>(num_to_round: scan_count + result.scan_offset); |
| 747 | |
| 748 | D_ASSERT(result.total_delta_decode_count + result.bitunpack_alignment_offset <= result.total_bitunpack_count); |
| 749 | return result; |
| 750 | } |
| 751 | |
| 752 | } // namespace duckdb |
| 753 | |