1#include "duckdb/storage/string_uncompressed.hpp"
2
3#include "duckdb/common/pair.hpp"
4#include "duckdb/storage/checkpoint/write_overflow_strings_to_disk.hpp"
5#include "miniz_wrapper.hpp"
6
7namespace duckdb {
8
9//===--------------------------------------------------------------------===//
10// Storage Class
11//===--------------------------------------------------------------------===//
12UncompressedStringSegmentState::~UncompressedStringSegmentState() {
13 while (head) {
14 // prevent deep recursion here
15 head = std::move(head->next);
16 }
17}
18
19//===--------------------------------------------------------------------===//
20// Analyze
21//===--------------------------------------------------------------------===//
22struct StringAnalyzeState : public AnalyzeState {
23 StringAnalyzeState() : count(0), total_string_size(0), overflow_strings(0) {
24 }
25
26 idx_t count;
27 idx_t total_string_size;
28 idx_t overflow_strings;
29};
30
31unique_ptr<AnalyzeState> UncompressedStringStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) {
32 return make_uniq<StringAnalyzeState>();
33}
34
35bool UncompressedStringStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) {
36 auto &state = state_p.Cast<StringAnalyzeState>();
37 UnifiedVectorFormat vdata;
38 input.ToUnifiedFormat(count, data&: vdata);
39
40 state.count += count;
41 auto data = UnifiedVectorFormat::GetData<string_t>(format: vdata);
42 for (idx_t i = 0; i < count; i++) {
43 auto idx = vdata.sel->get_index(idx: i);
44 if (vdata.validity.RowIsValid(row_idx: idx)) {
45 auto string_size = data[idx].GetSize();
46 state.total_string_size += string_size;
47 if (string_size >= StringUncompressed::STRING_BLOCK_LIMIT) {
48 state.overflow_strings++;
49 }
50 }
51 }
52 return true;
53}
54
55idx_t UncompressedStringStorage::StringFinalAnalyze(AnalyzeState &state_p) {
56 auto &state = state_p.Cast<StringAnalyzeState>();
57 return state.count * sizeof(int32_t) + state.total_string_size + state.overflow_strings * BIG_STRING_MARKER_SIZE;
58}
59
60//===--------------------------------------------------------------------===//
61// Scan
62//===--------------------------------------------------------------------===//
63unique_ptr<SegmentScanState> UncompressedStringStorage::StringInitScan(ColumnSegment &segment) {
64 auto result = make_uniq<StringScanState>();
65 auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db);
66 result->handle = buffer_manager.Pin(handle&: segment.block);
67 return std::move(result);
68}
69
70//===--------------------------------------------------------------------===//
71// Scan base data
72//===--------------------------------------------------------------------===//
73void UncompressedStringStorage::StringScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count,
74 Vector &result, idx_t result_offset) {
75 // clear any previously locked buffers and get the primary buffer handle
76 auto &scan_state = state.scan_state->Cast<StringScanState>();
77 auto start = segment.GetRelativeIndex(row_index: state.row_index);
78
79 auto baseptr = scan_state.handle.Ptr() + segment.GetBlockOffset();
80 auto dict = GetDictionary(segment, handle&: scan_state.handle);
81 auto base_data = reinterpret_cast<int32_t *>(baseptr + DICTIONARY_HEADER_SIZE);
82 auto result_data = FlatVector::GetData<string_t>(vector&: result);
83
84 int32_t previous_offset = start > 0 ? base_data[start - 1] : 0;
85
86 for (idx_t i = 0; i < scan_count; i++) {
87 // std::abs used since offsets can be negative to indicate big strings
88 uint32_t string_length = std::abs(x: base_data[start + i]) - std::abs(x: previous_offset);
89 result_data[result_offset + i] =
90 FetchStringFromDict(segment, dict, result, baseptr, dict_offset: base_data[start + i], string_length);
91 previous_offset = base_data[start + i];
92 }
93}
94
95void UncompressedStringStorage::StringScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count,
96 Vector &result) {
97 StringScanPartial(segment, state, scan_count, result, result_offset: 0);
98}
99
100//===--------------------------------------------------------------------===//
101// Fetch
102//===--------------------------------------------------------------------===//
103BufferHandle &ColumnFetchState::GetOrInsertHandle(ColumnSegment &segment) {
104 auto primary_id = segment.block->BlockId();
105
106 auto entry = handles.find(x: primary_id);
107 if (entry == handles.end()) {
108 // not pinned yet: pin it
109 auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db);
110 auto handle = buffer_manager.Pin(handle&: segment.block);
111 auto entry = handles.insert(x: make_pair(x&: primary_id, y: std::move(handle)));
112 return entry.first->second;
113 } else {
114 // already pinned: use the pinned handle
115 return entry->second;
116 }
117}
118
119void UncompressedStringStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id,
120 Vector &result, idx_t result_idx) {
121 // fetch a single row from the string segment
122 // first pin the main buffer if it is not already pinned
123 auto &handle = state.GetOrInsertHandle(segment);
124
125 auto baseptr = handle.Ptr() + segment.GetBlockOffset();
126 auto dict = GetDictionary(segment, handle);
127 auto base_data = reinterpret_cast<int32_t *>(baseptr + DICTIONARY_HEADER_SIZE);
128 auto result_data = FlatVector::GetData<string_t>(vector&: result);
129
130 auto dict_offset = base_data[row_id];
131 uint32_t string_length;
132 if ((idx_t)row_id == 0) {
133 // edge case where this is the first string in the dict
134 string_length = std::abs(x: dict_offset);
135 } else {
136 string_length = std::abs(x: dict_offset) - std::abs(x: base_data[row_id - 1]);
137 }
138 result_data[result_idx] = FetchStringFromDict(segment, dict, result, baseptr, dict_offset, string_length);
139}
140
141//===--------------------------------------------------------------------===//
142// Append
143//===--------------------------------------------------------------------===//
144
145unique_ptr<CompressedSegmentState> UncompressedStringStorage::StringInitSegment(ColumnSegment &segment,
146 block_id_t block_id) {
147 auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db);
148 if (block_id == INVALID_BLOCK) {
149 auto handle = buffer_manager.Pin(handle&: segment.block);
150 StringDictionaryContainer dictionary;
151 dictionary.size = 0;
152 dictionary.end = segment.SegmentSize();
153 SetDictionary(segment, handle, dict: dictionary);
154 }
155 return make_uniq<UncompressedStringSegmentState>();
156}
157
158idx_t UncompressedStringStorage::FinalizeAppend(ColumnSegment &segment, SegmentStatistics &stats) {
159 auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db);
160 auto handle = buffer_manager.Pin(handle&: segment.block);
161 auto dict = GetDictionary(segment, handle);
162 D_ASSERT(dict.end == segment.SegmentSize());
163 // compute the total size required to store this segment
164 auto offset_size = DICTIONARY_HEADER_SIZE + segment.count * sizeof(int32_t);
165 auto total_size = offset_size + dict.size;
166 if (total_size >= COMPACTION_FLUSH_LIMIT) {
167 // the block is full enough, don't bother moving around the dictionary
168 return segment.SegmentSize();
169 }
170 // the block has space left: figure out how much space we can save
171 auto move_amount = segment.SegmentSize() - total_size;
172 // move the dictionary so it lines up exactly with the offsets
173 auto dataptr = handle.Ptr();
174 memmove(dest: dataptr + offset_size, src: dataptr + dict.end - dict.size, n: dict.size);
175 dict.end -= move_amount;
176 D_ASSERT(dict.end == total_size);
177 // write the new dictionary (with the updated "end")
178 SetDictionary(segment, handle, dict);
179 return total_size;
180}
181
182//===--------------------------------------------------------------------===//
183// Get Function
184//===--------------------------------------------------------------------===//
185CompressionFunction StringUncompressed::GetFunction(PhysicalType data_type) {
186 D_ASSERT(data_type == PhysicalType::VARCHAR);
187 return CompressionFunction(CompressionType::COMPRESSION_UNCOMPRESSED, data_type,
188 UncompressedStringStorage::StringInitAnalyze, UncompressedStringStorage::StringAnalyze,
189 UncompressedStringStorage::StringFinalAnalyze, UncompressedFunctions::InitCompression,
190 UncompressedFunctions::Compress, UncompressedFunctions::FinalizeCompress,
191 UncompressedStringStorage::StringInitScan, UncompressedStringStorage::StringScan,
192 UncompressedStringStorage::StringScanPartial, UncompressedStringStorage::StringFetchRow,
193 UncompressedFunctions::EmptySkip, UncompressedStringStorage::StringInitSegment,
194 UncompressedStringStorage::StringInitAppend, UncompressedStringStorage::StringAppend,
195 UncompressedStringStorage::FinalizeAppend);
196}
197
198//===--------------------------------------------------------------------===//
199// Helper Functions
200//===--------------------------------------------------------------------===//
201void UncompressedStringStorage::SetDictionary(ColumnSegment &segment, BufferHandle &handle,
202 StringDictionaryContainer container) {
203 auto startptr = handle.Ptr() + segment.GetBlockOffset();
204 Store<uint32_t>(val: container.size, ptr: startptr);
205 Store<uint32_t>(val: container.end, ptr: startptr + sizeof(uint32_t));
206}
207
208StringDictionaryContainer UncompressedStringStorage::GetDictionary(ColumnSegment &segment, BufferHandle &handle) {
209 auto startptr = handle.Ptr() + segment.GetBlockOffset();
210 StringDictionaryContainer container;
211 container.size = Load<uint32_t>(ptr: startptr);
212 container.end = Load<uint32_t>(ptr: startptr + sizeof(uint32_t));
213 return container;
214}
215
216idx_t UncompressedStringStorage::RemainingSpace(ColumnSegment &segment, BufferHandle &handle) {
217 auto dictionary = GetDictionary(segment, handle);
218 D_ASSERT(dictionary.end == segment.SegmentSize());
219 idx_t used_space = dictionary.size + segment.count * sizeof(int32_t) + DICTIONARY_HEADER_SIZE;
220 D_ASSERT(segment.SegmentSize() >= used_space);
221 return segment.SegmentSize() - used_space;
222}
223
224void UncompressedStringStorage::WriteString(ColumnSegment &segment, string_t string, block_id_t &result_block,
225 int32_t &result_offset) {
226 auto &state = segment.GetSegmentState()->Cast<UncompressedStringSegmentState>();
227 if (state.overflow_writer) {
228 // overflow writer is set: write string there
229 state.overflow_writer->WriteString(string, result_block, result_offset);
230 } else {
231 // default overflow behavior: use in-memory buffer to store the overflow string
232 WriteStringMemory(segment, string, result_block, result_offset);
233 }
234}
235
236void UncompressedStringStorage::WriteStringMemory(ColumnSegment &segment, string_t string, block_id_t &result_block,
237 int32_t &result_offset) {
238 uint32_t total_length = string.GetSize() + sizeof(uint32_t);
239 shared_ptr<BlockHandle> block;
240 BufferHandle handle;
241
242 auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db);
243 auto &state = segment.GetSegmentState()->Cast<UncompressedStringSegmentState>();
244 // check if the string fits in the current block
245 if (!state.head || state.head->offset + total_length >= state.head->size) {
246 // string does not fit, allocate space for it
247 // create a new string block
248 idx_t alloc_size = MaxValue<idx_t>(a: total_length, b: Storage::BLOCK_SIZE);
249 auto new_block = make_uniq<StringBlock>();
250 new_block->offset = 0;
251 new_block->size = alloc_size;
252 // allocate an in-memory buffer for it
253 handle = buffer_manager.Allocate(block_size: alloc_size, can_destroy: false, block: &block);
254 state.overflow_blocks[block->BlockId()] = new_block.get();
255 new_block->block = std::move(block);
256 new_block->next = std::move(state.head);
257 state.head = std::move(new_block);
258 } else {
259 // string fits, copy it into the current block
260 handle = buffer_manager.Pin(handle&: state.head->block);
261 }
262
263 result_block = state.head->block->BlockId();
264 result_offset = state.head->offset;
265
266 // copy the string and the length there
267 auto ptr = handle.Ptr() + state.head->offset;
268 Store<uint32_t>(val: string.GetSize(), ptr);
269 ptr += sizeof(uint32_t);
270 memcpy(dest: ptr, src: string.GetData(), n: string.GetSize());
271 state.head->offset += total_length;
272}
273
274string_t UncompressedStringStorage::ReadOverflowString(ColumnSegment &segment, Vector &result, block_id_t block,
275 int32_t offset) {
276 D_ASSERT(block != INVALID_BLOCK);
277 D_ASSERT(offset < Storage::BLOCK_SIZE);
278
279 auto &block_manager = segment.GetBlockManager();
280 auto &buffer_manager = block_manager.buffer_manager;
281 auto &state = segment.GetSegmentState()->Cast<UncompressedStringSegmentState>();
282 if (block < MAXIMUM_BLOCK) {
283 // read the overflow string from disk
284 // pin the initial handle and read the length
285 auto block_handle = block_manager.RegisterBlock(block_id: block);
286 auto handle = buffer_manager.Pin(handle&: block_handle);
287
288 // read header
289 uint32_t compressed_size = Load<uint32_t>(ptr: handle.Ptr() + offset);
290 uint32_t uncompressed_size = Load<uint32_t>(ptr: handle.Ptr() + offset + sizeof(uint32_t));
291 uint32_t remaining = compressed_size;
292 offset += 2 * sizeof(uint32_t);
293
294 data_ptr_t decompression_ptr;
295 unsafe_unique_array<data_t> decompression_buffer;
296
297 // If string is in single block we decompress straight from it, else we copy first
298 if (remaining <= Storage::BLOCK_SIZE - sizeof(block_id_t) - offset) {
299 decompression_ptr = handle.Ptr() + offset;
300 } else {
301 decompression_buffer = make_unsafe_uniq_array<data_t>(n: compressed_size);
302 auto target_ptr = decompression_buffer.get();
303
304 // now append the string to the single buffer
305 while (remaining > 0) {
306 idx_t to_write = MinValue<idx_t>(a: remaining, b: Storage::BLOCK_SIZE - sizeof(block_id_t) - offset);
307 memcpy(dest: target_ptr, src: handle.Ptr() + offset, n: to_write);
308
309 remaining -= to_write;
310 offset += to_write;
311 target_ptr += to_write;
312 if (remaining > 0) {
313 // read the next block
314 block_id_t next_block = Load<block_id_t>(ptr: handle.Ptr() + offset);
315 block_handle = block_manager.RegisterBlock(block_id: next_block);
316 handle = buffer_manager.Pin(handle&: block_handle);
317 offset = 0;
318 }
319 }
320 decompression_ptr = decompression_buffer.get();
321 }
322
323 // overflow strings on disk are gzipped, decompress here
324 auto decompressed_target_handle =
325 buffer_manager.Allocate(block_size: MaxValue<idx_t>(a: Storage::BLOCK_SIZE, b: uncompressed_size));
326 auto decompressed_target_ptr = decompressed_target_handle.Ptr();
327 MiniZStream s;
328 s.Decompress(compressed_data: const_char_ptr_cast(src: decompression_ptr), compressed_size, out_data: char_ptr_cast(src: decompressed_target_ptr),
329 out_size: uncompressed_size);
330
331 auto final_buffer = decompressed_target_handle.Ptr();
332 StringVector::AddHandle(vector&: result, handle: std::move(decompressed_target_handle));
333 return ReadString(target: final_buffer, offset: 0, string_length: uncompressed_size);
334 } else {
335 // read the overflow string from memory
336 // first pin the handle, if it is not pinned yet
337 auto entry = state.overflow_blocks.find(x: block);
338 D_ASSERT(entry != state.overflow_blocks.end());
339 auto handle = buffer_manager.Pin(handle&: entry->second->block);
340 auto final_buffer = handle.Ptr();
341 StringVector::AddHandle(vector&: result, handle: std::move(handle));
342 return ReadStringWithLength(target: final_buffer, offset);
343 }
344}
345
346string_t UncompressedStringStorage::ReadString(data_ptr_t target, int32_t offset, uint32_t string_length) {
347 auto ptr = target + offset;
348 auto str_ptr = char_ptr_cast(src: ptr);
349 return string_t(str_ptr, string_length);
350}
351
352string_t UncompressedStringStorage::ReadStringWithLength(data_ptr_t target, int32_t offset) {
353 auto ptr = target + offset;
354 auto str_length = Load<uint32_t>(ptr);
355 auto str_ptr = char_ptr_cast(src: ptr + sizeof(uint32_t));
356 return string_t(str_ptr, str_length);
357}
358
359void UncompressedStringStorage::WriteStringMarker(data_ptr_t target, block_id_t block_id, int32_t offset) {
360 memcpy(dest: target, src: &block_id, n: sizeof(block_id_t));
361 target += sizeof(block_id_t);
362 memcpy(dest: target, src: &offset, n: sizeof(int32_t));
363}
364
365void UncompressedStringStorage::ReadStringMarker(data_ptr_t target, block_id_t &block_id, int32_t &offset) {
366 memcpy(dest: &block_id, src: target, n: sizeof(block_id_t));
367 target += sizeof(block_id_t);
368 memcpy(dest: &offset, src: target, n: sizeof(int32_t));
369}
370
371string_location_t UncompressedStringStorage::FetchStringLocation(StringDictionaryContainer dict, data_ptr_t baseptr,
372 int32_t dict_offset) {
373 D_ASSERT(dict_offset >= -1 * Storage::BLOCK_SIZE && dict_offset <= Storage::BLOCK_SIZE);
374 if (dict_offset < 0) {
375 string_location_t result;
376 ReadStringMarker(target: baseptr + dict.end - (-1 * dict_offset), block_id&: result.block_id, offset&: result.offset);
377 return result;
378 } else {
379 return string_location_t(INVALID_BLOCK, dict_offset);
380 }
381}
382
383string_t UncompressedStringStorage::FetchStringFromDict(ColumnSegment &segment, StringDictionaryContainer dict,
384 Vector &result, data_ptr_t baseptr, int32_t dict_offset,
385 uint32_t string_length) {
386 // fetch base data
387 D_ASSERT(dict_offset <= Storage::BLOCK_SIZE);
388 string_location_t location = FetchStringLocation(dict, baseptr, dict_offset);
389 return FetchString(segment, dict, result, baseptr, location, string_length);
390}
391
392string_t UncompressedStringStorage::FetchString(ColumnSegment &segment, StringDictionaryContainer dict, Vector &result,
393 data_ptr_t baseptr, string_location_t location,
394 uint32_t string_length) {
395 if (location.block_id != INVALID_BLOCK) {
396 // big string marker: read from separate block
397 return ReadOverflowString(segment, result, block: location.block_id, offset: location.offset);
398 } else {
399 if (location.offset == 0) {
400 return string_t(nullptr, 0);
401 }
402 // normal string: read string from this block
403 auto dict_end = baseptr + dict.end;
404 auto dict_pos = dict_end - location.offset;
405
406 auto str_ptr = char_ptr_cast(src: dict_pos);
407 return string_t(str_ptr, string_length);
408 }
409}
410
411} // namespace duckdb
412