| 1 | #include "duckdb/common/bitpacking.hpp" |
| 2 | #include "duckdb/common/operator/comparison_operators.hpp" |
| 3 | #include "duckdb/common/string_map_set.hpp" |
| 4 | #include "duckdb/common/types/vector_buffer.hpp" |
| 5 | #include "duckdb/function/compression/compression.hpp" |
| 6 | #include "duckdb/function/compression_function.hpp" |
| 7 | #include "duckdb/main/config.hpp" |
| 8 | #include "duckdb/storage/segment/uncompressed.hpp" |
| 9 | #include "duckdb/storage/string_uncompressed.hpp" |
| 10 | #include "duckdb/storage/table/append_state.hpp" |
| 11 | #include "duckdb/storage/table/column_data_checkpointer.hpp" |
| 12 | |
| 13 | namespace duckdb { |
| 14 | |
| 15 | // Abstract class for keeping compression state either for compression or size analysis |
| 16 | class DictionaryCompressionState : public CompressionState { |
| 17 | public: |
| 18 | bool UpdateState(Vector &scan_vector, idx_t count) { |
| 19 | UnifiedVectorFormat vdata; |
| 20 | scan_vector.ToUnifiedFormat(count, data&: vdata); |
| 21 | auto data = UnifiedVectorFormat::GetData<string_t>(format: vdata); |
| 22 | Verify(); |
| 23 | |
| 24 | for (idx_t i = 0; i < count; i++) { |
| 25 | auto idx = vdata.sel->get_index(idx: i); |
| 26 | size_t string_size = 0; |
| 27 | bool new_string = false; |
| 28 | auto row_is_valid = vdata.validity.RowIsValid(row_idx: idx); |
| 29 | |
| 30 | if (row_is_valid) { |
| 31 | string_size = data[idx].GetSize(); |
| 32 | if (string_size >= StringUncompressed::STRING_BLOCK_LIMIT) { |
| 33 | // Big strings not implemented for dictionary compression |
| 34 | return false; |
| 35 | } |
| 36 | new_string = !LookupString(str: data[idx]); |
| 37 | } |
| 38 | |
| 39 | bool fits = CalculateSpaceRequirements(new_string, string_size); |
| 40 | if (!fits) { |
| 41 | Flush(); |
| 42 | new_string = true; |
| 43 | |
| 44 | fits = CalculateSpaceRequirements(new_string, string_size); |
| 45 | if (!fits) { |
| 46 | throw InternalException("Dictionary compression could not write to new segment" ); |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | if (!row_is_valid) { |
| 51 | AddNull(); |
| 52 | } else if (new_string) { |
| 53 | AddNewString(str: data[idx]); |
| 54 | } else { |
| 55 | AddLastLookup(); |
| 56 | } |
| 57 | |
| 58 | Verify(); |
| 59 | } |
| 60 | |
| 61 | return true; |
| 62 | } |
| 63 | |
| 64 | protected: |
| 65 | // Should verify the State |
| 66 | virtual void Verify() = 0; |
| 67 | // Performs a lookup of str, storing the result internally |
| 68 | virtual bool LookupString(string_t str) = 0; |
| 69 | // Add the most recently looked up str to compression state |
| 70 | virtual void AddLastLookup() = 0; |
| 71 | // Add string to the state that is known to not be seen yet |
| 72 | virtual void AddNewString(string_t str) = 0; |
| 73 | // Add a null value to the compression state |
| 74 | virtual void AddNull() = 0; |
| 75 | // Needs to be called before adding a value. Will return false if a flush is required first. |
| 76 | virtual bool CalculateSpaceRequirements(bool new_string, size_t string_size) = 0; |
| 77 | // Flush the segment to disk if compressing or reset the counters if analyzing |
| 78 | virtual void Flush(bool final = false) = 0; |
| 79 | }; |
| 80 | |
// On-disk header stored at the start of each dictionary-compressed segment; holds the
// dictionary size/end and the location of the index buffer plus the bitpacking width of
// the selection buffer.
typedef struct {
	uint32_t dict_size;
	uint32_t dict_end;
	uint32_t index_buffer_offset;
	uint32_t index_buffer_count;
	uint32_t bitpacking_width;
} dictionary_compression_header_t;
| 88 | |
| 89 | struct DictionaryCompressionStorage { |
| 90 | static constexpr float MINIMUM_COMPRESSION_RATIO = 1.2; |
| 91 | static constexpr uint16_t = sizeof(dictionary_compression_header_t); |
| 92 | static constexpr size_t COMPACTION_FLUSH_LIMIT = (size_t)Storage::BLOCK_SIZE / 5 * 4; |
| 93 | |
| 94 | static unique_ptr<AnalyzeState> StringInitAnalyze(ColumnData &col_data, PhysicalType type); |
| 95 | static bool StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count); |
| 96 | static idx_t StringFinalAnalyze(AnalyzeState &state_p); |
| 97 | |
| 98 | static unique_ptr<CompressionState> InitCompression(ColumnDataCheckpointer &checkpointer, |
| 99 | unique_ptr<AnalyzeState> state); |
| 100 | static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count); |
| 101 | static void FinalizeCompress(CompressionState &state_p); |
| 102 | |
| 103 | static unique_ptr<SegmentScanState> StringInitScan(ColumnSegment &segment); |
| 104 | template <bool ALLOW_DICT_VECTORS> |
| 105 | static void StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result, |
| 106 | idx_t result_offset); |
| 107 | static void StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result); |
| 108 | static void StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result, |
| 109 | idx_t result_idx); |
| 110 | |
| 111 | static bool HasEnoughSpace(idx_t current_count, idx_t index_count, idx_t dict_size, |
| 112 | bitpacking_width_t packing_width); |
| 113 | static idx_t RequiredSpace(idx_t current_count, idx_t index_count, idx_t dict_size, |
| 114 | bitpacking_width_t packing_width); |
| 115 | |
| 116 | static StringDictionaryContainer GetDictionary(ColumnSegment &segment, BufferHandle &handle); |
| 117 | static void SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container); |
| 118 | static string_t FetchStringFromDict(ColumnSegment &segment, StringDictionaryContainer dict, data_ptr_t baseptr, |
| 119 | int32_t dict_offset, uint16_t string_len); |
| 120 | static uint16_t GetStringLength(uint32_t *index_buffer_ptr, sel_t index); |
| 121 | }; |
| 122 | |
| 123 | // Dictionary compression uses a combination of bitpacking and a dictionary to compress string segments. The data is |
| 124 | // stored across three buffers: the index buffer, the selection buffer and the dictionary. Firstly the Index buffer |
| 125 | // contains the offsets into the dictionary which are also used to determine the string lengths. Each value in the |
| 126 | // dictionary gets a single unique index in the index buffer. Secondly, the selection buffer maps the tuples to an index |
| 127 | // in the index buffer. The selection buffer is compressed with bitpacking. Finally, the dictionary contains simply all |
// the unique strings without lengths or null termination as we can deduce the lengths from the index buffer. The
| 129 | // addition of the selection buffer is done for two reasons: firstly, to allow the scan to emit dictionary vectors by |
| 130 | // scanning the whole dictionary at once and then scanning the selection buffer for each emitted vector. Secondly, it |
| 131 | // allows for efficient bitpacking compression as the selection values should remain relatively small. |
| 132 | struct DictionaryCompressionCompressState : public DictionaryCompressionState { |
| 133 | explicit DictionaryCompressionCompressState(ColumnDataCheckpointer &checkpointer_p) |
| 134 | : checkpointer(checkpointer_p), |
| 135 | function(checkpointer.GetCompressionFunction(type: CompressionType::COMPRESSION_DICTIONARY)), |
| 136 | heap(BufferAllocator::Get(db&: checkpointer.GetDatabase())) { |
| 137 | CreateEmptySegment(row_start: checkpointer.GetRowGroup().start); |
| 138 | } |
| 139 | |
| 140 | ColumnDataCheckpointer &checkpointer; |
| 141 | CompressionFunction &function; |
| 142 | |
| 143 | // State regarding current segment |
| 144 | unique_ptr<ColumnSegment> current_segment; |
| 145 | BufferHandle current_handle; |
| 146 | StringDictionaryContainer current_dictionary; |
| 147 | data_ptr_t current_end_ptr; |
| 148 | |
| 149 | // Buffers and map for current segment |
| 150 | StringHeap heap; |
| 151 | string_map_t<uint32_t> current_string_map; |
| 152 | vector<uint32_t> index_buffer; |
| 153 | vector<uint32_t> selection_buffer; |
| 154 | |
| 155 | bitpacking_width_t current_width = 0; |
| 156 | bitpacking_width_t next_width = 0; |
| 157 | |
| 158 | // Result of latest LookupString call |
| 159 | uint32_t latest_lookup_result; |
| 160 | |
| 161 | public: |
| 162 | void CreateEmptySegment(idx_t row_start) { |
| 163 | auto &db = checkpointer.GetDatabase(); |
| 164 | auto &type = checkpointer.GetType(); |
| 165 | auto compressed_segment = ColumnSegment::CreateTransientSegment(db, type, start: row_start); |
| 166 | current_segment = std::move(compressed_segment); |
| 167 | |
| 168 | current_segment->function = function; |
| 169 | |
| 170 | // Reset the buffers and string map |
| 171 | current_string_map.clear(); |
| 172 | index_buffer.clear(); |
| 173 | index_buffer.push_back(x: 0); // Reserve index 0 for null strings |
| 174 | selection_buffer.clear(); |
| 175 | |
| 176 | current_width = 0; |
| 177 | next_width = 0; |
| 178 | |
| 179 | // Reset the pointers into the current segment |
| 180 | auto &buffer_manager = BufferManager::GetBufferManager(db&: checkpointer.GetDatabase()); |
| 181 | current_handle = buffer_manager.Pin(handle&: current_segment->block); |
| 182 | current_dictionary = DictionaryCompressionStorage::GetDictionary(segment&: *current_segment, handle&: current_handle); |
| 183 | current_end_ptr = current_handle.Ptr() + current_dictionary.end; |
| 184 | } |
| 185 | |
| 186 | void Verify() override { |
| 187 | current_dictionary.Verify(); |
| 188 | D_ASSERT(current_segment->count == selection_buffer.size()); |
| 189 | D_ASSERT(DictionaryCompressionStorage::HasEnoughSpace(current_segment->count.load(), index_buffer.size(), |
| 190 | current_dictionary.size, current_width)); |
| 191 | D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE); |
| 192 | D_ASSERT(index_buffer.size() == current_string_map.size() + 1); // +1 is for null value |
| 193 | } |
| 194 | |
| 195 | bool LookupString(string_t str) override { |
| 196 | auto search = current_string_map.find(x: str); |
| 197 | auto has_result = search != current_string_map.end(); |
| 198 | |
| 199 | if (has_result) { |
| 200 | latest_lookup_result = search->second; |
| 201 | } |
| 202 | return has_result; |
| 203 | } |
| 204 | |
| 205 | void AddNewString(string_t str) override { |
| 206 | UncompressedStringStorage::UpdateStringStats(stats&: current_segment->stats, new_value: str); |
| 207 | |
| 208 | // Copy string to dict |
| 209 | current_dictionary.size += str.GetSize(); |
| 210 | auto dict_pos = current_end_ptr - current_dictionary.size; |
| 211 | memcpy(dest: dict_pos, src: str.GetData(), n: str.GetSize()); |
| 212 | current_dictionary.Verify(); |
| 213 | D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE); |
| 214 | |
| 215 | // Update buffers and map |
| 216 | index_buffer.push_back(x: current_dictionary.size); |
| 217 | selection_buffer.push_back(x: index_buffer.size() - 1); |
| 218 | if (str.IsInlined()) { |
| 219 | current_string_map.insert(x: {str, index_buffer.size() - 1}); |
| 220 | } else { |
| 221 | current_string_map.insert(x: {heap.AddBlob(data: str), index_buffer.size() - 1}); |
| 222 | } |
| 223 | DictionaryCompressionStorage::SetDictionary(segment&: *current_segment, handle&: current_handle, container: current_dictionary); |
| 224 | |
| 225 | current_width = next_width; |
| 226 | current_segment->count++; |
| 227 | } |
| 228 | |
| 229 | void AddNull() override { |
| 230 | selection_buffer.push_back(x: 0); |
| 231 | current_segment->count++; |
| 232 | } |
| 233 | |
| 234 | void AddLastLookup() override { |
| 235 | selection_buffer.push_back(x: latest_lookup_result); |
| 236 | current_segment->count++; |
| 237 | } |
| 238 | |
| 239 | bool CalculateSpaceRequirements(bool new_string, size_t string_size) override { |
| 240 | if (new_string) { |
| 241 | next_width = BitpackingPrimitives::MinimumBitWidth(value: index_buffer.size() - 1 + new_string); |
| 242 | return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_segment->count.load() + 1, |
| 243 | index_count: index_buffer.size() + 1, |
| 244 | dict_size: current_dictionary.size + string_size, packing_width: next_width); |
| 245 | } else { |
| 246 | return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_segment->count.load() + 1, index_count: index_buffer.size(), |
| 247 | dict_size: current_dictionary.size, packing_width: current_width); |
| 248 | } |
| 249 | } |
| 250 | |
| 251 | void Flush(bool final = false) override { |
| 252 | auto next_start = current_segment->start + current_segment->count; |
| 253 | |
| 254 | auto segment_size = Finalize(); |
| 255 | auto &state = checkpointer.GetCheckpointState(); |
| 256 | state.FlushSegment(segment: std::move(current_segment), segment_size); |
| 257 | |
| 258 | if (!final) { |
| 259 | CreateEmptySegment(row_start: next_start); |
| 260 | } |
| 261 | } |
| 262 | |
| 263 | idx_t Finalize() { |
| 264 | auto &buffer_manager = BufferManager::GetBufferManager(db&: checkpointer.GetDatabase()); |
| 265 | auto handle = buffer_manager.Pin(handle&: current_segment->block); |
| 266 | D_ASSERT(current_dictionary.end == Storage::BLOCK_SIZE); |
| 267 | |
| 268 | // calculate sizes |
| 269 | auto compressed_selection_buffer_size = |
| 270 | BitpackingPrimitives::GetRequiredSize(count: current_segment->count, width: current_width); |
| 271 | auto index_buffer_size = index_buffer.size() * sizeof(uint32_t); |
| 272 | auto total_size = DictionaryCompressionStorage::DICTIONARY_HEADER_SIZE + compressed_selection_buffer_size + |
| 273 | index_buffer_size + current_dictionary.size; |
| 274 | |
| 275 | // calculate ptr and offsets |
| 276 | auto base_ptr = handle.Ptr(); |
| 277 | auto = reinterpret_cast<dictionary_compression_header_t *>(base_ptr); |
| 278 | auto compressed_selection_buffer_offset = DictionaryCompressionStorage::DICTIONARY_HEADER_SIZE; |
| 279 | auto index_buffer_offset = compressed_selection_buffer_offset + compressed_selection_buffer_size; |
| 280 | |
| 281 | // Write compressed selection buffer |
| 282 | BitpackingPrimitives::PackBuffer<sel_t, false>(dst: base_ptr + compressed_selection_buffer_offset, |
| 283 | src: (sel_t *)(selection_buffer.data()), count: current_segment->count, |
| 284 | width: current_width); |
| 285 | |
| 286 | // Write the index buffer |
| 287 | memcpy(dest: base_ptr + index_buffer_offset, src: index_buffer.data(), n: index_buffer_size); |
| 288 | |
| 289 | // Store sizes and offsets in segment header |
| 290 | Store<uint32_t>(val: index_buffer_offset, ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset)); |
| 291 | Store<uint32_t>(val: index_buffer.size(), ptr: data_ptr_cast(src: &header_ptr->index_buffer_count)); |
| 292 | Store<uint32_t>(val: (uint32_t)current_width, ptr: data_ptr_cast(src: &header_ptr->bitpacking_width)); |
| 293 | |
| 294 | D_ASSERT(current_width == BitpackingPrimitives::MinimumBitWidth(index_buffer.size() - 1)); |
| 295 | D_ASSERT(DictionaryCompressionStorage::HasEnoughSpace(current_segment->count, index_buffer.size(), |
| 296 | current_dictionary.size, current_width)); |
| 297 | D_ASSERT((uint64_t)*max_element(std::begin(selection_buffer), std::end(selection_buffer)) == |
| 298 | index_buffer.size() - 1); |
| 299 | |
| 300 | if (total_size >= DictionaryCompressionStorage::COMPACTION_FLUSH_LIMIT) { |
| 301 | // the block is full enough, don't bother moving around the dictionary |
| 302 | return Storage::BLOCK_SIZE; |
| 303 | } |
| 304 | // the block has space left: figure out how much space we can save |
| 305 | auto move_amount = Storage::BLOCK_SIZE - total_size; |
| 306 | // move the dictionary so it lines up exactly with the offsets |
| 307 | auto new_dictionary_offset = index_buffer_offset + index_buffer_size; |
| 308 | memmove(dest: base_ptr + new_dictionary_offset, src: base_ptr + current_dictionary.end - current_dictionary.size, |
| 309 | n: current_dictionary.size); |
| 310 | current_dictionary.end -= move_amount; |
| 311 | D_ASSERT(current_dictionary.end == total_size); |
| 312 | // write the new dictionary (with the updated "end") |
| 313 | DictionaryCompressionStorage::SetDictionary(segment&: *current_segment, handle, container: current_dictionary); |
| 314 | return total_size; |
| 315 | } |
| 316 | }; |
| 317 | |
| 318 | //===--------------------------------------------------------------------===// |
| 319 | // Analyze |
| 320 | //===--------------------------------------------------------------------===// |
| 321 | struct DictionaryAnalyzeState : public DictionaryCompressionState { |
| 322 | DictionaryAnalyzeState() |
| 323 | : segment_count(0), current_tuple_count(0), current_unique_count(0), current_dict_size(0), current_width(0), |
| 324 | next_width(0) { |
| 325 | } |
| 326 | |
| 327 | size_t segment_count; |
| 328 | idx_t current_tuple_count; |
| 329 | idx_t current_unique_count; |
| 330 | size_t current_dict_size; |
| 331 | StringHeap heap; |
| 332 | string_set_t current_set; |
| 333 | bitpacking_width_t current_width; |
| 334 | bitpacking_width_t next_width; |
| 335 | |
| 336 | bool LookupString(string_t str) override { |
| 337 | return current_set.count(x: str); |
| 338 | } |
| 339 | |
| 340 | void AddNewString(string_t str) override { |
| 341 | current_tuple_count++; |
| 342 | current_unique_count++; |
| 343 | current_dict_size += str.GetSize(); |
| 344 | if (str.IsInlined()) { |
| 345 | current_set.insert(x: str); |
| 346 | } else { |
| 347 | current_set.insert(x: heap.AddBlob(data: str)); |
| 348 | } |
| 349 | current_width = next_width; |
| 350 | } |
| 351 | |
| 352 | void AddLastLookup() override { |
| 353 | current_tuple_count++; |
| 354 | } |
| 355 | |
| 356 | void AddNull() override { |
| 357 | current_tuple_count++; |
| 358 | } |
| 359 | |
| 360 | bool CalculateSpaceRequirements(bool new_string, size_t string_size) override { |
| 361 | if (new_string) { |
| 362 | next_width = |
| 363 | BitpackingPrimitives::MinimumBitWidth(value: current_unique_count + 2); // 1 for null, one for new string |
| 364 | return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_tuple_count + 1, index_count: current_unique_count + 1, |
| 365 | dict_size: current_dict_size + string_size, packing_width: next_width); |
| 366 | } else { |
| 367 | return DictionaryCompressionStorage::HasEnoughSpace(current_count: current_tuple_count + 1, index_count: current_unique_count, |
| 368 | dict_size: current_dict_size, packing_width: current_width); |
| 369 | } |
| 370 | } |
| 371 | |
| 372 | void Flush(bool final = false) override { |
| 373 | segment_count++; |
| 374 | current_tuple_count = 0; |
| 375 | current_unique_count = 0; |
| 376 | current_dict_size = 0; |
| 377 | current_set.clear(); |
| 378 | } |
| 379 | void Verify() override {}; |
| 380 | }; |
| 381 | |
// Thin AnalyzeState wrapper that owns the shared DictionaryAnalyzeState, allowing it to be
// handed through the generic compression-analysis interface.
struct DictionaryCompressionAnalyzeState : public AnalyzeState {
	DictionaryCompressionAnalyzeState() : analyze_state(make_uniq<DictionaryAnalyzeState>()) {
	}

	unique_ptr<DictionaryAnalyzeState> analyze_state;
};
| 388 | |
| 389 | unique_ptr<AnalyzeState> DictionaryCompressionStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) { |
| 390 | return make_uniq<DictionaryCompressionAnalyzeState>(); |
| 391 | } |
| 392 | |
| 393 | bool DictionaryCompressionStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) { |
| 394 | auto &state = state_p.Cast<DictionaryCompressionAnalyzeState>(); |
| 395 | return state.analyze_state->UpdateState(scan_vector&: input, count); |
| 396 | } |
| 397 | |
| 398 | idx_t DictionaryCompressionStorage::StringFinalAnalyze(AnalyzeState &state_p) { |
| 399 | auto &analyze_state = state_p.Cast<DictionaryCompressionAnalyzeState>(); |
| 400 | auto &state = *analyze_state.analyze_state; |
| 401 | |
| 402 | auto width = BitpackingPrimitives::MinimumBitWidth(value: state.current_unique_count + 1); |
| 403 | auto req_space = |
| 404 | RequiredSpace(current_count: state.current_tuple_count, index_count: state.current_unique_count, dict_size: state.current_dict_size, packing_width: width); |
| 405 | |
| 406 | return MINIMUM_COMPRESSION_RATIO * (state.segment_count * Storage::BLOCK_SIZE + req_space); |
| 407 | } |
| 408 | |
| 409 | //===--------------------------------------------------------------------===// |
| 410 | // Compress |
| 411 | //===--------------------------------------------------------------------===// |
| 412 | unique_ptr<CompressionState> DictionaryCompressionStorage::InitCompression(ColumnDataCheckpointer &checkpointer, |
| 413 | unique_ptr<AnalyzeState> state) { |
| 414 | return make_uniq<DictionaryCompressionCompressState>(args&: checkpointer); |
| 415 | } |
| 416 | |
| 417 | void DictionaryCompressionStorage::Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) { |
| 418 | auto &state = state_p.Cast<DictionaryCompressionCompressState>(); |
| 419 | state.UpdateState(scan_vector, count); |
| 420 | } |
| 421 | |
| 422 | void DictionaryCompressionStorage::FinalizeCompress(CompressionState &state_p) { |
| 423 | auto &state = state_p.Cast<DictionaryCompressionCompressState>(); |
| 424 | state.Flush(final: true); |
| 425 | } |
| 426 | |
| 427 | //===--------------------------------------------------------------------===// |
| 428 | // Scan |
| 429 | //===--------------------------------------------------------------------===// |
struct CompressedStringScanState : public StringScanState {
	// Pinned handle to the segment's block, kept alive for the duration of the scan
	BufferHandle handle;
	// Materialized dictionary vector holding every unique string of the segment
	buffer_ptr<Vector> dictionary;
	// Bitpacking width of the selection buffer, read from the segment header
	bitpacking_width_t current_width;
	// Reusable decompression buffer for selection values
	buffer_ptr<SelectionVector> sel_vec;
	idx_t sel_vec_size = 0;
};
| 437 | |
| 438 | unique_ptr<SegmentScanState> DictionaryCompressionStorage::StringInitScan(ColumnSegment &segment) { |
| 439 | auto state = make_uniq<CompressedStringScanState>(); |
| 440 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
| 441 | state->handle = buffer_manager.Pin(handle&: segment.block); |
| 442 | |
| 443 | auto baseptr = state->handle.Ptr() + segment.GetBlockOffset(); |
| 444 | |
| 445 | // Load header values |
| 446 | auto dict = DictionaryCompressionStorage::GetDictionary(segment, handle&: state->handle); |
| 447 | auto = reinterpret_cast<dictionary_compression_header_t *>(baseptr); |
| 448 | auto index_buffer_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset)); |
| 449 | auto index_buffer_count = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_count)); |
| 450 | state->current_width = (bitpacking_width_t)(Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->bitpacking_width))); |
| 451 | |
| 452 | auto index_buffer_ptr = reinterpret_cast<uint32_t *>(baseptr + index_buffer_offset); |
| 453 | |
| 454 | state->dictionary = make_buffer<Vector>(args&: segment.type, args&: index_buffer_count); |
| 455 | auto dict_child_data = FlatVector::GetData<string_t>(vector&: *(state->dictionary)); |
| 456 | |
| 457 | for (uint32_t i = 0; i < index_buffer_count; i++) { |
| 458 | // NOTE: the passing of dict_child_vector, will not be used, its for big strings |
| 459 | uint16_t str_len = GetStringLength(index_buffer_ptr, index: i); |
| 460 | dict_child_data[i] = FetchStringFromDict(segment, dict, baseptr, dict_offset: index_buffer_ptr[i], string_len: str_len); |
| 461 | } |
| 462 | |
| 463 | return std::move(state); |
| 464 | } |
| 465 | |
| 466 | //===--------------------------------------------------------------------===// |
| 467 | // Scan base data |
| 468 | //===--------------------------------------------------------------------===// |
| 469 | template <bool ALLOW_DICT_VECTORS> |
| 470 | void DictionaryCompressionStorage::StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, |
| 471 | Vector &result, idx_t result_offset) { |
| 472 | // clear any previously locked buffers and get the primary buffer handle |
| 473 | auto &scan_state = state.scan_state->Cast<CompressedStringScanState>(); |
| 474 | auto start = segment.GetRelativeIndex(row_index: state.row_index); |
| 475 | |
| 476 | auto baseptr = scan_state.handle.Ptr() + segment.GetBlockOffset(); |
| 477 | auto dict = DictionaryCompressionStorage::GetDictionary(segment, handle&: scan_state.handle); |
| 478 | |
| 479 | auto = reinterpret_cast<dictionary_compression_header_t *>(baseptr); |
| 480 | auto index_buffer_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset)); |
| 481 | auto index_buffer_ptr = reinterpret_cast<uint32_t *>(baseptr + index_buffer_offset); |
| 482 | |
| 483 | auto base_data = data_ptr_cast(src: baseptr + DICTIONARY_HEADER_SIZE); |
| 484 | auto result_data = FlatVector::GetData<string_t>(vector&: result); |
| 485 | |
| 486 | if (!ALLOW_DICT_VECTORS || scan_count != STANDARD_VECTOR_SIZE || |
| 487 | start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE != 0) { |
| 488 | // Emit regular vector |
| 489 | |
| 490 | // Handling non-bitpacking-group-aligned start values; |
| 491 | idx_t start_offset = start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; |
| 492 | |
| 493 | // We will scan in blocks of BITPACKING_ALGORITHM_GROUP_SIZE, so we may scan some extra values. |
| 494 | idx_t decompress_count = BitpackingPrimitives::RoundUpToAlgorithmGroupSize(num_to_round: scan_count + start_offset); |
| 495 | |
| 496 | // Create a decompression buffer of sufficient size if we don't already have one. |
| 497 | if (!scan_state.sel_vec || scan_state.sel_vec_size < decompress_count) { |
| 498 | scan_state.sel_vec_size = decompress_count; |
| 499 | scan_state.sel_vec = make_buffer<SelectionVector>(args&: decompress_count); |
| 500 | } |
| 501 | |
| 502 | data_ptr_t src = &base_data[((start - start_offset) * scan_state.current_width) / 8]; |
| 503 | sel_t *sel_vec_ptr = scan_state.sel_vec->data(); |
| 504 | |
| 505 | BitpackingPrimitives::UnPackBuffer<sel_t>(dst: data_ptr_cast(src: sel_vec_ptr), src, count: decompress_count, |
| 506 | width: scan_state.current_width); |
| 507 | |
| 508 | for (idx_t i = 0; i < scan_count; i++) { |
| 509 | // Lookup dict offset in index buffer |
| 510 | auto string_number = scan_state.sel_vec->get_index(idx: i + start_offset); |
| 511 | auto dict_offset = index_buffer_ptr[string_number]; |
| 512 | uint16_t str_len = GetStringLength(index_buffer_ptr, index: string_number); |
| 513 | result_data[result_offset + i] = FetchStringFromDict(segment, dict, baseptr, dict_offset, string_len: str_len); |
| 514 | } |
| 515 | |
| 516 | } else { |
| 517 | D_ASSERT(start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE == 0); |
| 518 | D_ASSERT(scan_count == STANDARD_VECTOR_SIZE); |
| 519 | D_ASSERT(result_offset == 0); |
| 520 | |
| 521 | idx_t decompress_count = BitpackingPrimitives::RoundUpToAlgorithmGroupSize(num_to_round: scan_count); |
| 522 | |
| 523 | // Create a selection vector of sufficient size if we don't already have one. |
| 524 | if (!scan_state.sel_vec || scan_state.sel_vec_size < decompress_count) { |
| 525 | scan_state.sel_vec_size = decompress_count; |
| 526 | scan_state.sel_vec = make_buffer<SelectionVector>(args&: decompress_count); |
| 527 | } |
| 528 | |
| 529 | // Scanning 1024 values, emitting a dict vector |
| 530 | data_ptr_t dst = data_ptr_cast(src: scan_state.sel_vec->data()); |
| 531 | data_ptr_t src = data_ptr_cast(src: &base_data[(start * scan_state.current_width) / 8]); |
| 532 | |
| 533 | BitpackingPrimitives::UnPackBuffer<sel_t>(dst, src, count: scan_count, width: scan_state.current_width); |
| 534 | |
| 535 | result.Slice(other&: *(scan_state.dictionary), sel: *scan_state.sel_vec, count: scan_count); |
| 536 | } |
| 537 | } |
| 538 | |
| 539 | void DictionaryCompressionStorage::StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, |
| 540 | Vector &result) { |
| 541 | StringScanPartial<true>(segment, state, scan_count, result, result_offset: 0); |
| 542 | } |
| 543 | |
| 544 | //===--------------------------------------------------------------------===// |
| 545 | // Fetch |
| 546 | //===--------------------------------------------------------------------===// |
| 547 | void DictionaryCompressionStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, |
| 548 | Vector &result, idx_t result_idx) { |
| 549 | // fetch a single row from the string segment |
| 550 | // first pin the main buffer if it is not already pinned |
| 551 | auto &handle = state.GetOrInsertHandle(segment); |
| 552 | |
| 553 | auto baseptr = handle.Ptr() + segment.GetBlockOffset(); |
| 554 | auto = reinterpret_cast<dictionary_compression_header_t *>(baseptr); |
| 555 | auto dict = DictionaryCompressionStorage::GetDictionary(segment, handle); |
| 556 | auto index_buffer_offset = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->index_buffer_offset)); |
| 557 | auto width = (bitpacking_width_t)Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->bitpacking_width)); |
| 558 | auto index_buffer_ptr = reinterpret_cast<uint32_t *>(baseptr + index_buffer_offset); |
| 559 | auto base_data = data_ptr_cast(src: baseptr + DICTIONARY_HEADER_SIZE); |
| 560 | auto result_data = FlatVector::GetData<string_t>(vector&: result); |
| 561 | |
| 562 | // Handling non-bitpacking-group-aligned start values; |
| 563 | idx_t start_offset = row_id % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; |
| 564 | |
| 565 | // Decompress part of selection buffer we need for this value. |
| 566 | sel_t decompression_buffer[BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE]; |
| 567 | data_ptr_t src = data_ptr_cast(src: &base_data[((row_id - start_offset) * width) / 8]); |
| 568 | BitpackingPrimitives::UnPackBuffer<sel_t>(dst: data_ptr_cast(src: decompression_buffer), src, |
| 569 | count: BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE, width); |
| 570 | |
| 571 | auto selection_value = decompression_buffer[start_offset]; |
| 572 | auto dict_offset = index_buffer_ptr[selection_value]; |
| 573 | uint16_t str_len = GetStringLength(index_buffer_ptr, index: selection_value); |
| 574 | |
| 575 | result_data[result_idx] = FetchStringFromDict(segment, dict, baseptr, dict_offset, string_len: str_len); |
| 576 | } |
| 577 | |
| 578 | //===--------------------------------------------------------------------===// |
| 579 | // Helper Functions |
| 580 | //===--------------------------------------------------------------------===// |
| 581 | bool DictionaryCompressionStorage::HasEnoughSpace(idx_t current_count, idx_t index_count, idx_t dict_size, |
| 582 | bitpacking_width_t packing_width) { |
| 583 | return RequiredSpace(current_count, index_count, dict_size, packing_width) <= Storage::BLOCK_SIZE; |
| 584 | } |
| 585 | |
| 586 | idx_t DictionaryCompressionStorage::RequiredSpace(idx_t current_count, idx_t index_count, idx_t dict_size, |
| 587 | bitpacking_width_t packing_width) { |
| 588 | idx_t base_space = DICTIONARY_HEADER_SIZE + dict_size; |
| 589 | idx_t string_number_space = BitpackingPrimitives::GetRequiredSize(count: current_count, width: packing_width); |
| 590 | idx_t index_space = index_count * sizeof(uint32_t); |
| 591 | |
| 592 | idx_t used_space = base_space + index_space + string_number_space; |
| 593 | |
| 594 | return used_space; |
| 595 | } |
| 596 | |
| 597 | StringDictionaryContainer DictionaryCompressionStorage::GetDictionary(ColumnSegment &segment, BufferHandle &handle) { |
| 598 | auto = reinterpret_cast<dictionary_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset()); |
| 599 | StringDictionaryContainer container; |
| 600 | container.size = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->dict_size)); |
| 601 | container.end = Load<uint32_t>(ptr: data_ptr_cast(src: &header_ptr->dict_end)); |
| 602 | return container; |
| 603 | } |
| 604 | |
| 605 | void DictionaryCompressionStorage::SetDictionary(ColumnSegment &segment, BufferHandle &handle, |
| 606 | StringDictionaryContainer container) { |
| 607 | auto = reinterpret_cast<dictionary_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset()); |
| 608 | Store<uint32_t>(val: container.size, ptr: data_ptr_cast(src: &header_ptr->dict_size)); |
| 609 | Store<uint32_t>(val: container.end, ptr: data_ptr_cast(src: &header_ptr->dict_end)); |
| 610 | } |
| 611 | |
| 612 | string_t DictionaryCompressionStorage::FetchStringFromDict(ColumnSegment &segment, StringDictionaryContainer dict, |
| 613 | data_ptr_t baseptr, int32_t dict_offset, |
| 614 | uint16_t string_len) { |
| 615 | D_ASSERT(dict_offset >= 0 && dict_offset <= Storage::BLOCK_SIZE); |
| 616 | |
| 617 | if (dict_offset == 0) { |
| 618 | return string_t(nullptr, 0); |
| 619 | } |
| 620 | // normal string: read string from this block |
| 621 | auto dict_end = baseptr + dict.end; |
| 622 | auto dict_pos = dict_end - dict_offset; |
| 623 | |
| 624 | auto str_ptr = char_ptr_cast(src: dict_pos); |
| 625 | return string_t(str_ptr, string_len); |
| 626 | } |
| 627 | |
| 628 | uint16_t DictionaryCompressionStorage::GetStringLength(uint32_t *index_buffer_ptr, sel_t index) { |
| 629 | if (index == 0) { |
| 630 | return 0; |
| 631 | } else { |
| 632 | return index_buffer_ptr[index] - index_buffer_ptr[index - 1]; |
| 633 | } |
| 634 | } |
| 635 | |
| 636 | //===--------------------------------------------------------------------===// |
| 637 | // Get Function |
| 638 | //===--------------------------------------------------------------------===// |
| 639 | CompressionFunction DictionaryCompressionFun::GetFunction(PhysicalType data_type) { |
| 640 | return CompressionFunction( |
| 641 | CompressionType::COMPRESSION_DICTIONARY, data_type, DictionaryCompressionStorage ::StringInitAnalyze, |
| 642 | DictionaryCompressionStorage::StringAnalyze, DictionaryCompressionStorage::StringFinalAnalyze, |
| 643 | DictionaryCompressionStorage::InitCompression, DictionaryCompressionStorage::Compress, |
| 644 | DictionaryCompressionStorage::FinalizeCompress, DictionaryCompressionStorage::StringInitScan, |
| 645 | DictionaryCompressionStorage::StringScan, DictionaryCompressionStorage::StringScanPartial<false>, |
| 646 | DictionaryCompressionStorage::StringFetchRow, UncompressedFunctions::EmptySkip); |
| 647 | } |
| 648 | |
| 649 | bool DictionaryCompressionFun::TypeIsSupported(PhysicalType type) { |
| 650 | return type == PhysicalType::VARCHAR; |
| 651 | } |
| 652 | } // namespace duckdb |
| 653 | |