1 | #include "duckdb/storage/string_uncompressed.hpp" |
2 | |
3 | #include "duckdb/common/pair.hpp" |
4 | #include "duckdb/storage/checkpoint/write_overflow_strings_to_disk.hpp" |
5 | #include "miniz_wrapper.hpp" |
6 | |
7 | namespace duckdb { |
8 | |
9 | //===--------------------------------------------------------------------===// |
10 | // Storage Class |
11 | //===--------------------------------------------------------------------===// |
12 | UncompressedStringSegmentState::~UncompressedStringSegmentState() { |
13 | while (head) { |
14 | // prevent deep recursion here |
15 | head = std::move(head->next); |
16 | } |
17 | } |
18 | |
19 | //===--------------------------------------------------------------------===// |
20 | // Analyze |
21 | //===--------------------------------------------------------------------===// |
22 | struct StringAnalyzeState : public AnalyzeState { |
23 | StringAnalyzeState() : count(0), total_string_size(0), overflow_strings(0) { |
24 | } |
25 | |
26 | idx_t count; |
27 | idx_t total_string_size; |
28 | idx_t overflow_strings; |
29 | }; |
30 | |
31 | unique_ptr<AnalyzeState> UncompressedStringStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) { |
32 | return make_uniq<StringAnalyzeState>(); |
33 | } |
34 | |
35 | bool UncompressedStringStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) { |
36 | auto &state = state_p.Cast<StringAnalyzeState>(); |
37 | UnifiedVectorFormat vdata; |
38 | input.ToUnifiedFormat(count, data&: vdata); |
39 | |
40 | state.count += count; |
41 | auto data = UnifiedVectorFormat::GetData<string_t>(format: vdata); |
42 | for (idx_t i = 0; i < count; i++) { |
43 | auto idx = vdata.sel->get_index(idx: i); |
44 | if (vdata.validity.RowIsValid(row_idx: idx)) { |
45 | auto string_size = data[idx].GetSize(); |
46 | state.total_string_size += string_size; |
47 | if (string_size >= StringUncompressed::STRING_BLOCK_LIMIT) { |
48 | state.overflow_strings++; |
49 | } |
50 | } |
51 | } |
52 | return true; |
53 | } |
54 | |
55 | idx_t UncompressedStringStorage::StringFinalAnalyze(AnalyzeState &state_p) { |
56 | auto &state = state_p.Cast<StringAnalyzeState>(); |
57 | return state.count * sizeof(int32_t) + state.total_string_size + state.overflow_strings * BIG_STRING_MARKER_SIZE; |
58 | } |
59 | |
60 | //===--------------------------------------------------------------------===// |
61 | // Scan |
62 | //===--------------------------------------------------------------------===// |
63 | unique_ptr<SegmentScanState> UncompressedStringStorage::StringInitScan(ColumnSegment &segment) { |
64 | auto result = make_uniq<StringScanState>(); |
65 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
66 | result->handle = buffer_manager.Pin(handle&: segment.block); |
67 | return std::move(result); |
68 | } |
69 | |
70 | //===--------------------------------------------------------------------===// |
71 | // Scan base data |
72 | //===--------------------------------------------------------------------===// |
73 | void UncompressedStringStorage::StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, |
74 | Vector &result, idx_t result_offset) { |
75 | // clear any previously locked buffers and get the primary buffer handle |
76 | auto &scan_state = state.scan_state->Cast<StringScanState>(); |
77 | auto start = segment.GetRelativeIndex(row_index: state.row_index); |
78 | |
79 | auto baseptr = scan_state.handle.Ptr() + segment.GetBlockOffset(); |
80 | auto dict = GetDictionary(segment, handle&: scan_state.handle); |
81 | auto base_data = reinterpret_cast<int32_t *>(baseptr + DICTIONARY_HEADER_SIZE); |
82 | auto result_data = FlatVector::GetData<string_t>(vector&: result); |
83 | |
84 | int32_t previous_offset = start > 0 ? base_data[start - 1] : 0; |
85 | |
86 | for (idx_t i = 0; i < scan_count; i++) { |
87 | // std::abs used since offsets can be negative to indicate big strings |
88 | uint32_t string_length = std::abs(x: base_data[start + i]) - std::abs(x: previous_offset); |
89 | result_data[result_offset + i] = |
90 | FetchStringFromDict(segment, dict, result, baseptr, dict_offset: base_data[start + i], string_length); |
91 | previous_offset = base_data[start + i]; |
92 | } |
93 | } |
94 | |
95 | void UncompressedStringStorage::StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, |
96 | Vector &result) { |
97 | StringScanPartial(segment, state, scan_count, result, result_offset: 0); |
98 | } |
99 | |
100 | //===--------------------------------------------------------------------===// |
101 | // Fetch |
102 | //===--------------------------------------------------------------------===// |
103 | BufferHandle &ColumnFetchState::GetOrInsertHandle(ColumnSegment &segment) { |
104 | auto primary_id = segment.block->BlockId(); |
105 | |
106 | auto entry = handles.find(x: primary_id); |
107 | if (entry == handles.end()) { |
108 | // not pinned yet: pin it |
109 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
110 | auto handle = buffer_manager.Pin(handle&: segment.block); |
111 | auto entry = handles.insert(x: make_pair(x&: primary_id, y: std::move(handle))); |
112 | return entry.first->second; |
113 | } else { |
114 | // already pinned: use the pinned handle |
115 | return entry->second; |
116 | } |
117 | } |
118 | |
119 | void UncompressedStringStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, |
120 | Vector &result, idx_t result_idx) { |
121 | // fetch a single row from the string segment |
122 | // first pin the main buffer if it is not already pinned |
123 | auto &handle = state.GetOrInsertHandle(segment); |
124 | |
125 | auto baseptr = handle.Ptr() + segment.GetBlockOffset(); |
126 | auto dict = GetDictionary(segment, handle); |
127 | auto base_data = reinterpret_cast<int32_t *>(baseptr + DICTIONARY_HEADER_SIZE); |
128 | auto result_data = FlatVector::GetData<string_t>(vector&: result); |
129 | |
130 | auto dict_offset = base_data[row_id]; |
131 | uint32_t string_length; |
132 | if ((idx_t)row_id == 0) { |
133 | // edge case where this is the first string in the dict |
134 | string_length = std::abs(x: dict_offset); |
135 | } else { |
136 | string_length = std::abs(x: dict_offset) - std::abs(x: base_data[row_id - 1]); |
137 | } |
138 | result_data[result_idx] = FetchStringFromDict(segment, dict, result, baseptr, dict_offset, string_length); |
139 | } |
140 | |
141 | //===--------------------------------------------------------------------===// |
142 | // Append |
143 | //===--------------------------------------------------------------------===// |
144 | |
145 | unique_ptr<CompressedSegmentState> UncompressedStringStorage::StringInitSegment(ColumnSegment &segment, |
146 | block_id_t block_id) { |
147 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
148 | if (block_id == INVALID_BLOCK) { |
149 | auto handle = buffer_manager.Pin(handle&: segment.block); |
150 | StringDictionaryContainer dictionary; |
151 | dictionary.size = 0; |
152 | dictionary.end = segment.SegmentSize(); |
153 | SetDictionary(segment, handle, dict: dictionary); |
154 | } |
155 | return make_uniq<UncompressedStringSegmentState>(); |
156 | } |
157 | |
158 | idx_t UncompressedStringStorage::FinalizeAppend(ColumnSegment &segment, SegmentStatistics &stats) { |
159 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
160 | auto handle = buffer_manager.Pin(handle&: segment.block); |
161 | auto dict = GetDictionary(segment, handle); |
162 | D_ASSERT(dict.end == segment.SegmentSize()); |
163 | // compute the total size required to store this segment |
164 | auto offset_size = DICTIONARY_HEADER_SIZE + segment.count * sizeof(int32_t); |
165 | auto total_size = offset_size + dict.size; |
166 | if (total_size >= COMPACTION_FLUSH_LIMIT) { |
167 | // the block is full enough, don't bother moving around the dictionary |
168 | return segment.SegmentSize(); |
169 | } |
170 | // the block has space left: figure out how much space we can save |
171 | auto move_amount = segment.SegmentSize() - total_size; |
172 | // move the dictionary so it lines up exactly with the offsets |
173 | auto dataptr = handle.Ptr(); |
174 | memmove(dest: dataptr + offset_size, src: dataptr + dict.end - dict.size, n: dict.size); |
175 | dict.end -= move_amount; |
176 | D_ASSERT(dict.end == total_size); |
177 | // write the new dictionary (with the updated "end") |
178 | SetDictionary(segment, handle, dict); |
179 | return total_size; |
180 | } |
181 | |
182 | //===--------------------------------------------------------------------===// |
183 | // Get Function |
184 | //===--------------------------------------------------------------------===// |
185 | CompressionFunction StringUncompressed::GetFunction(PhysicalType data_type) { |
186 | D_ASSERT(data_type == PhysicalType::VARCHAR); |
187 | return CompressionFunction(CompressionType::COMPRESSION_UNCOMPRESSED, data_type, |
188 | UncompressedStringStorage::StringInitAnalyze, UncompressedStringStorage::StringAnalyze, |
189 | UncompressedStringStorage::StringFinalAnalyze, UncompressedFunctions::InitCompression, |
190 | UncompressedFunctions::Compress, UncompressedFunctions::FinalizeCompress, |
191 | UncompressedStringStorage::StringInitScan, UncompressedStringStorage::StringScan, |
192 | UncompressedStringStorage::StringScanPartial, UncompressedStringStorage::StringFetchRow, |
193 | UncompressedFunctions::EmptySkip, UncompressedStringStorage::StringInitSegment, |
194 | UncompressedStringStorage::StringInitAppend, UncompressedStringStorage::StringAppend, |
195 | UncompressedStringStorage::FinalizeAppend); |
196 | } |
197 | |
198 | //===--------------------------------------------------------------------===// |
199 | // Helper Functions |
200 | //===--------------------------------------------------------------------===// |
201 | void UncompressedStringStorage::SetDictionary(ColumnSegment &segment, BufferHandle &handle, |
202 | StringDictionaryContainer container) { |
203 | auto startptr = handle.Ptr() + segment.GetBlockOffset(); |
204 | Store<uint32_t>(val: container.size, ptr: startptr); |
205 | Store<uint32_t>(val: container.end, ptr: startptr + sizeof(uint32_t)); |
206 | } |
207 | |
208 | StringDictionaryContainer UncompressedStringStorage::GetDictionary(ColumnSegment &segment, BufferHandle &handle) { |
209 | auto startptr = handle.Ptr() + segment.GetBlockOffset(); |
210 | StringDictionaryContainer container; |
211 | container.size = Load<uint32_t>(ptr: startptr); |
212 | container.end = Load<uint32_t>(ptr: startptr + sizeof(uint32_t)); |
213 | return container; |
214 | } |
215 | |
216 | idx_t UncompressedStringStorage::RemainingSpace(ColumnSegment &segment, BufferHandle &handle) { |
217 | auto dictionary = GetDictionary(segment, handle); |
218 | D_ASSERT(dictionary.end == segment.SegmentSize()); |
219 | idx_t used_space = dictionary.size + segment.count * sizeof(int32_t) + DICTIONARY_HEADER_SIZE; |
220 | D_ASSERT(segment.SegmentSize() >= used_space); |
221 | return segment.SegmentSize() - used_space; |
222 | } |
223 | |
224 | void UncompressedStringStorage::WriteString(ColumnSegment &segment, string_t string, block_id_t &result_block, |
225 | int32_t &result_offset) { |
226 | auto &state = segment.GetSegmentState()->Cast<UncompressedStringSegmentState>(); |
227 | if (state.overflow_writer) { |
228 | // overflow writer is set: write string there |
229 | state.overflow_writer->WriteString(string, result_block, result_offset); |
230 | } else { |
231 | // default overflow behavior: use in-memory buffer to store the overflow string |
232 | WriteStringMemory(segment, string, result_block, result_offset); |
233 | } |
234 | } |
235 | |
236 | void UncompressedStringStorage::WriteStringMemory(ColumnSegment &segment, string_t string, block_id_t &result_block, |
237 | int32_t &result_offset) { |
238 | uint32_t total_length = string.GetSize() + sizeof(uint32_t); |
239 | shared_ptr<BlockHandle> block; |
240 | BufferHandle handle; |
241 | |
242 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
243 | auto &state = segment.GetSegmentState()->Cast<UncompressedStringSegmentState>(); |
244 | // check if the string fits in the current block |
245 | if (!state.head || state.head->offset + total_length >= state.head->size) { |
246 | // string does not fit, allocate space for it |
247 | // create a new string block |
248 | idx_t alloc_size = MaxValue<idx_t>(a: total_length, b: Storage::BLOCK_SIZE); |
249 | auto new_block = make_uniq<StringBlock>(); |
250 | new_block->offset = 0; |
251 | new_block->size = alloc_size; |
252 | // allocate an in-memory buffer for it |
253 | handle = buffer_manager.Allocate(block_size: alloc_size, can_destroy: false, block: &block); |
254 | state.overflow_blocks[block->BlockId()] = new_block.get(); |
255 | new_block->block = std::move(block); |
256 | new_block->next = std::move(state.head); |
257 | state.head = std::move(new_block); |
258 | } else { |
259 | // string fits, copy it into the current block |
260 | handle = buffer_manager.Pin(handle&: state.head->block); |
261 | } |
262 | |
263 | result_block = state.head->block->BlockId(); |
264 | result_offset = state.head->offset; |
265 | |
266 | // copy the string and the length there |
267 | auto ptr = handle.Ptr() + state.head->offset; |
268 | Store<uint32_t>(val: string.GetSize(), ptr); |
269 | ptr += sizeof(uint32_t); |
270 | memcpy(dest: ptr, src: string.GetData(), n: string.GetSize()); |
271 | state.head->offset += total_length; |
272 | } |
273 | |
// Materialize an overflow ("big") string referenced by (block, offset) into a string_t owned via `result`.
// Two paths, selected by the block id:
//  - block < MAXIMUM_BLOCK: the string was written to disk. At `offset` there is a header
//    [uint32 compressed_size][uint32 uncompressed_size], followed by the compressed payload. The payload may
//    continue across a chain of blocks; the last sizeof(block_id_t) bytes of each block hold the next block id.
//    The payload is decompressed into a freshly allocated buffer.
//  - otherwise: the string lives in an in-memory overflow block registered in the segment state's
//    `overflow_blocks` (see WriteStringMemory), stored as [uint32 length][bytes].
// In both paths the buffer backing the returned string_t is handed to `result` via StringVector::AddHandle,
// presumably so it stays alive as long as the vector does.
string_t UncompressedStringStorage::ReadOverflowString(ColumnSegment &segment, Vector &result, block_id_t block,
                                                       int32_t offset) {
	D_ASSERT(block != INVALID_BLOCK);
	D_ASSERT(offset < Storage::BLOCK_SIZE);

	auto &block_manager = segment.GetBlockManager();
	auto &buffer_manager = block_manager.buffer_manager;
	auto &state = segment.GetSegmentState()->Cast<UncompressedStringSegmentState>();
	if (block < MAXIMUM_BLOCK) {
		// read the overflow string from disk
		// pin the initial handle and read the length
		auto block_handle = block_manager.RegisterBlock(block);
		auto handle = buffer_manager.Pin(block_handle);

		// read header: [uint32 compressed size][uint32 uncompressed size]
		uint32_t compressed_size = Load<uint32_t>(handle.Ptr() + offset);
		uint32_t uncompressed_size = Load<uint32_t>(handle.Ptr() + offset + sizeof(uint32_t));
		uint32_t remaining = compressed_size;
		// advance past the header to the first byte of the compressed payload
		offset += 2 * sizeof(uint32_t);

		data_ptr_t decompression_ptr;
		// only allocated when the payload spans more than one block
		unsafe_unique_array<data_t> decompression_buffer;

		// If string is in single block we decompress straight from it, else we copy first.
		// Usable bytes per block exclude the trailing sizeof(block_id_t) bytes that hold the next-block id.
		if (remaining <= Storage::BLOCK_SIZE - sizeof(block_id_t) - offset) {
			decompression_ptr = handle.Ptr() + offset;
		} else {
			decompression_buffer = make_unsafe_uniq_array<data_t>(compressed_size);
			auto target_ptr = decompression_buffer.get();

			// now append the string to the single buffer
			while (remaining > 0) {
				// copy as much as the current block holds (up to its next-block id trailer)
				idx_t to_write = MinValue<idx_t>(remaining, Storage::BLOCK_SIZE - sizeof(block_id_t) - offset);
				memcpy(target_ptr, handle.Ptr() + offset, to_write);

				remaining -= to_write;
				offset += to_write;
				target_ptr += to_write;
				if (remaining > 0) {
					// read the next block: `offset` now points at this block's next-block id trailer
					block_id_t next_block = Load<block_id_t>(handle.Ptr() + offset);
					block_handle = block_manager.RegisterBlock(next_block);
					handle = buffer_manager.Pin(block_handle);
					// continuation blocks carry payload from their first byte
					offset = 0;
				}
			}
			decompression_ptr = decompression_buffer.get();
		}

		// overflow strings on disk are gzipped, decompress here (via MiniZStream)
		auto decompressed_target_handle =
		    buffer_manager.Allocate(MaxValue<idx_t>(Storage::BLOCK_SIZE, uncompressed_size));
		auto decompressed_target_ptr = decompressed_target_handle.Ptr();
		MiniZStream s;
		s.Decompress(const_char_ptr_cast(decompression_ptr), compressed_size, char_ptr_cast(decompressed_target_ptr),
		             uncompressed_size);

		auto final_buffer = decompressed_target_handle.Ptr();
		// hand the decompressed buffer to the result vector before returning a string_t that points into it
		StringVector::AddHandle(result, std::move(decompressed_target_handle));
		return ReadString(final_buffer, 0, uncompressed_size);
	} else {
		// read the overflow string from memory
		// look up the in-memory overflow block registered by WriteStringMemory and pin it
		auto entry = state.overflow_blocks.find(block);
		D_ASSERT(entry != state.overflow_blocks.end());
		auto handle = buffer_manager.Pin(entry->second->block);
		auto final_buffer = handle.Ptr();
		StringVector::AddHandle(result, std::move(handle));
		// in-memory entries are [uint32 length][bytes]
		return ReadStringWithLength(final_buffer, offset);
	}
}
345 | |
346 | string_t UncompressedStringStorage::ReadString(data_ptr_t target, int32_t offset, uint32_t string_length) { |
347 | auto ptr = target + offset; |
348 | auto str_ptr = char_ptr_cast(src: ptr); |
349 | return string_t(str_ptr, string_length); |
350 | } |
351 | |
352 | string_t UncompressedStringStorage::ReadStringWithLength(data_ptr_t target, int32_t offset) { |
353 | auto ptr = target + offset; |
354 | auto str_length = Load<uint32_t>(ptr); |
355 | auto str_ptr = char_ptr_cast(src: ptr + sizeof(uint32_t)); |
356 | return string_t(str_ptr, str_length); |
357 | } |
358 | |
359 | void UncompressedStringStorage::WriteStringMarker(data_ptr_t target, block_id_t block_id, int32_t offset) { |
360 | memcpy(dest: target, src: &block_id, n: sizeof(block_id_t)); |
361 | target += sizeof(block_id_t); |
362 | memcpy(dest: target, src: &offset, n: sizeof(int32_t)); |
363 | } |
364 | |
365 | void UncompressedStringStorage::ReadStringMarker(data_ptr_t target, block_id_t &block_id, int32_t &offset) { |
366 | memcpy(dest: &block_id, src: target, n: sizeof(block_id_t)); |
367 | target += sizeof(block_id_t); |
368 | memcpy(dest: &offset, src: target, n: sizeof(int32_t)); |
369 | } |
370 | |
371 | string_location_t UncompressedStringStorage::FetchStringLocation(StringDictionaryContainer dict, data_ptr_t baseptr, |
372 | int32_t dict_offset) { |
373 | D_ASSERT(dict_offset >= -1 * Storage::BLOCK_SIZE && dict_offset <= Storage::BLOCK_SIZE); |
374 | if (dict_offset < 0) { |
375 | string_location_t result; |
376 | ReadStringMarker(target: baseptr + dict.end - (-1 * dict_offset), block_id&: result.block_id, offset&: result.offset); |
377 | return result; |
378 | } else { |
379 | return string_location_t(INVALID_BLOCK, dict_offset); |
380 | } |
381 | } |
382 | |
383 | string_t UncompressedStringStorage::FetchStringFromDict(ColumnSegment &segment, StringDictionaryContainer dict, |
384 | Vector &result, data_ptr_t baseptr, int32_t dict_offset, |
385 | uint32_t string_length) { |
386 | // fetch base data |
387 | D_ASSERT(dict_offset <= Storage::BLOCK_SIZE); |
388 | string_location_t location = FetchStringLocation(dict, baseptr, dict_offset); |
389 | return FetchString(segment, dict, result, baseptr, location, string_length); |
390 | } |
391 | |
392 | string_t UncompressedStringStorage::FetchString(ColumnSegment &segment, StringDictionaryContainer dict, Vector &result, |
393 | data_ptr_t baseptr, string_location_t location, |
394 | uint32_t string_length) { |
395 | if (location.block_id != INVALID_BLOCK) { |
396 | // big string marker: read from separate block |
397 | return ReadOverflowString(segment, result, block: location.block_id, offset: location.offset); |
398 | } else { |
399 | if (location.offset == 0) { |
400 | return string_t(nullptr, 0); |
401 | } |
402 | // normal string: read string from this block |
403 | auto dict_end = baseptr + dict.end; |
404 | auto dict_pos = dict_end - location.offset; |
405 | |
406 | auto str_ptr = char_ptr_cast(src: dict_pos); |
407 | return string_t(str_ptr, string_length); |
408 | } |
409 | } |
410 | |
411 | } // namespace duckdb |
412 | |