1 | #include "duckdb/common/bitpacking.hpp" |
2 | |
3 | #include "duckdb/common/limits.hpp" |
4 | #include "duckdb/function/compression/compression.hpp" |
5 | #include "duckdb/function/compression_function.hpp" |
6 | #include "duckdb/main/config.hpp" |
7 | #include "duckdb/storage/buffer_manager.hpp" |
8 | |
9 | #include "duckdb/storage/table/column_data_checkpointer.hpp" |
10 | #include "duckdb/storage/table/column_segment.hpp" |
11 | #include "duckdb/common/operator/subtract.hpp" |
12 | #include "duckdb/storage/compression/bitpacking.hpp" |
13 | #include "duckdb/storage/table/scan_state.hpp" |
14 | |
15 | #include <functional> |
16 | |
17 | namespace duckdb { |
18 | |
// Number of values per bitpacking metadata group: each group is compressed independently and
// gets one metadata entry. Uses the vector size when it exceeds 512, otherwise 2048.
static constexpr const idx_t BITPACKING_METADATA_GROUP_SIZE = STANDARD_VECTOR_SIZE > 512 ? STANDARD_VECTOR_SIZE : 2048;
20 | |
21 | BitpackingMode BitpackingModeFromString(const string &str) { |
22 | auto mode = StringUtil::Lower(str); |
23 | |
24 | if (mode == "auto" ) { |
25 | return BitpackingMode::AUTO; |
26 | } else if (mode == "constant" ) { |
27 | return BitpackingMode::CONSTANT; |
28 | } else if (mode == "constant_delta" ) { |
29 | return BitpackingMode::CONSTANT_DELTA; |
30 | } else if (mode == "delta_for" ) { |
31 | return BitpackingMode::DELTA_FOR; |
32 | } else if (mode == "for" ) { |
33 | return BitpackingMode::FOR; |
34 | } else { |
35 | return BitpackingMode::AUTO; |
36 | } |
37 | } |
38 | |
39 | string BitpackingModeToString(const BitpackingMode &mode) { |
40 | switch (mode) { |
41 | case (BitpackingMode::AUTO): |
42 | return "auto" ; |
43 | case (BitpackingMode::CONSTANT): |
44 | return "constant" ; |
45 | case (BitpackingMode::CONSTANT_DELTA): |
46 | return "constant_delta" ; |
47 | case (BitpackingMode::DELTA_FOR): |
48 | return "delta_for" ; |
49 | case (BitpackingMode::FOR): |
50 | return "for" ; |
51 | default: |
52 | throw NotImplementedException("Unknown bitpacking mode: " + to_string(val: (uint8_t)mode) + "\n" ); |
53 | } |
54 | } |
55 | |
// Per-group metadata: the compression mode used for the group and the byte offset of the
// group's data within the segment.
typedef struct {
	BitpackingMode mode;
	uint32_t offset;
} bitpacking_metadata_t;

// Encoded form of the metadata: mode in the top byte, offset in the lower 24 bits.
typedef uint32_t bitpacking_metadata_encoded_t;
62 | |
63 | static bitpacking_metadata_encoded_t EncodeMeta(bitpacking_metadata_t metadata) { |
64 | D_ASSERT(metadata.offset <= 16777215); // max uint24_t |
65 | bitpacking_metadata_encoded_t encoded_value = metadata.offset; |
66 | encoded_value |= (uint8_t)metadata.mode << 24; |
67 | return encoded_value; |
68 | } |
69 | static bitpacking_metadata_t DecodeMeta(bitpacking_metadata_encoded_t *metadata_encoded) { |
70 | bitpacking_metadata_t metadata; |
71 | metadata.mode = Load<BitpackingMode>(ptr: data_ptr_cast(src: metadata_encoded) + 3); |
72 | metadata.offset = *metadata_encoded & 0x00FFFFFF; |
73 | return metadata; |
74 | } |
75 | |
//! No-op writer used during the analyze phase: BitpackingState invokes these callbacks on every
//! flush, but for analysis only the size bookkeeping (BitpackingState::total_size) is needed,
//! so nothing is actually written.
struct EmptyBitpackingWriter {
	template <class T>
	static void WriteConstant(T constant, idx_t count, void *data_ptr, bool all_invalid) {
	}
	template <class T, class T_S = typename std::make_signed<T>::type>
	static void WriteConstantDelta(T_S constant, T frame_of_reference, idx_t count, T *values, bool *validity,
	                               void *data_ptr) {
	}
	template <class T, class T_S = typename std::make_signed<T>::type>
	static void WriteDeltaFor(T *values, bool *validity, bitpacking_width_t width, T frame_of_reference,
	                          T_S delta_offset, T *original_values, idx_t count, void *data_ptr) {
	}
	template <class T>
	static void WriteFor(T *values, bool *validity, bitpacking_width_t width, T frame_of_reference, idx_t count,
	                     void *data_ptr) {
	}
};
93 | |
//! Accumulates values into fixed-size groups and, on flush, picks the cheapest applicable
//! bitpacking mode (CONSTANT, CONSTANT_DELTA, DELTA_FOR, or FOR) and emits the group through
//! the writer OP. Used both by the analyze phase (EmptyBitpackingWriter) and by compression.
template <class T, class T_U = typename std::make_unsigned<T>::type, class T_S = typename std::make_signed<T>::type>
struct BitpackingState {
public:
	BitpackingState() : compression_buffer_idx(0), total_size(0), data_ptr(nullptr) {
		// Slot [0] of the internal buffer (one before the visible buffer) is fixed to 0 so that
		// delta computation can read compression_buffer[-1] for the first element without a
		// special case.
		compression_buffer_internal[0] = (T)0;
		compression_buffer = &compression_buffer_internal[1];
		Reset();
	}

	// Extra val for delta encoding (see constructor comment)
	T compression_buffer_internal[BITPACKING_METADATA_GROUP_SIZE + 1];
	T *compression_buffer;
	T_S delta_buffer[BITPACKING_METADATA_GROUP_SIZE];
	bool compression_buffer_validity[BITPACKING_METADATA_GROUP_SIZE];
	// Number of values currently buffered for the group being built
	idx_t compression_buffer_idx;
	// Running total of compressed bytes across all flushed groups
	idx_t total_size;

	// Used to pass CompressionState ptr through the Bitpacking writer
	void *data_ptr;

	// Stats on current compression buffer (only over valid values)
	T minimum;
	T maximum;
	T min_max_diff;
	T_S minimum_delta;
	T_S maximum_delta;
	T_S min_max_delta_diff;
	// Value such that compression_buffer[0] = delta_offset + minimum_delta; needed to decode
	T_S delta_offset;
	bool all_valid;
	bool all_invalid;

	bool can_do_delta;
	bool can_do_for;

	// Used to force a specific mode, useful in testing
	BitpackingMode mode = BitpackingMode::AUTO;

public:
	//! Reset per-group statistics; called after every flush and on construction.
	void Reset() {
		minimum = NumericLimits<T>::Maximum();
		minimum_delta = NumericLimits<T_S>::Maximum();
		maximum = NumericLimits<T>::Minimum();
		maximum_delta = NumericLimits<T_S>::Minimum();
		delta_offset = 0;
		all_valid = true;
		all_invalid = true;
		can_do_delta = false;
		can_do_for = false;
		compression_buffer_idx = 0;
		min_max_diff = 0;
		min_max_delta_diff = 0;
	}

	//! FOR is applicable iff (maximum - minimum) does not overflow T.
	void CalculateFORStats() {
		can_do_for = TrySubtractOperator::Operation(maximum, minimum, min_max_diff);
	}

	//! Determine whether delta encoding is applicable for the buffered group and, if so,
	//! fill delta_buffer and the delta min/max statistics.
	void CalculateDeltaStats() {
		// TODO: currently we don't support delta compression of values above NumericLimits<T_S>::Maximum(),
		// we could support this with some clever subtract trickery?
		if (maximum > (T)NumericLimits<T_S>::Maximum()) {
			return;
		}

		// Delta encoding a single value makes no sense
		if (compression_buffer_idx < 2) {
			return;
		};

		// TODO: handle NULLS here?
		// Currently we cannot handle nulls because we would need an additional step of patching for this.
		// we could for example copy the last value on a null insert. This would help a bit, but not be optimal for
		// large deltas since there's suddenly a zero then. Ideally we would insert a value that leads to a delta
		// within the current domain of deltas, however we don't know that domain here yet
		if (!all_valid) {
			return;
		}

		// Note: since we don't allow any values over NumericLimits<T_S>::Maximum(), all subtractions for unsigned
		// types are guaranteed not to overflow
		bool can_do_all = true;
		if (std::is_signed<T>()) {
			T_S bogus;
			can_do_all = TrySubtractOperator::Operation((T_S)(minimum), (T_S)(maximum), bogus) &&
			             TrySubtractOperator::Operation((T_S)(maximum), (T_S)(minimum), bogus);
		}

		// Calculate delta's
		// Note: i == 0 reads compression_buffer[-1], which is the 0 sentinel set in the constructor
		if (can_do_all) {
			for (int64_t i = 0; i < (int64_t)compression_buffer_idx; i++) {
				delta_buffer[i] = (T_S)compression_buffer[i] - (T_S)compression_buffer[i - 1];
			}
		} else {
			for (int64_t i = 0; i < (int64_t)compression_buffer_idx; i++) {
				auto success = TrySubtractOperator::Operation((T_S)(compression_buffer[i]),
				                                              (T_S)(compression_buffer[i - 1]), delta_buffer[i]);
				if (!success) {
					return;
				}
			}
		}

		can_do_delta = true;

		// Statistics over the real deltas only (index 0 is a placeholder, see below)
		for (int64_t i = 1; i < (int64_t)compression_buffer_idx; i++) {
			maximum_delta = MaxValue<T_S>(maximum_delta, delta_buffer[i]);
			minimum_delta = MinValue<T_S>(minimum_delta, delta_buffer[i]);
		}

		// Since we can set the first value arbitrarily, we want to pick one from the current domain, note that
		// we will store the original first value - this offset as the delta_offset to be able to decode this again.
		delta_buffer[0] = minimum_delta;

		can_do_delta = can_do_delta && TrySubtractOperator::Operation(maximum_delta, minimum_delta, min_max_delta_diff);
		can_do_delta =
		    can_do_delta && TrySubtractOperator::Operation((T_S)(compression_buffer[0]), minimum_delta, delta_offset);
	}

	//! Subtract the frame of reference from every buffered value in place.
	//! The subtraction is done in the unsigned domain (via the uint64_t cast) so signed
	//! overflow UB is avoided and the value wraps as intended.
	template <class T_INNER>
	void SubtractFrameOfReference(T_INNER *buffer, T_INNER frame_of_reference) {
		static_assert(std::is_integral<T_INNER>::value, "Integral type required." );
		for (idx_t i = 0; i < compression_buffer_idx; i++) {
			buffer[i] -= uint64_t(frame_of_reference);
		}
	}

	//! Compress and emit the currently buffered group through writer OP, choosing the best
	//! applicable mode (respecting the forced `mode` if set). Updates total_size.
	//! Returns false if no mode is applicable (callers treat this as "cannot bitpack").
	template <class OP>
	bool Flush() {
		if (compression_buffer_idx == 0) {
			return true;
		}

		// All-NULL or single-valued groups compress to a single constant
		if ((all_invalid || maximum == minimum) && (mode == BitpackingMode::AUTO || mode == BitpackingMode::CONSTANT)) {
			OP::WriteConstant(maximum, compression_buffer_idx, data_ptr, all_invalid);
			total_size += sizeof(T) + sizeof(bitpacking_metadata_encoded_t);
			return true;
		}

		CalculateFORStats();
		CalculateDeltaStats();

		if (can_do_delta) {
			// A single repeated delta (e.g. a sequence) needs only the start value and the delta
			if (maximum_delta == minimum_delta && mode != BitpackingMode::FOR && mode != BitpackingMode::DELTA_FOR) {
				idx_t frame_of_reference = compression_buffer[0];
				OP::WriteConstantDelta((T_S)maximum_delta, (T)frame_of_reference, compression_buffer_idx,
				                       (T *)compression_buffer, (bool *)compression_buffer_validity, data_ptr);
				total_size += sizeof(T) + sizeof(T) + sizeof(bitpacking_metadata_encoded_t);
				return true;
			}

			// Check if delta has benefit
			auto delta_required_bitwidth = BitpackingPrimitives::MinimumBitWidth<T_U>(min_max_delta_diff);
			auto regular_required_bitwidth = BitpackingPrimitives::MinimumBitWidth(min_max_diff);

			if (delta_required_bitwidth < regular_required_bitwidth && mode != BitpackingMode::FOR) {
				SubtractFrameOfReference(delta_buffer, minimum_delta);

				OP::WriteDeltaFor((T *)delta_buffer, compression_buffer_validity, delta_required_bitwidth,
				                  (T)minimum_delta, delta_offset, (T *)compression_buffer, compression_buffer_idx,
				                  data_ptr);

				total_size += BitpackingPrimitives::GetRequiredSize(count: compression_buffer_idx, width: delta_required_bitwidth);
				total_size += sizeof(T); // FOR value
				total_size += sizeof(T); // Delta offset value
				total_size += AlignValue(n: sizeof(bitpacking_width_t)); // width value

				return true;
			}
		}

		if (can_do_for) {
			auto width = BitpackingPrimitives::MinimumBitWidth<T_U>(min_max_diff);
			SubtractFrameOfReference(compression_buffer, minimum);
			OP::WriteFor(compression_buffer, compression_buffer_validity, width, minimum, compression_buffer_idx,
			             data_ptr);

			total_size += BitpackingPrimitives::GetRequiredSize(count: compression_buffer_idx, width);
			total_size += sizeof(T); // FOR value
			total_size += AlignValue(n: sizeof(bitpacking_width_t));

			return true;
		}

		return false;
	}

	//! Append a single (value, validity) pair; flushes automatically when the group fills up.
	//! Returns false only if an automatic flush fails.
	template <class OP = EmptyBitpackingWriter>
	bool Update(T value, bool is_valid) {
		compression_buffer_validity[compression_buffer_idx] = is_valid;
		all_valid = all_valid && is_valid;
		all_invalid = all_invalid && !is_valid;

		// NULL slots are left unwritten in compression_buffer and excluded from min/max
		if (is_valid) {
			compression_buffer[compression_buffer_idx] = value;
			minimum = MinValue<T>(minimum, value);
			maximum = MaxValue<T>(maximum, value);
		}

		compression_buffer_idx++;

		if (compression_buffer_idx == BITPACKING_METADATA_GROUP_SIZE) {
			bool success = Flush<OP>();
			Reset();
			return success;
		}
		return true;
	}
};
302 | |
303 | //===--------------------------------------------------------------------===// |
304 | // Analyze |
305 | //===--------------------------------------------------------------------===// |
//! Analyze-phase state: wraps a BitpackingState that only tracks sizes (via EmptyBitpackingWriter).
template <class T>
struct BitpackingAnalyzeState : public AnalyzeState {
	BitpackingState<T> state;
};
310 | |
311 | template <class T> |
312 | unique_ptr<AnalyzeState> BitpackingInitAnalyze(ColumnData &col_data, PhysicalType type) { |
313 | auto &config = DBConfig::GetConfig(db&: col_data.GetDatabase()); |
314 | |
315 | auto state = make_uniq<BitpackingAnalyzeState<T>>(); |
316 | state->state.mode = config.options.force_bitpacking_mode; |
317 | |
318 | return std::move(state); |
319 | } |
320 | |
321 | template <class T> |
322 | bool BitpackingAnalyze(AnalyzeState &state, Vector &input, idx_t count) { |
323 | auto &analyze_state = (BitpackingAnalyzeState<T> &)state; |
324 | UnifiedVectorFormat vdata; |
325 | input.ToUnifiedFormat(count, data&: vdata); |
326 | |
327 | auto data = UnifiedVectorFormat::GetData<T>(vdata); |
328 | for (idx_t i = 0; i < count; i++) { |
329 | auto idx = vdata.sel->get_index(idx: i); |
330 | if (!analyze_state.state.template Update<EmptyBitpackingWriter>(data[idx], vdata.validity.RowIsValid(row_idx: idx))) { |
331 | return false; |
332 | } |
333 | } |
334 | return true; |
335 | } |
336 | |
337 | template <class T> |
338 | idx_t BitpackingFinalAnalyze(AnalyzeState &state) { |
339 | auto &bitpacking_state = (BitpackingAnalyzeState<T> &)state; |
340 | auto flush_result = bitpacking_state.state.template Flush<EmptyBitpackingWriter>(); |
341 | if (!flush_result) { |
342 | return DConstants::INVALID_INDEX; |
343 | } |
344 | return bitpacking_state.state.total_size; |
345 | } |
346 | |
347 | //===--------------------------------------------------------------------===// |
348 | // Compress |
349 | //===--------------------------------------------------------------------===// |
//! Compression-phase state. Owns the current output segment; group data grows upwards from the
//! start of the block while per-group metadata entries grow downwards from the end. On flush the
//! two regions are compacted together.
template <class T, bool WRITE_STATISTICS, class T_S = typename std::make_signed<T>::type>
struct BitpackingCompressState : public CompressionState {
public:
	explicit BitpackingCompressState(ColumnDataCheckpointer &checkpointer)
	    : checkpointer(checkpointer),
	      function(checkpointer.GetCompressionFunction(type: CompressionType::COMPRESSION_BITPACKING)) {
		CreateEmptySegment(row_start: checkpointer.GetRowGroup().start);

		// The writer callbacks receive this state back through the opaque data_ptr
		state.data_ptr = (void *)this;

		auto &config = DBConfig::GetConfig(db&: checkpointer.GetDatabase());
		state.mode = config.options.force_bitpacking_mode;
	}

	ColumnDataCheckpointer &checkpointer;
	CompressionFunction &function;
	unique_ptr<ColumnSegment> current_segment;
	BufferHandle handle;

	// Ptr to next free spot in segment;
	data_ptr_t data_ptr;
	// Ptr to next free spot for storing bitwidths and frame-of-references (growing downwards).
	data_ptr_t metadata_ptr;

	BitpackingState<T> state;

public:
	//! Writer invoked by BitpackingState::Flush; serializes each group into the segment.
	struct BitpackingWriter {
		//! Layout: [constant value]; metadata entry records mode + offset.
		static void WriteConstant(T constant, idx_t count, void *data_ptr, bool all_invalid) {
			auto state = (BitpackingCompressState<T, WRITE_STATISTICS> *)data_ptr;

			ReserveSpace(state, data_bytes: sizeof(T));
			WriteMetaData(state, mode: BitpackingMode::CONSTANT);
			WriteData(state->data_ptr, constant);

			UpdateStats(state, count);
		}

		//! Layout: [frame of reference][constant delta].
		static void WriteConstantDelta(T_S constant, T frame_of_reference, idx_t count, T *values, bool *validity,
		                               void *data_ptr) {
			auto state = (BitpackingCompressState<T, WRITE_STATISTICS> *)data_ptr;

			ReserveSpace(state, data_bytes: 2 * sizeof(T));
			WriteMetaData(state, mode: BitpackingMode::CONSTANT_DELTA);
			WriteData(state->data_ptr, frame_of_reference);
			WriteData(state->data_ptr, constant);

			UpdateStats(state, count);
		}

		//! Layout: [frame of reference][width][delta offset][bitpacked deltas].
		static void WriteDeltaFor(T *values, bool *validity, bitpacking_width_t width, T frame_of_reference,
		                          T_S delta_offset, T *original_values, idx_t count, void *data_ptr) {
			auto state = (BitpackingCompressState<T, WRITE_STATISTICS> *)data_ptr;

			auto bp_size = BitpackingPrimitives::GetRequiredSize(count, width);
			ReserveSpace(state, data_bytes: bp_size + 3 * sizeof(T));

			WriteMetaData(state, mode: BitpackingMode::DELTA_FOR);
			WriteData(state->data_ptr, frame_of_reference);
			// Width is widened to a full T so every header field has the same size
			WriteData(state->data_ptr, (T)width);
			WriteData(state->data_ptr, delta_offset);

			BitpackingPrimitives::PackBuffer<T, false>(state->data_ptr, values, count, width);
			state->data_ptr += bp_size;

			UpdateStats(state, count);
		}

		//! Layout: [frame of reference][width][bitpacked values].
		static void WriteFor(T *values, bool *validity, bitpacking_width_t width, T frame_of_reference, idx_t count,
		                     void *data_ptr) {
			auto state = (BitpackingCompressState<T, WRITE_STATISTICS> *)data_ptr;

			auto bp_size = BitpackingPrimitives::GetRequiredSize(count, width);
			ReserveSpace(state, data_bytes: bp_size + 2 * sizeof(T));

			WriteMetaData(state, mode: BitpackingMode::FOR);
			WriteData(state->data_ptr, frame_of_reference);
			WriteData(state->data_ptr, (T)width);

			BitpackingPrimitives::PackBuffer<T, false>(state->data_ptr, values, count, width);
			state->data_ptr += bp_size;

			UpdateStats(state, count);
		}

		//! Write one value at ptr and advance ptr past it.
		template <class T_OUT>
		static void WriteData(data_ptr_t &ptr, T_OUT val) {
			*((T_OUT *)ptr) = val;
			ptr += sizeof(T_OUT);
		}

		//! Append an encoded metadata entry for the group about to be written at data_ptr.
		//! Metadata grows downwards from the end of the block.
		static void WriteMetaData(BitpackingCompressState<T, WRITE_STATISTICS> *state, BitpackingMode mode) {
			bitpacking_metadata_t metadata {.mode: mode, .offset: (uint32_t)(state->data_ptr - state->handle.Ptr())};
			state->metadata_ptr -= sizeof(bitpacking_metadata_encoded_t);
			Store<bitpacking_metadata_encoded_t>(EncodeMeta(metadata), state->metadata_ptr);
		}

		//! Ensure the current segment can hold data_bytes + one metadata entry,
		//! rolling over to a fresh segment if not.
		static void ReserveSpace(BitpackingCompressState<T, WRITE_STATISTICS> *state, idx_t data_bytes) {
			idx_t meta_bytes = sizeof(bitpacking_metadata_encoded_t);
			state->FlushAndCreateSegmentIfFull(data_bytes, meta_bytes);
			D_ASSERT(state->CanStore(data_bytes, meta_bytes));
		}

		//! Bump the segment row count and (optionally) fold the group min/max into the stats.
		static void UpdateStats(BitpackingCompressState<T, WRITE_STATISTICS> *state, idx_t count) {
			state->current_segment->count += count;

			// min/max are only meaningful if at least one value in the group was valid
			if (WRITE_STATISTICS && !state->state.all_invalid) {
				NumericStats::Update<T>(state->current_segment->stats.statistics, state->state.minimum);
				NumericStats::Update<T>(state->current_segment->stats.statistics, state->state.maximum);
			}
		}
	};

	//! True if data_bytes of group data plus meta_bytes of metadata still fit in the block.
	//! NOTE(review): required_meta_bytes measures from data_ptr rather than the block base —
	//! this over-reserves by the size of the data already written; confirm against upstream.
	bool CanStore(idx_t data_bytes, idx_t meta_bytes) {
		auto required_data_bytes = AlignValue<idx_t>(n: (data_ptr + data_bytes) - data_ptr);
		auto required_meta_bytes = Storage::BLOCK_SIZE - (metadata_ptr - data_ptr) + meta_bytes;

		return required_data_bytes + required_meta_bytes <=
		       Storage::BLOCK_SIZE - BitpackingPrimitives::BITPACKING_HEADER_SIZE;
	}

	//! Allocate and pin a fresh transient segment; data grows up from just past the header,
	//! metadata grows down from the end of the block.
	void CreateEmptySegment(idx_t row_start) {
		auto &db = checkpointer.GetDatabase();
		auto &type = checkpointer.GetType();
		auto compressed_segment = ColumnSegment::CreateTransientSegment(db, type, start: row_start);
		compressed_segment->function = function;
		current_segment = std::move(compressed_segment);
		auto &buffer_manager = BufferManager::GetBufferManager(db);
		handle = buffer_manager.Pin(handle&: current_segment->block);

		data_ptr = handle.Ptr() + BitpackingPrimitives::BITPACKING_HEADER_SIZE;
		metadata_ptr = handle.Ptr() + Storage::BLOCK_SIZE;
	}

	//! Append one vector of values to the bitpacking state (which flushes groups as they fill).
	void Append(UnifiedVectorFormat &vdata, idx_t count) {
		auto data = UnifiedVectorFormat::GetData<T>(vdata);

		for (idx_t i = 0; i < count; i++) {
			auto idx = vdata.sel->get_index(idx: i);
			state.template Update<BitpackingCompressState<T, WRITE_STATISTICS, T_S>::BitpackingWriter>(
			    data[idx], vdata.validity.RowIsValid(row_idx: idx));
		}
	}

	//! Roll over to a new segment when the pending write would not fit the current one.
	void FlushAndCreateSegmentIfFull(idx_t required_data_bytes, idx_t required_meta_bytes) {
		if (!CanStore(data_bytes: required_data_bytes, meta_bytes: required_meta_bytes)) {
			auto row_start = current_segment->start + current_segment->count;
			FlushSegment();
			CreateEmptySegment(row_start);
		}
	}

	//! Compact data + metadata and hand the finished segment to the checkpointer.
	void FlushSegment() {
		auto &state = checkpointer.GetCheckpointState();
		auto base_ptr = handle.Ptr();

		// Compact the segment by moving the metadata next to the data.
		idx_t metadata_offset = AlignValue(n: data_ptr - base_ptr);
		idx_t metadata_size = base_ptr + Storage::BLOCK_SIZE - metadata_ptr;
		idx_t total_segment_size = metadata_offset + metadata_size;

		// Asserting things are still sane here
		if (!CanStore(data_bytes: 0, meta_bytes: 0)) {
			throw InternalException("Error in bitpacking size calculation" );
		}

		memmove(dest: base_ptr + metadata_offset, src: metadata_ptr, n: metadata_size);

		// Store the offset of the metadata of the first group (which is at the highest address).
		Store<idx_t>(val: metadata_offset + metadata_size, ptr: base_ptr);
		handle.Destroy();

		state.FlushSegment(segment: std::move(current_segment), segment_size: total_segment_size);
	}

	//! Flush the final partial group and the final segment.
	void Finalize() {
		state.template Flush<BitpackingCompressState<T, WRITE_STATISTICS, T_S>::BitpackingWriter>();
		FlushSegment();
		current_segment.reset();
	}
};
531 | |
532 | template <class T, bool WRITE_STATISTICS> |
533 | unique_ptr<CompressionState> BitpackingInitCompression(ColumnDataCheckpointer &checkpointer, |
534 | unique_ptr<AnalyzeState> state) { |
535 | return make_uniq<BitpackingCompressState<T, WRITE_STATISTICS>>(checkpointer); |
536 | } |
537 | |
538 | template <class T, bool WRITE_STATISTICS> |
539 | void BitpackingCompress(CompressionState &state_p, Vector &scan_vector, idx_t count) { |
540 | auto &state = (BitpackingCompressState<T, WRITE_STATISTICS> &)state_p; |
541 | UnifiedVectorFormat vdata; |
542 | scan_vector.ToUnifiedFormat(count, data&: vdata); |
543 | state.Append(vdata, count); |
544 | } |
545 | |
546 | template <class T, bool WRITE_STATISTICS> |
547 | void BitpackingFinalizeCompress(CompressionState &state_p) { |
548 | auto &state = (BitpackingCompressState<T, WRITE_STATISTICS> &)state_p; |
549 | state.Finalize(); |
550 | } |
551 | |
552 | //===--------------------------------------------------------------------===// |
553 | // Scan |
554 | //===--------------------------------------------------------------------===// |
555 | template <class T> |
556 | static void ApplyFrameOfReference(T *dst, T frame_of_reference, idx_t size) { |
557 | if (!frame_of_reference) { |
558 | return; |
559 | } |
560 | for (idx_t i = 0; i < size; i++) { |
561 | dst[i] += frame_of_reference; |
562 | } |
563 | } |
564 | |
565 | // Based on https://github.com/lemire/FastPFor (Apache License 2.0) |
566 | template <class T> |
567 | static T DeltaDecode(T *data, T previous_value, const size_t size) { |
568 | D_ASSERT(size >= 1); |
569 | |
570 | data[0] += previous_value; |
571 | |
572 | const size_t UnrollQty = 4; |
573 | const size_t sz0 = (size / UnrollQty) * UnrollQty; // equal to 0, if size < UnrollQty |
574 | size_t i = 1; |
575 | if (sz0 >= UnrollQty) { |
576 | T a = data[0]; |
577 | for (; i < sz0 - UnrollQty; i += UnrollQty) { |
578 | a = data[i] += a; |
579 | a = data[i + 1] += a; |
580 | a = data[i + 2] += a; |
581 | a = data[i + 3] += a; |
582 | } |
583 | } |
584 | for (; i != size; ++i) { |
585 | data[i] += data[i - 1]; |
586 | } |
587 | |
588 | return data[size - 1]; |
589 | } |
590 | |
591 | template <class T, class T_S = typename std::make_signed<T>::type> |
592 | struct BitpackingScanState : public SegmentScanState { |
593 | public: |
594 | explicit BitpackingScanState(ColumnSegment &segment) : current_segment(segment) { |
595 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
596 | handle = buffer_manager.Pin(handle&: segment.block); |
597 | auto dataptr = handle.Ptr(); |
598 | |
599 | // load offset to bitpacking widths pointer |
600 | auto bitpacking_metadata_offset = Load<idx_t>(ptr: dataptr + segment.GetBlockOffset()); |
601 | bitpacking_metadata_ptr = |
602 | dataptr + segment.GetBlockOffset() + bitpacking_metadata_offset - sizeof(bitpacking_metadata_encoded_t); |
603 | |
604 | // load the first group |
605 | LoadNextGroup(); |
606 | } |
607 | |
608 | BufferHandle handle; |
609 | ColumnSegment ¤t_segment; |
610 | |
611 | T decompression_buffer[BITPACKING_METADATA_GROUP_SIZE]; |
612 | |
613 | bitpacking_metadata_t current_group; |
614 | |
615 | bitpacking_width_t current_width; |
616 | T current_frame_of_reference; |
617 | T current_constant; |
618 | T current_delta_offset; |
619 | |
620 | idx_t current_group_offset = 0; |
621 | data_ptr_t current_group_ptr; |
622 | data_ptr_t bitpacking_metadata_ptr; |
623 | |
624 | public: |
625 | //! Loads the metadata for the current metadata group. This will set bitpacking_metadata_ptr to the next group. |
626 | //! this will also load any metadata that is at the start of a compressed buffer (e.g. the width, for, or constant |
627 | //! value) depending on the bitpacking mode for that group |
628 | void LoadNextGroup() { |
629 | D_ASSERT(bitpacking_metadata_ptr > handle.Ptr() && |
630 | bitpacking_metadata_ptr < handle.Ptr() + Storage::BLOCK_SIZE); |
631 | current_group_offset = 0; |
632 | current_group = DecodeMeta(metadata_encoded: (bitpacking_metadata_encoded_t *)bitpacking_metadata_ptr); |
633 | |
634 | bitpacking_metadata_ptr -= sizeof(bitpacking_metadata_encoded_t); |
635 | current_group_ptr = GetPtr(group: current_group); |
636 | |
637 | // Read first value |
638 | switch (current_group.mode) { |
639 | case BitpackingMode::CONSTANT: |
640 | current_constant = *(T *)(current_group_ptr); |
641 | current_group_ptr += sizeof(T); |
642 | break; |
643 | case BitpackingMode::FOR: |
644 | case BitpackingMode::CONSTANT_DELTA: |
645 | case BitpackingMode::DELTA_FOR: |
646 | current_frame_of_reference = *(T *)(current_group_ptr); |
647 | current_group_ptr += sizeof(T); |
648 | break; |
649 | default: |
650 | throw InternalException("Invalid bitpacking mode" ); |
651 | } |
652 | |
653 | // Read second value |
654 | switch (current_group.mode) { |
655 | case BitpackingMode::CONSTANT_DELTA: |
656 | current_constant = *(T *)(current_group_ptr); |
657 | current_group_ptr += sizeof(T); |
658 | break; |
659 | case BitpackingMode::FOR: |
660 | case BitpackingMode::DELTA_FOR: |
661 | current_width = (bitpacking_width_t) * (T *)(current_group_ptr); |
662 | current_group_ptr += MaxValue(a: sizeof(T), b: sizeof(bitpacking_width_t)); |
663 | break; |
664 | case BitpackingMode::CONSTANT: |
665 | break; |
666 | default: |
667 | throw InternalException("Invalid bitpacking mode" ); |
668 | } |
669 | |
670 | // Read third value |
671 | if (current_group.mode == BitpackingMode::DELTA_FOR) { |
672 | current_delta_offset = *(T *)(current_group_ptr); |
673 | current_group_ptr += sizeof(T); |
674 | } |
675 | } |
676 | |
677 | void Skip(ColumnSegment &segment, idx_t skip_count) { |
678 | while (skip_count > 0) { |
679 | if (current_group_offset + skip_count < BITPACKING_METADATA_GROUP_SIZE) { |
680 | // Skipping Delta FOR requires a bit of decoding to figure out the new delta |
681 | if (current_group.mode == BitpackingMode::DELTA_FOR) { |
682 | // if current_group_offset points into the middle of a |
683 | // BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE, we need to scan a few |
684 | // values before current_group_offset to align with the algorithm groups |
685 | idx_t = current_group_offset % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; |
686 | |
687 | // Calculate total offset and count to bitunpack |
688 | idx_t base_decompress_count = BitpackingPrimitives::RoundUpToAlgorithmGroupSize(num_to_round: skip_count); |
689 | idx_t decompress_count = base_decompress_count + extra_count; |
690 | idx_t decompress_offset = current_group_offset - extra_count; |
691 | bool skip_sign_extension = true; |
692 | |
693 | BitpackingPrimitives::UnPackBuffer<T>(data_ptr_cast(decompression_buffer), |
694 | current_group_ptr + decompress_offset, decompress_count, |
695 | current_width, skip_sign_extension); |
696 | |
697 | ApplyFrameOfReference<T_S>((T_S *)&decompression_buffer[extra_count], current_frame_of_reference, |
698 | skip_count); |
699 | DeltaDecode<T_S>((T_S *)&decompression_buffer[extra_count], (T_S)current_delta_offset, |
700 | (idx_t)skip_count); |
701 | current_delta_offset = decompression_buffer[extra_count + skip_count - 1]; |
702 | |
703 | current_group_offset += skip_count; |
704 | } else { |
705 | current_group_offset += skip_count; |
706 | } |
707 | break; |
708 | } else { |
709 | auto left_in_this_group = BITPACKING_METADATA_GROUP_SIZE - current_group_offset; |
710 | auto number_of_groups_to_skip = (skip_count - left_in_this_group) / BITPACKING_METADATA_GROUP_SIZE; |
711 | |
712 | current_group_offset = 0; |
713 | bitpacking_metadata_ptr -= number_of_groups_to_skip * sizeof(bitpacking_metadata_encoded_t); |
714 | |
715 | LoadNextGroup(); |
716 | |
717 | skip_count -= left_in_this_group; |
718 | skip_count -= number_of_groups_to_skip * BITPACKING_METADATA_GROUP_SIZE; |
719 | } |
720 | } |
721 | } |
722 | |
723 | data_ptr_t GetPtr(bitpacking_metadata_t group) { |
724 | return handle.Ptr() + current_segment.GetBlockOffset() + group.offset; |
725 | } |
726 | }; |
727 | |
728 | template <class T> |
729 | unique_ptr<SegmentScanState> BitpackingInitScan(ColumnSegment &segment) { |
730 | auto result = make_uniq<BitpackingScanState<T>>(segment); |
731 | return std::move(result); |
732 | } |
733 | |
734 | //===--------------------------------------------------------------------===// |
735 | // Scan base data |
736 | //===--------------------------------------------------------------------===// |
737 | template <class T, class T_S = typename std::make_signed<T>::type> |
738 | void BitpackingScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result, |
739 | idx_t result_offset) { |
740 | auto &scan_state = (BitpackingScanState<T> &)*state.scan_state; |
741 | |
742 | T *result_data = FlatVector::GetData<T>(result); |
743 | result.SetVectorType(VectorType::FLAT_VECTOR); |
744 | |
745 | //! Because FOR offsets all our values to be 0 or above, we can always skip sign extension here |
746 | bool skip_sign_extend = true; |
747 | |
748 | idx_t scanned = 0; |
749 | |
750 | while (scanned < scan_count) { |
751 | // Exhausted this metadata group, move pointers to next group and load metadata for next group. |
752 | if (scan_state.current_group_offset >= BITPACKING_METADATA_GROUP_SIZE) { |
753 | scan_state.LoadNextGroup(); |
754 | } |
755 | |
756 | idx_t offset_in_compression_group = |
757 | scan_state.current_group_offset % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; |
758 | |
759 | if (scan_state.current_group.mode == BitpackingMode::CONSTANT) { |
760 | idx_t remaining = scan_count - scanned; |
761 | idx_t to_scan = MinValue(remaining, BITPACKING_METADATA_GROUP_SIZE - scan_state.current_group_offset); |
762 | T *begin = result_data + result_offset + scanned; |
763 | T *end = begin + remaining; |
764 | std::fill(begin, end, scan_state.current_constant); |
765 | scanned += to_scan; |
766 | scan_state.current_group_offset += to_scan; |
767 | continue; |
768 | } |
769 | if (scan_state.current_group.mode == BitpackingMode::CONSTANT_DELTA) { |
770 | idx_t remaining = scan_count - scanned; |
771 | idx_t to_scan = MinValue(remaining, BITPACKING_METADATA_GROUP_SIZE - scan_state.current_group_offset); |
772 | T *target_ptr = result_data + result_offset + scanned; |
773 | |
774 | for (idx_t i = 0; i < to_scan; i++) { |
775 | target_ptr[i] = ((scan_state.current_group_offset + i) * scan_state.current_constant) + |
776 | scan_state.current_frame_of_reference; |
777 | } |
778 | |
779 | scanned += to_scan; |
780 | scan_state.current_group_offset += to_scan; |
781 | continue; |
782 | } |
783 | D_ASSERT(scan_state.current_group.mode == BitpackingMode::FOR || |
784 | scan_state.current_group.mode == BitpackingMode::DELTA_FOR); |
785 | |
786 | idx_t to_scan = MinValue<idx_t>(a: scan_count - scanned, b: BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE - |
787 | offset_in_compression_group); |
788 | // Calculate start of compression algorithm group |
789 | data_ptr_t current_position_ptr = |
790 | scan_state.current_group_ptr + scan_state.current_group_offset * scan_state.current_width / 8; |
791 | data_ptr_t decompression_group_start_pointer = |
792 | current_position_ptr - offset_in_compression_group * scan_state.current_width / 8; |
793 | |
794 | T *current_result_ptr = result_data + result_offset + scanned; |
795 | |
796 | if (to_scan == BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE && offset_in_compression_group == 0) { |
797 | // Decompress directly into result vector |
798 | BitpackingPrimitives::UnPackBlock<T>(data_ptr_cast(current_result_ptr), decompression_group_start_pointer, |
799 | scan_state.current_width, skip_sign_extend); |
800 | } else { |
801 | // Decompress compression algorithm to buffer |
802 | BitpackingPrimitives::UnPackBlock<T>(data_ptr_cast(scan_state.decompression_buffer), |
803 | decompression_group_start_pointer, scan_state.current_width, |
804 | skip_sign_extend); |
805 | |
806 | memcpy(current_result_ptr, scan_state.decompression_buffer + offset_in_compression_group, |
807 | to_scan * sizeof(T)); |
808 | } |
809 | |
810 | if (scan_state.current_group.mode == BitpackingMode::DELTA_FOR) { |
811 | ApplyFrameOfReference<T_S>((T_S *)current_result_ptr, (T_S)scan_state.current_frame_of_reference, to_scan); |
812 | DeltaDecode<T_S>((T_S *)current_result_ptr, (T_S)scan_state.current_delta_offset, to_scan); |
813 | scan_state.current_delta_offset = ((T *)current_result_ptr)[to_scan - 1]; |
814 | } else { |
815 | ApplyFrameOfReference<T>(current_result_ptr, scan_state.current_frame_of_reference, to_scan); |
816 | } |
817 | |
818 | scanned += to_scan; |
819 | scan_state.current_group_offset += to_scan; |
820 | } |
821 | } |
822 | |
823 | template <class T> |
824 | void BitpackingScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) { |
825 | BitpackingScanPartial<T>(segment, state, scan_count, result, 0); |
826 | } |
827 | |
828 | //===--------------------------------------------------------------------===// |
829 | // Fetch |
830 | //===--------------------------------------------------------------------===// |
831 | template <class T> |
832 | void BitpackingFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result, |
833 | idx_t result_idx) { |
834 | BitpackingScanState<T> scan_state(segment); |
835 | scan_state.Skip(segment, row_id); |
836 | auto result_data = FlatVector::GetData<T>(result); |
837 | T *current_result_ptr = result_data + result_idx; |
838 | |
839 | idx_t offset_in_compression_group = |
840 | scan_state.current_group_offset % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; |
841 | |
842 | data_ptr_t decompression_group_start_pointer = |
843 | scan_state.current_group_ptr + |
844 | (scan_state.current_group_offset - offset_in_compression_group) * scan_state.current_width / 8; |
845 | |
846 | //! Because FOR offsets all our values to be 0 or above, we can always skip sign extension here |
847 | bool skip_sign_extend = true; |
848 | |
849 | if (scan_state.current_group.mode == BitpackingMode::CONSTANT) { |
850 | *current_result_ptr = scan_state.current_constant; |
851 | return; |
852 | } |
853 | |
854 | if (scan_state.current_group.mode == BitpackingMode::CONSTANT_DELTA) { |
855 | *current_result_ptr = |
856 | ((scan_state.current_group_offset) * scan_state.current_constant) + scan_state.current_frame_of_reference; |
857 | return; |
858 | } |
859 | |
860 | D_ASSERT(scan_state.current_group.mode == BitpackingMode::FOR || |
861 | scan_state.current_group.mode == BitpackingMode::DELTA_FOR); |
862 | |
863 | BitpackingPrimitives::UnPackBlock<T>(data_ptr_cast(scan_state.decompression_buffer), |
864 | decompression_group_start_pointer, scan_state.current_width, skip_sign_extend); |
865 | |
866 | *current_result_ptr = *(T *)(scan_state.decompression_buffer + offset_in_compression_group); |
867 | *current_result_ptr += scan_state.current_frame_of_reference; |
868 | |
869 | if (scan_state.current_group.mode == BitpackingMode::DELTA_FOR) { |
870 | *current_result_ptr += scan_state.current_delta_offset; |
871 | } |
872 | } |
873 | template <class T> |
874 | void BitpackingSkip(ColumnSegment &segment, ColumnScanState &state, idx_t skip_count) { |
875 | auto &scan_state = (BitpackingScanState<T> &)*state.scan_state; |
876 | scan_state.Skip(segment, skip_count); |
877 | } |
878 | |
879 | //===--------------------------------------------------------------------===// |
880 | // Get Function |
881 | //===--------------------------------------------------------------------===// |
882 | template <class T, bool WRITE_STATISTICS = true> |
883 | CompressionFunction GetBitpackingFunction(PhysicalType data_type) { |
884 | return CompressionFunction(CompressionType::COMPRESSION_BITPACKING, data_type, BitpackingInitAnalyze<T>, |
885 | BitpackingAnalyze<T>, BitpackingFinalAnalyze<T>, |
886 | BitpackingInitCompression<T, WRITE_STATISTICS>, BitpackingCompress<T, WRITE_STATISTICS>, |
887 | BitpackingFinalizeCompress<T, WRITE_STATISTICS>, BitpackingInitScan<T>, |
888 | BitpackingScan<T>, BitpackingScanPartial<T>, BitpackingFetchRow<T>, BitpackingSkip<T>); |
889 | } |
890 | |
891 | CompressionFunction BitpackingFun::GetFunction(PhysicalType type) { |
892 | switch (type) { |
893 | case PhysicalType::BOOL: |
894 | case PhysicalType::INT8: |
895 | return GetBitpackingFunction<int8_t>(data_type: type); |
896 | case PhysicalType::INT16: |
897 | return GetBitpackingFunction<int16_t>(data_type: type); |
898 | case PhysicalType::INT32: |
899 | return GetBitpackingFunction<int32_t>(data_type: type); |
900 | case PhysicalType::INT64: |
901 | return GetBitpackingFunction<int64_t>(data_type: type); |
902 | case PhysicalType::UINT8: |
903 | return GetBitpackingFunction<uint8_t>(data_type: type); |
904 | case PhysicalType::UINT16: |
905 | return GetBitpackingFunction<uint16_t>(data_type: type); |
906 | case PhysicalType::UINT32: |
907 | return GetBitpackingFunction<uint32_t>(data_type: type); |
908 | case PhysicalType::UINT64: |
909 | return GetBitpackingFunction<uint64_t>(data_type: type); |
910 | case PhysicalType::LIST: |
911 | return GetBitpackingFunction<uint64_t, false>(data_type: type); |
912 | default: |
913 | throw InternalException("Unsupported type for Bitpacking" ); |
914 | } |
915 | } |
916 | |
917 | bool BitpackingFun::TypeIsSupported(PhysicalType type) { |
918 | switch (type) { |
919 | case PhysicalType::BOOL: |
920 | case PhysicalType::INT8: |
921 | case PhysicalType::INT16: |
922 | case PhysicalType::INT32: |
923 | case PhysicalType::INT64: |
924 | case PhysicalType::UINT8: |
925 | case PhysicalType::UINT16: |
926 | case PhysicalType::UINT32: |
927 | case PhysicalType::UINT64: |
928 | case PhysicalType::LIST: |
929 | return true; |
930 | default: |
931 | return false; |
932 | } |
933 | } |
934 | |
935 | } // namespace duckdb |
936 | |