| 1 | #include "duckdb/storage/string_uncompressed.hpp" |
| 2 | |
| 3 | #include "duckdb/common/pair.hpp" |
| 4 | #include "duckdb/storage/checkpoint/write_overflow_strings_to_disk.hpp" |
| 5 | #include "miniz_wrapper.hpp" |
| 6 | |
| 7 | namespace duckdb { |
| 8 | |
| 9 | //===--------------------------------------------------------------------===// |
| 10 | // Storage Class |
| 11 | //===--------------------------------------------------------------------===// |
| 12 | UncompressedStringSegmentState::~UncompressedStringSegmentState() { |
| 13 | while (head) { |
| 14 | // prevent deep recursion here |
| 15 | head = std::move(head->next); |
| 16 | } |
| 17 | } |
| 18 | |
| 19 | //===--------------------------------------------------------------------===// |
| 20 | // Analyze |
| 21 | //===--------------------------------------------------------------------===// |
| 22 | struct StringAnalyzeState : public AnalyzeState { |
| 23 | StringAnalyzeState() : count(0), total_string_size(0), overflow_strings(0) { |
| 24 | } |
| 25 | |
| 26 | idx_t count; |
| 27 | idx_t total_string_size; |
| 28 | idx_t overflow_strings; |
| 29 | }; |
| 30 | |
// Initialize the analyze phase: no per-column setup is needed, the state only
// accumulates counters during StringAnalyze.
unique_ptr<AnalyzeState> UncompressedStringStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) {
	return make_uniq<StringAnalyzeState>();
}
| 34 | |
| 35 | bool UncompressedStringStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) { |
| 36 | auto &state = state_p.Cast<StringAnalyzeState>(); |
| 37 | UnifiedVectorFormat vdata; |
| 38 | input.ToUnifiedFormat(count, data&: vdata); |
| 39 | |
| 40 | state.count += count; |
| 41 | auto data = UnifiedVectorFormat::GetData<string_t>(format: vdata); |
| 42 | for (idx_t i = 0; i < count; i++) { |
| 43 | auto idx = vdata.sel->get_index(idx: i); |
| 44 | if (vdata.validity.RowIsValid(row_idx: idx)) { |
| 45 | auto string_size = data[idx].GetSize(); |
| 46 | state.total_string_size += string_size; |
| 47 | if (string_size >= StringUncompressed::STRING_BLOCK_LIMIT) { |
| 48 | state.overflow_strings++; |
| 49 | } |
| 50 | } |
| 51 | } |
| 52 | return true; |
| 53 | } |
| 54 | |
| 55 | idx_t UncompressedStringStorage::StringFinalAnalyze(AnalyzeState &state_p) { |
| 56 | auto &state = state_p.Cast<StringAnalyzeState>(); |
| 57 | return state.count * sizeof(int32_t) + state.total_string_size + state.overflow_strings * BIG_STRING_MARKER_SIZE; |
| 58 | } |
| 59 | |
| 60 | //===--------------------------------------------------------------------===// |
| 61 | // Scan |
| 62 | //===--------------------------------------------------------------------===// |
| 63 | unique_ptr<SegmentScanState> UncompressedStringStorage::StringInitScan(ColumnSegment &segment) { |
| 64 | auto result = make_uniq<StringScanState>(); |
| 65 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
| 66 | result->handle = buffer_manager.Pin(handle&: segment.block); |
| 67 | return std::move(result); |
| 68 | } |
| 69 | |
| 70 | //===--------------------------------------------------------------------===// |
| 71 | // Scan base data |
| 72 | //===--------------------------------------------------------------------===// |
| 73 | void UncompressedStringStorage::StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, |
| 74 | Vector &result, idx_t result_offset) { |
| 75 | // clear any previously locked buffers and get the primary buffer handle |
| 76 | auto &scan_state = state.scan_state->Cast<StringScanState>(); |
| 77 | auto start = segment.GetRelativeIndex(row_index: state.row_index); |
| 78 | |
| 79 | auto baseptr = scan_state.handle.Ptr() + segment.GetBlockOffset(); |
| 80 | auto dict = GetDictionary(segment, handle&: scan_state.handle); |
| 81 | auto base_data = reinterpret_cast<int32_t *>(baseptr + DICTIONARY_HEADER_SIZE); |
| 82 | auto result_data = FlatVector::GetData<string_t>(vector&: result); |
| 83 | |
| 84 | int32_t previous_offset = start > 0 ? base_data[start - 1] : 0; |
| 85 | |
| 86 | for (idx_t i = 0; i < scan_count; i++) { |
| 87 | // std::abs used since offsets can be negative to indicate big strings |
| 88 | uint32_t string_length = std::abs(x: base_data[start + i]) - std::abs(x: previous_offset); |
| 89 | result_data[result_offset + i] = |
| 90 | FetchStringFromDict(segment, dict, result, baseptr, dict_offset: base_data[start + i], string_length); |
| 91 | previous_offset = base_data[start + i]; |
| 92 | } |
| 93 | } |
| 94 | |
| 95 | void UncompressedStringStorage::StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, |
| 96 | Vector &result) { |
| 97 | StringScanPartial(segment, state, scan_count, result, result_offset: 0); |
| 98 | } |
| 99 | |
| 100 | //===--------------------------------------------------------------------===// |
| 101 | // Fetch |
| 102 | //===--------------------------------------------------------------------===// |
| 103 | BufferHandle &ColumnFetchState::GetOrInsertHandle(ColumnSegment &segment) { |
| 104 | auto primary_id = segment.block->BlockId(); |
| 105 | |
| 106 | auto entry = handles.find(x: primary_id); |
| 107 | if (entry == handles.end()) { |
| 108 | // not pinned yet: pin it |
| 109 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
| 110 | auto handle = buffer_manager.Pin(handle&: segment.block); |
| 111 | auto entry = handles.insert(x: make_pair(x&: primary_id, y: std::move(handle))); |
| 112 | return entry.first->second; |
| 113 | } else { |
| 114 | // already pinned: use the pinned handle |
| 115 | return entry->second; |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | void UncompressedStringStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, |
| 120 | Vector &result, idx_t result_idx) { |
| 121 | // fetch a single row from the string segment |
| 122 | // first pin the main buffer if it is not already pinned |
| 123 | auto &handle = state.GetOrInsertHandle(segment); |
| 124 | |
| 125 | auto baseptr = handle.Ptr() + segment.GetBlockOffset(); |
| 126 | auto dict = GetDictionary(segment, handle); |
| 127 | auto base_data = reinterpret_cast<int32_t *>(baseptr + DICTIONARY_HEADER_SIZE); |
| 128 | auto result_data = FlatVector::GetData<string_t>(vector&: result); |
| 129 | |
| 130 | auto dict_offset = base_data[row_id]; |
| 131 | uint32_t string_length; |
| 132 | if ((idx_t)row_id == 0) { |
| 133 | // edge case where this is the first string in the dict |
| 134 | string_length = std::abs(x: dict_offset); |
| 135 | } else { |
| 136 | string_length = std::abs(x: dict_offset) - std::abs(x: base_data[row_id - 1]); |
| 137 | } |
| 138 | result_data[result_idx] = FetchStringFromDict(segment, dict, result, baseptr, dict_offset, string_length); |
| 139 | } |
| 140 | |
| 141 | //===--------------------------------------------------------------------===// |
| 142 | // Append |
| 143 | //===--------------------------------------------------------------------===// |
| 144 | |
| 145 | unique_ptr<CompressedSegmentState> UncompressedStringStorage::StringInitSegment(ColumnSegment &segment, |
| 146 | block_id_t block_id) { |
| 147 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
| 148 | if (block_id == INVALID_BLOCK) { |
| 149 | auto handle = buffer_manager.Pin(handle&: segment.block); |
| 150 | StringDictionaryContainer dictionary; |
| 151 | dictionary.size = 0; |
| 152 | dictionary.end = segment.SegmentSize(); |
| 153 | SetDictionary(segment, handle, dict: dictionary); |
| 154 | } |
| 155 | return make_uniq<UncompressedStringSegmentState>(); |
| 156 | } |
| 157 | |
| 158 | idx_t UncompressedStringStorage::FinalizeAppend(ColumnSegment &segment, SegmentStatistics &stats) { |
| 159 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
| 160 | auto handle = buffer_manager.Pin(handle&: segment.block); |
| 161 | auto dict = GetDictionary(segment, handle); |
| 162 | D_ASSERT(dict.end == segment.SegmentSize()); |
| 163 | // compute the total size required to store this segment |
| 164 | auto offset_size = DICTIONARY_HEADER_SIZE + segment.count * sizeof(int32_t); |
| 165 | auto total_size = offset_size + dict.size; |
| 166 | if (total_size >= COMPACTION_FLUSH_LIMIT) { |
| 167 | // the block is full enough, don't bother moving around the dictionary |
| 168 | return segment.SegmentSize(); |
| 169 | } |
| 170 | // the block has space left: figure out how much space we can save |
| 171 | auto move_amount = segment.SegmentSize() - total_size; |
| 172 | // move the dictionary so it lines up exactly with the offsets |
| 173 | auto dataptr = handle.Ptr(); |
| 174 | memmove(dest: dataptr + offset_size, src: dataptr + dict.end - dict.size, n: dict.size); |
| 175 | dict.end -= move_amount; |
| 176 | D_ASSERT(dict.end == total_size); |
| 177 | // write the new dictionary (with the updated "end") |
| 178 | SetDictionary(segment, handle, dict); |
| 179 | return total_size; |
| 180 | } |
| 181 | |
| 182 | //===--------------------------------------------------------------------===// |
| 183 | // Get Function |
| 184 | //===--------------------------------------------------------------------===// |
// Build the CompressionFunction table entry for uncompressed VARCHAR storage,
// wiring the analyze / compress / scan / fetch / append callbacks defined in
// this file (plus the generic UncompressedFunctions helpers).
CompressionFunction StringUncompressed::GetFunction(PhysicalType data_type) {
	D_ASSERT(data_type == PhysicalType::VARCHAR);
	return CompressionFunction(CompressionType::COMPRESSION_UNCOMPRESSED, data_type,
	                           UncompressedStringStorage::StringInitAnalyze, UncompressedStringStorage::StringAnalyze,
	                           UncompressedStringStorage::StringFinalAnalyze, UncompressedFunctions::InitCompression,
	                           UncompressedFunctions::Compress, UncompressedFunctions::FinalizeCompress,
	                           UncompressedStringStorage::StringInitScan, UncompressedStringStorage::StringScan,
	                           UncompressedStringStorage::StringScanPartial, UncompressedStringStorage::StringFetchRow,
	                           UncompressedFunctions::EmptySkip, UncompressedStringStorage::StringInitSegment,
	                           UncompressedStringStorage::StringInitAppend, UncompressedStringStorage::StringAppend,
	                           UncompressedStringStorage::FinalizeAppend);
}
| 197 | |
| 198 | //===--------------------------------------------------------------------===// |
| 199 | // Helper Functions |
| 200 | //===--------------------------------------------------------------------===// |
| 201 | void UncompressedStringStorage::SetDictionary(ColumnSegment &segment, BufferHandle &handle, |
| 202 | StringDictionaryContainer container) { |
| 203 | auto startptr = handle.Ptr() + segment.GetBlockOffset(); |
| 204 | Store<uint32_t>(val: container.size, ptr: startptr); |
| 205 | Store<uint32_t>(val: container.end, ptr: startptr + sizeof(uint32_t)); |
| 206 | } |
| 207 | |
| 208 | StringDictionaryContainer UncompressedStringStorage::GetDictionary(ColumnSegment &segment, BufferHandle &handle) { |
| 209 | auto startptr = handle.Ptr() + segment.GetBlockOffset(); |
| 210 | StringDictionaryContainer container; |
| 211 | container.size = Load<uint32_t>(ptr: startptr); |
| 212 | container.end = Load<uint32_t>(ptr: startptr + sizeof(uint32_t)); |
| 213 | return container; |
| 214 | } |
| 215 | |
| 216 | idx_t UncompressedStringStorage::RemainingSpace(ColumnSegment &segment, BufferHandle &handle) { |
| 217 | auto dictionary = GetDictionary(segment, handle); |
| 218 | D_ASSERT(dictionary.end == segment.SegmentSize()); |
| 219 | idx_t used_space = dictionary.size + segment.count * sizeof(int32_t) + DICTIONARY_HEADER_SIZE; |
| 220 | D_ASSERT(segment.SegmentSize() >= used_space); |
| 221 | return segment.SegmentSize() - used_space; |
| 222 | } |
| 223 | |
| 224 | void UncompressedStringStorage::WriteString(ColumnSegment &segment, string_t string, block_id_t &result_block, |
| 225 | int32_t &result_offset) { |
| 226 | auto &state = segment.GetSegmentState()->Cast<UncompressedStringSegmentState>(); |
| 227 | if (state.overflow_writer) { |
| 228 | // overflow writer is set: write string there |
| 229 | state.overflow_writer->WriteString(string, result_block, result_offset); |
| 230 | } else { |
| 231 | // default overflow behavior: use in-memory buffer to store the overflow string |
| 232 | WriteStringMemory(segment, string, result_block, result_offset); |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | void UncompressedStringStorage::WriteStringMemory(ColumnSegment &segment, string_t string, block_id_t &result_block, |
| 237 | int32_t &result_offset) { |
| 238 | uint32_t total_length = string.GetSize() + sizeof(uint32_t); |
| 239 | shared_ptr<BlockHandle> block; |
| 240 | BufferHandle handle; |
| 241 | |
| 242 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
| 243 | auto &state = segment.GetSegmentState()->Cast<UncompressedStringSegmentState>(); |
| 244 | // check if the string fits in the current block |
| 245 | if (!state.head || state.head->offset + total_length >= state.head->size) { |
| 246 | // string does not fit, allocate space for it |
| 247 | // create a new string block |
| 248 | idx_t alloc_size = MaxValue<idx_t>(a: total_length, b: Storage::BLOCK_SIZE); |
| 249 | auto new_block = make_uniq<StringBlock>(); |
| 250 | new_block->offset = 0; |
| 251 | new_block->size = alloc_size; |
| 252 | // allocate an in-memory buffer for it |
| 253 | handle = buffer_manager.Allocate(block_size: alloc_size, can_destroy: false, block: &block); |
| 254 | state.overflow_blocks[block->BlockId()] = new_block.get(); |
| 255 | new_block->block = std::move(block); |
| 256 | new_block->next = std::move(state.head); |
| 257 | state.head = std::move(new_block); |
| 258 | } else { |
| 259 | // string fits, copy it into the current block |
| 260 | handle = buffer_manager.Pin(handle&: state.head->block); |
| 261 | } |
| 262 | |
| 263 | result_block = state.head->block->BlockId(); |
| 264 | result_offset = state.head->offset; |
| 265 | |
| 266 | // copy the string and the length there |
| 267 | auto ptr = handle.Ptr() + state.head->offset; |
| 268 | Store<uint32_t>(val: string.GetSize(), ptr); |
| 269 | ptr += sizeof(uint32_t); |
| 270 | memcpy(dest: ptr, src: string.GetData(), n: string.GetSize()); |
| 271 | state.head->offset += total_length; |
| 272 | } |
| 273 | |
| 274 | string_t UncompressedStringStorage::ReadOverflowString(ColumnSegment &segment, Vector &result, block_id_t block, |
| 275 | int32_t offset) { |
| 276 | D_ASSERT(block != INVALID_BLOCK); |
| 277 | D_ASSERT(offset < Storage::BLOCK_SIZE); |
| 278 | |
| 279 | auto &block_manager = segment.GetBlockManager(); |
| 280 | auto &buffer_manager = block_manager.buffer_manager; |
| 281 | auto &state = segment.GetSegmentState()->Cast<UncompressedStringSegmentState>(); |
| 282 | if (block < MAXIMUM_BLOCK) { |
| 283 | // read the overflow string from disk |
| 284 | // pin the initial handle and read the length |
| 285 | auto block_handle = block_manager.RegisterBlock(block_id: block); |
| 286 | auto handle = buffer_manager.Pin(handle&: block_handle); |
| 287 | |
| 288 | // read header |
| 289 | uint32_t compressed_size = Load<uint32_t>(ptr: handle.Ptr() + offset); |
| 290 | uint32_t uncompressed_size = Load<uint32_t>(ptr: handle.Ptr() + offset + sizeof(uint32_t)); |
| 291 | uint32_t remaining = compressed_size; |
| 292 | offset += 2 * sizeof(uint32_t); |
| 293 | |
| 294 | data_ptr_t decompression_ptr; |
| 295 | unsafe_unique_array<data_t> decompression_buffer; |
| 296 | |
| 297 | // If string is in single block we decompress straight from it, else we copy first |
| 298 | if (remaining <= Storage::BLOCK_SIZE - sizeof(block_id_t) - offset) { |
| 299 | decompression_ptr = handle.Ptr() + offset; |
| 300 | } else { |
| 301 | decompression_buffer = make_unsafe_uniq_array<data_t>(n: compressed_size); |
| 302 | auto target_ptr = decompression_buffer.get(); |
| 303 | |
| 304 | // now append the string to the single buffer |
| 305 | while (remaining > 0) { |
| 306 | idx_t to_write = MinValue<idx_t>(a: remaining, b: Storage::BLOCK_SIZE - sizeof(block_id_t) - offset); |
| 307 | memcpy(dest: target_ptr, src: handle.Ptr() + offset, n: to_write); |
| 308 | |
| 309 | remaining -= to_write; |
| 310 | offset += to_write; |
| 311 | target_ptr += to_write; |
| 312 | if (remaining > 0) { |
| 313 | // read the next block |
| 314 | block_id_t next_block = Load<block_id_t>(ptr: handle.Ptr() + offset); |
| 315 | block_handle = block_manager.RegisterBlock(block_id: next_block); |
| 316 | handle = buffer_manager.Pin(handle&: block_handle); |
| 317 | offset = 0; |
| 318 | } |
| 319 | } |
| 320 | decompression_ptr = decompression_buffer.get(); |
| 321 | } |
| 322 | |
| 323 | // overflow strings on disk are gzipped, decompress here |
| 324 | auto decompressed_target_handle = |
| 325 | buffer_manager.Allocate(block_size: MaxValue<idx_t>(a: Storage::BLOCK_SIZE, b: uncompressed_size)); |
| 326 | auto decompressed_target_ptr = decompressed_target_handle.Ptr(); |
| 327 | MiniZStream s; |
| 328 | s.Decompress(compressed_data: const_char_ptr_cast(src: decompression_ptr), compressed_size, out_data: char_ptr_cast(src: decompressed_target_ptr), |
| 329 | out_size: uncompressed_size); |
| 330 | |
| 331 | auto final_buffer = decompressed_target_handle.Ptr(); |
| 332 | StringVector::AddHandle(vector&: result, handle: std::move(decompressed_target_handle)); |
| 333 | return ReadString(target: final_buffer, offset: 0, string_length: uncompressed_size); |
| 334 | } else { |
| 335 | // read the overflow string from memory |
| 336 | // first pin the handle, if it is not pinned yet |
| 337 | auto entry = state.overflow_blocks.find(x: block); |
| 338 | D_ASSERT(entry != state.overflow_blocks.end()); |
| 339 | auto handle = buffer_manager.Pin(handle&: entry->second->block); |
| 340 | auto final_buffer = handle.Ptr(); |
| 341 | StringVector::AddHandle(vector&: result, handle: std::move(handle)); |
| 342 | return ReadStringWithLength(target: final_buffer, offset); |
| 343 | } |
| 344 | } |
| 345 | |
| 346 | string_t UncompressedStringStorage::ReadString(data_ptr_t target, int32_t offset, uint32_t string_length) { |
| 347 | auto ptr = target + offset; |
| 348 | auto str_ptr = char_ptr_cast(src: ptr); |
| 349 | return string_t(str_ptr, string_length); |
| 350 | } |
| 351 | |
| 352 | string_t UncompressedStringStorage::ReadStringWithLength(data_ptr_t target, int32_t offset) { |
| 353 | auto ptr = target + offset; |
| 354 | auto str_length = Load<uint32_t>(ptr); |
| 355 | auto str_ptr = char_ptr_cast(src: ptr + sizeof(uint32_t)); |
| 356 | return string_t(str_ptr, str_length); |
| 357 | } |
| 358 | |
| 359 | void UncompressedStringStorage::WriteStringMarker(data_ptr_t target, block_id_t block_id, int32_t offset) { |
| 360 | memcpy(dest: target, src: &block_id, n: sizeof(block_id_t)); |
| 361 | target += sizeof(block_id_t); |
| 362 | memcpy(dest: target, src: &offset, n: sizeof(int32_t)); |
| 363 | } |
| 364 | |
| 365 | void UncompressedStringStorage::ReadStringMarker(data_ptr_t target, block_id_t &block_id, int32_t &offset) { |
| 366 | memcpy(dest: &block_id, src: target, n: sizeof(block_id_t)); |
| 367 | target += sizeof(block_id_t); |
| 368 | memcpy(dest: &offset, src: target, n: sizeof(int32_t)); |
| 369 | } |
| 370 | |
| 371 | string_location_t UncompressedStringStorage::FetchStringLocation(StringDictionaryContainer dict, data_ptr_t baseptr, |
| 372 | int32_t dict_offset) { |
| 373 | D_ASSERT(dict_offset >= -1 * Storage::BLOCK_SIZE && dict_offset <= Storage::BLOCK_SIZE); |
| 374 | if (dict_offset < 0) { |
| 375 | string_location_t result; |
| 376 | ReadStringMarker(target: baseptr + dict.end - (-1 * dict_offset), block_id&: result.block_id, offset&: result.offset); |
| 377 | return result; |
| 378 | } else { |
| 379 | return string_location_t(INVALID_BLOCK, dict_offset); |
| 380 | } |
| 381 | } |
| 382 | |
| 383 | string_t UncompressedStringStorage::FetchStringFromDict(ColumnSegment &segment, StringDictionaryContainer dict, |
| 384 | Vector &result, data_ptr_t baseptr, int32_t dict_offset, |
| 385 | uint32_t string_length) { |
| 386 | // fetch base data |
| 387 | D_ASSERT(dict_offset <= Storage::BLOCK_SIZE); |
| 388 | string_location_t location = FetchStringLocation(dict, baseptr, dict_offset); |
| 389 | return FetchString(segment, dict, result, baseptr, location, string_length); |
| 390 | } |
| 391 | |
| 392 | string_t UncompressedStringStorage::FetchString(ColumnSegment &segment, StringDictionaryContainer dict, Vector &result, |
| 393 | data_ptr_t baseptr, string_location_t location, |
| 394 | uint32_t string_length) { |
| 395 | if (location.block_id != INVALID_BLOCK) { |
| 396 | // big string marker: read from separate block |
| 397 | return ReadOverflowString(segment, result, block: location.block_id, offset: location.offset); |
| 398 | } else { |
| 399 | if (location.offset == 0) { |
| 400 | return string_t(nullptr, 0); |
| 401 | } |
| 402 | // normal string: read string from this block |
| 403 | auto dict_end = baseptr + dict.end; |
| 404 | auto dict_pos = dict_end - location.offset; |
| 405 | |
| 406 | auto str_ptr = char_ptr_cast(src: dict_pos); |
| 407 | return string_t(str_ptr, string_length); |
| 408 | } |
| 409 | } |
| 410 | |
| 411 | } // namespace duckdb |
| 412 | |