1#include "duckdb/common/bitpacking.hpp"
2
3#include "duckdb/common/limits.hpp"
4#include "duckdb/function/compression/compression.hpp"
5#include "duckdb/function/compression_function.hpp"
6#include "duckdb/main/config.hpp"
7#include "duckdb/storage/buffer_manager.hpp"
8
9#include "duckdb/storage/table/column_data_checkpointer.hpp"
10#include "duckdb/storage/table/column_segment.hpp"
11#include "duckdb/common/operator/subtract.hpp"
12#include "duckdb/storage/compression/bitpacking.hpp"
13#include "duckdb/storage/table/scan_state.hpp"
14
15#include <functional>
16
17namespace duckdb {
18
19static constexpr const idx_t BITPACKING_METADATA_GROUP_SIZE = STANDARD_VECTOR_SIZE > 512 ? STANDARD_VECTOR_SIZE : 2048;
20
21BitpackingMode BitpackingModeFromString(const string &str) {
22 auto mode = StringUtil::Lower(str);
23
24 if (mode == "auto") {
25 return BitpackingMode::AUTO;
26 } else if (mode == "constant") {
27 return BitpackingMode::CONSTANT;
28 } else if (mode == "constant_delta") {
29 return BitpackingMode::CONSTANT_DELTA;
30 } else if (mode == "delta_for") {
31 return BitpackingMode::DELTA_FOR;
32 } else if (mode == "for") {
33 return BitpackingMode::FOR;
34 } else {
35 return BitpackingMode::AUTO;
36 }
37}
38
39string BitpackingModeToString(const BitpackingMode &mode) {
40 switch (mode) {
41 case (BitpackingMode::AUTO):
42 return "auto";
43 case (BitpackingMode::CONSTANT):
44 return "constant";
45 case (BitpackingMode::CONSTANT_DELTA):
46 return "constant_delta";
47 case (BitpackingMode::DELTA_FOR):
48 return "delta_for";
49 case (BitpackingMode::FOR):
50 return "for";
51 default:
52 throw NotImplementedException("Unknown bitpacking mode: " + to_string(val: (uint8_t)mode) + "\n");
53 }
54}
55
56typedef struct {
57 BitpackingMode mode;
58 uint32_t offset;
59} bitpacking_metadata_t;
60
61typedef uint32_t bitpacking_metadata_encoded_t;
62
63static bitpacking_metadata_encoded_t EncodeMeta(bitpacking_metadata_t metadata) {
64 D_ASSERT(metadata.offset <= 16777215); // max uint24_t
65 bitpacking_metadata_encoded_t encoded_value = metadata.offset;
66 encoded_value |= (uint8_t)metadata.mode << 24;
67 return encoded_value;
68}
69static bitpacking_metadata_t DecodeMeta(bitpacking_metadata_encoded_t *metadata_encoded) {
70 bitpacking_metadata_t metadata;
71 metadata.mode = Load<BitpackingMode>(ptr: data_ptr_cast(src: metadata_encoded) + 3);
72 metadata.offset = *metadata_encoded & 0x00FFFFFF;
73 return metadata;
74}
75
76struct EmptyBitpackingWriter {
77 template <class T>
78 static void WriteConstant(T constant, idx_t count, void *data_ptr, bool all_invalid) {
79 }
80 template <class T, class T_S = typename std::make_signed<T>::type>
81 static void WriteConstantDelta(T_S constant, T frame_of_reference, idx_t count, T *values, bool *validity,
82 void *data_ptr) {
83 }
84 template <class T, class T_S = typename std::make_signed<T>::type>
85 static void WriteDeltaFor(T *values, bool *validity, bitpacking_width_t width, T frame_of_reference,
86 T_S delta_offset, T *original_values, idx_t count, void *data_ptr) {
87 }
88 template <class T>
89 static void WriteFor(T *values, bool *validity, bitpacking_width_t width, T frame_of_reference, idx_t count,
90 void *data_ptr) {
91 }
92};
93
94template <class T, class T_U = typename std::make_unsigned<T>::type, class T_S = typename std::make_signed<T>::type>
95struct BitpackingState {
96public:
97 BitpackingState() : compression_buffer_idx(0), total_size(0), data_ptr(nullptr) {
98 compression_buffer_internal[0] = (T)0;
99 compression_buffer = &compression_buffer_internal[1];
100 Reset();
101 }
102
103 // Extra val for delta encoding
104 T compression_buffer_internal[BITPACKING_METADATA_GROUP_SIZE + 1];
105 T *compression_buffer;
106 T_S delta_buffer[BITPACKING_METADATA_GROUP_SIZE];
107 bool compression_buffer_validity[BITPACKING_METADATA_GROUP_SIZE];
108 idx_t compression_buffer_idx;
109 idx_t total_size;
110
111 // Used to pass CompressionState ptr through the Bitpacking writer
112 void *data_ptr;
113
114 // Stats on current compression buffer
115 T minimum;
116 T maximum;
117 T min_max_diff;
118 T_S minimum_delta;
119 T_S maximum_delta;
120 T_S min_max_delta_diff;
121 T_S delta_offset;
122 bool all_valid;
123 bool all_invalid;
124
125 bool can_do_delta;
126 bool can_do_for;
127
128 // Used to force a specific mode, useful in testing
129 BitpackingMode mode = BitpackingMode::AUTO;
130
131public:
132 void Reset() {
133 minimum = NumericLimits<T>::Maximum();
134 minimum_delta = NumericLimits<T_S>::Maximum();
135 maximum = NumericLimits<T>::Minimum();
136 maximum_delta = NumericLimits<T_S>::Minimum();
137 delta_offset = 0;
138 all_valid = true;
139 all_invalid = true;
140 can_do_delta = false;
141 can_do_for = false;
142 compression_buffer_idx = 0;
143 min_max_diff = 0;
144 min_max_delta_diff = 0;
145 }
146
147 void CalculateFORStats() {
148 can_do_for = TrySubtractOperator::Operation(maximum, minimum, min_max_diff);
149 }
150
151 void CalculateDeltaStats() {
152 // TODO: currently we dont support delta compression of values above NumericLimits<T_S>::Maximum(),
153 // we could support this with some clever substract trickery?
154 if (maximum > (T)NumericLimits<T_S>::Maximum()) {
155 return;
156 }
157
158 // Don't delta encoding 1 value makes no sense
159 if (compression_buffer_idx < 2) {
160 return;
161 };
162
163 // TODO: handle NULLS here?
164 // Currently we cannot handle nulls because we would need an additional step of patching for this.
165 // we could for example copy the last value on a null insert. This would help a bit, but not be optimal for
166 // large deltas since theres suddenly a zero then. Ideally we would insert a value that leads to a delta within
167 // the current domain of deltas however we dont know that domain here yet
168 if (!all_valid) {
169 return;
170 }
171
172 // Note: since we dont allow any values over NumericLimits<T_S>::Maximum(), all subtractions for unsigned types
173 // are guaranteed not to overflow
174 bool can_do_all = true;
175 if (std::is_signed<T>()) {
176 T_S bogus;
177 can_do_all = TrySubtractOperator::Operation((T_S)(minimum), (T_S)(maximum), bogus) &&
178 TrySubtractOperator::Operation((T_S)(maximum), (T_S)(minimum), bogus);
179 }
180
181 // Calculate delta's
182 if (can_do_all) {
183 for (int64_t i = 0; i < (int64_t)compression_buffer_idx; i++) {
184 delta_buffer[i] = (T_S)compression_buffer[i] - (T_S)compression_buffer[i - 1];
185 }
186 } else {
187 for (int64_t i = 0; i < (int64_t)compression_buffer_idx; i++) {
188 auto success = TrySubtractOperator::Operation((T_S)(compression_buffer[i]),
189 (T_S)(compression_buffer[i - 1]), delta_buffer[i]);
190 if (!success) {
191 return;
192 }
193 }
194 }
195
196 can_do_delta = true;
197
198 for (int64_t i = 1; i < (int64_t)compression_buffer_idx; i++) {
199 maximum_delta = MaxValue<T_S>(maximum_delta, delta_buffer[i]);
200 minimum_delta = MinValue<T_S>(minimum_delta, delta_buffer[i]);
201 }
202
203 // Since we can set the first value arbitrarily, we want to pick one from the current domain, note that
204 // we will store the original first value - this offset as the delta_offset to be able to decode this again.
205 delta_buffer[0] = minimum_delta;
206
207 can_do_delta = can_do_delta && TrySubtractOperator::Operation(maximum_delta, minimum_delta, min_max_delta_diff);
208 can_do_delta =
209 can_do_delta && TrySubtractOperator::Operation((T_S)(compression_buffer[0]), minimum_delta, delta_offset);
210 }
211
212 template <class T_INNER>
213 void SubtractFrameOfReference(T_INNER *buffer, T_INNER frame_of_reference) {
214 static_assert(std::is_integral<T_INNER>::value, "Integral type required.");
215 for (idx_t i = 0; i < compression_buffer_idx; i++) {
216 buffer[i] -= uint64_t(frame_of_reference);
217 }
218 }
219
220 template <class OP>
221 bool Flush() {
222 if (compression_buffer_idx == 0) {
223 return true;
224 }
225
226 if ((all_invalid || maximum == minimum) && (mode == BitpackingMode::AUTO || mode == BitpackingMode::CONSTANT)) {
227 OP::WriteConstant(maximum, compression_buffer_idx, data_ptr, all_invalid);
228 total_size += sizeof(T) + sizeof(bitpacking_metadata_encoded_t);
229 return true;
230 }
231
232 CalculateFORStats();
233 CalculateDeltaStats();
234
235 if (can_do_delta) {
236 if (maximum_delta == minimum_delta && mode != BitpackingMode::FOR && mode != BitpackingMode::DELTA_FOR) {
237 idx_t frame_of_reference = compression_buffer[0];
238 OP::WriteConstantDelta((T_S)maximum_delta, (T)frame_of_reference, compression_buffer_idx,
239 (T *)compression_buffer, (bool *)compression_buffer_validity, data_ptr);
240 total_size += sizeof(T) + sizeof(T) + sizeof(bitpacking_metadata_encoded_t);
241 return true;
242 }
243
244 // Check if delta has benefit
245 auto delta_required_bitwidth = BitpackingPrimitives::MinimumBitWidth<T_U>(min_max_delta_diff);
246 auto regular_required_bitwidth = BitpackingPrimitives::MinimumBitWidth(min_max_diff);
247
248 if (delta_required_bitwidth < regular_required_bitwidth && mode != BitpackingMode::FOR) {
249 SubtractFrameOfReference(delta_buffer, minimum_delta);
250
251 OP::WriteDeltaFor((T *)delta_buffer, compression_buffer_validity, delta_required_bitwidth,
252 (T)minimum_delta, delta_offset, (T *)compression_buffer, compression_buffer_idx,
253 data_ptr);
254
255 total_size += BitpackingPrimitives::GetRequiredSize(count: compression_buffer_idx, width: delta_required_bitwidth);
256 total_size += sizeof(T); // FOR value
257 total_size += sizeof(T); // Delta offset value
258 total_size += AlignValue(n: sizeof(bitpacking_width_t)); // FOR value
259
260 return true;
261 }
262 }
263
264 if (can_do_for) {
265 auto width = BitpackingPrimitives::MinimumBitWidth<T_U>(min_max_diff);
266 SubtractFrameOfReference(compression_buffer, minimum);
267 OP::WriteFor(compression_buffer, compression_buffer_validity, width, minimum, compression_buffer_idx,
268 data_ptr);
269
270 total_size += BitpackingPrimitives::GetRequiredSize(count: compression_buffer_idx, width);
271 total_size += sizeof(T); // FOR value
272 total_size += AlignValue(n: sizeof(bitpacking_width_t));
273
274 return true;
275 }
276
277 return false;
278 }
279
280 template <class OP = EmptyBitpackingWriter>
281 bool Update(T value, bool is_valid) {
282 compression_buffer_validity[compression_buffer_idx] = is_valid;
283 all_valid = all_valid && is_valid;
284 all_invalid = all_invalid && !is_valid;
285
286 if (is_valid) {
287 compression_buffer[compression_buffer_idx] = value;
288 minimum = MinValue<T>(minimum, value);
289 maximum = MaxValue<T>(maximum, value);
290 }
291
292 compression_buffer_idx++;
293
294 if (compression_buffer_idx == BITPACKING_METADATA_GROUP_SIZE) {
295 bool success = Flush<OP>();
296 Reset();
297 return success;
298 }
299 return true;
300 }
301};
302
303//===--------------------------------------------------------------------===//
304// Analyze
305//===--------------------------------------------------------------------===//
306template <class T>
307struct BitpackingAnalyzeState : public AnalyzeState {
308 BitpackingState<T> state;
309};
310
311template <class T>
312unique_ptr<AnalyzeState> BitpackingInitAnalyze(ColumnData &col_data, PhysicalType type) {
313 auto &config = DBConfig::GetConfig(db&: col_data.GetDatabase());
314
315 auto state = make_uniq<BitpackingAnalyzeState<T>>();
316 state->state.mode = config.options.force_bitpacking_mode;
317
318 return std::move(state);
319}
320
321template <class T>
322bool BitpackingAnalyze(AnalyzeState &state, Vector &input, idx_t count) {
323 auto &analyze_state = (BitpackingAnalyzeState<T> &)state;
324 UnifiedVectorFormat vdata;
325 input.ToUnifiedFormat(count, data&: vdata);
326
327 auto data = UnifiedVectorFormat::GetData<T>(vdata);
328 for (idx_t i = 0; i < count; i++) {
329 auto idx = vdata.sel->get_index(idx: i);
330 if (!analyze_state.state.template Update<EmptyBitpackingWriter>(data[idx], vdata.validity.RowIsValid(row_idx: idx))) {
331 return false;
332 }
333 }
334 return true;
335}
336
337template <class T>
338idx_t BitpackingFinalAnalyze(AnalyzeState &state) {
339 auto &bitpacking_state = (BitpackingAnalyzeState<T> &)state;
340 auto flush_result = bitpacking_state.state.template Flush<EmptyBitpackingWriter>();
341 if (!flush_result) {
342 return DConstants::INVALID_INDEX;
343 }
344 return bitpacking_state.state.total_size;
345}
346
347//===--------------------------------------------------------------------===//
348// Compress
349//===--------------------------------------------------------------------===//
350template <class T, bool WRITE_STATISTICS, class T_S = typename std::make_signed<T>::type>
351struct BitpackingCompressState : public CompressionState {
352public:
353 explicit BitpackingCompressState(ColumnDataCheckpointer &checkpointer)
354 : checkpointer(checkpointer),
355 function(checkpointer.GetCompressionFunction(type: CompressionType::COMPRESSION_BITPACKING)) {
356 CreateEmptySegment(row_start: checkpointer.GetRowGroup().start);
357
358 state.data_ptr = (void *)this;
359
360 auto &config = DBConfig::GetConfig(db&: checkpointer.GetDatabase());
361 state.mode = config.options.force_bitpacking_mode;
362 }
363
364 ColumnDataCheckpointer &checkpointer;
365 CompressionFunction &function;
366 unique_ptr<ColumnSegment> current_segment;
367 BufferHandle handle;
368
369 // Ptr to next free spot in segment;
370 data_ptr_t data_ptr;
371 // Ptr to next free spot for storing bitwidths and frame-of-references (growing downwards).
372 data_ptr_t metadata_ptr;
373
374 BitpackingState<T> state;
375
376public:
377 struct BitpackingWriter {
378 static void WriteConstant(T constant, idx_t count, void *data_ptr, bool all_invalid) {
379 auto state = (BitpackingCompressState<T, WRITE_STATISTICS> *)data_ptr;
380
381 ReserveSpace(state, data_bytes: sizeof(T));
382 WriteMetaData(state, mode: BitpackingMode::CONSTANT);
383 WriteData(state->data_ptr, constant);
384
385 UpdateStats(state, count);
386 }
387
388 static void WriteConstantDelta(T_S constant, T frame_of_reference, idx_t count, T *values, bool *validity,
389 void *data_ptr) {
390 auto state = (BitpackingCompressState<T, WRITE_STATISTICS> *)data_ptr;
391
392 ReserveSpace(state, data_bytes: 2 * sizeof(T));
393 WriteMetaData(state, mode: BitpackingMode::CONSTANT_DELTA);
394 WriteData(state->data_ptr, frame_of_reference);
395 WriteData(state->data_ptr, constant);
396
397 UpdateStats(state, count);
398 }
399
400 static void WriteDeltaFor(T *values, bool *validity, bitpacking_width_t width, T frame_of_reference,
401 T_S delta_offset, T *original_values, idx_t count, void *data_ptr) {
402 auto state = (BitpackingCompressState<T, WRITE_STATISTICS> *)data_ptr;
403
404 auto bp_size = BitpackingPrimitives::GetRequiredSize(count, width);
405 ReserveSpace(state, data_bytes: bp_size + 3 * sizeof(T));
406
407 WriteMetaData(state, mode: BitpackingMode::DELTA_FOR);
408 WriteData(state->data_ptr, frame_of_reference);
409 WriteData(state->data_ptr, (T)width);
410 WriteData(state->data_ptr, delta_offset);
411
412 BitpackingPrimitives::PackBuffer<T, false>(state->data_ptr, values, count, width);
413 state->data_ptr += bp_size;
414
415 UpdateStats(state, count);
416 }
417
418 static void WriteFor(T *values, bool *validity, bitpacking_width_t width, T frame_of_reference, idx_t count,
419 void *data_ptr) {
420 auto state = (BitpackingCompressState<T, WRITE_STATISTICS> *)data_ptr;
421
422 auto bp_size = BitpackingPrimitives::GetRequiredSize(count, width);
423 ReserveSpace(state, data_bytes: bp_size + 2 * sizeof(T));
424
425 WriteMetaData(state, mode: BitpackingMode::FOR);
426 WriteData(state->data_ptr, frame_of_reference);
427 WriteData(state->data_ptr, (T)width);
428
429 BitpackingPrimitives::PackBuffer<T, false>(state->data_ptr, values, count, width);
430 state->data_ptr += bp_size;
431
432 UpdateStats(state, count);
433 }
434
435 template <class T_OUT>
436 static void WriteData(data_ptr_t &ptr, T_OUT val) {
437 *((T_OUT *)ptr) = val;
438 ptr += sizeof(T_OUT);
439 }
440
441 static void WriteMetaData(BitpackingCompressState<T, WRITE_STATISTICS> *state, BitpackingMode mode) {
442 bitpacking_metadata_t metadata {.mode: mode, .offset: (uint32_t)(state->data_ptr - state->handle.Ptr())};
443 state->metadata_ptr -= sizeof(bitpacking_metadata_encoded_t);
444 Store<bitpacking_metadata_encoded_t>(EncodeMeta(metadata), state->metadata_ptr);
445 }
446
447 static void ReserveSpace(BitpackingCompressState<T, WRITE_STATISTICS> *state, idx_t data_bytes) {
448 idx_t meta_bytes = sizeof(bitpacking_metadata_encoded_t);
449 state->FlushAndCreateSegmentIfFull(data_bytes, meta_bytes);
450 D_ASSERT(state->CanStore(data_bytes, meta_bytes));
451 }
452
453 static void UpdateStats(BitpackingCompressState<T, WRITE_STATISTICS> *state, idx_t count) {
454 state->current_segment->count += count;
455
456 if (WRITE_STATISTICS && !state->state.all_invalid) {
457 NumericStats::Update<T>(state->current_segment->stats.statistics, state->state.minimum);
458 NumericStats::Update<T>(state->current_segment->stats.statistics, state->state.maximum);
459 }
460 }
461 };
462
463 bool CanStore(idx_t data_bytes, idx_t meta_bytes) {
464 auto required_data_bytes = AlignValue<idx_t>(n: (data_ptr + data_bytes) - data_ptr);
465 auto required_meta_bytes = Storage::BLOCK_SIZE - (metadata_ptr - data_ptr) + meta_bytes;
466
467 return required_data_bytes + required_meta_bytes <=
468 Storage::BLOCK_SIZE - BitpackingPrimitives::BITPACKING_HEADER_SIZE;
469 }
470
471 void CreateEmptySegment(idx_t row_start) {
472 auto &db = checkpointer.GetDatabase();
473 auto &type = checkpointer.GetType();
474 auto compressed_segment = ColumnSegment::CreateTransientSegment(db, type, start: row_start);
475 compressed_segment->function = function;
476 current_segment = std::move(compressed_segment);
477 auto &buffer_manager = BufferManager::GetBufferManager(db);
478 handle = buffer_manager.Pin(handle&: current_segment->block);
479
480 data_ptr = handle.Ptr() + BitpackingPrimitives::BITPACKING_HEADER_SIZE;
481 metadata_ptr = handle.Ptr() + Storage::BLOCK_SIZE;
482 }
483
484 void Append(UnifiedVectorFormat &vdata, idx_t count) {
485 auto data = UnifiedVectorFormat::GetData<T>(vdata);
486
487 for (idx_t i = 0; i < count; i++) {
488 auto idx = vdata.sel->get_index(idx: i);
489 state.template Update<BitpackingCompressState<T, WRITE_STATISTICS, T_S>::BitpackingWriter>(
490 data[idx], vdata.validity.RowIsValid(row_idx: idx));
491 }
492 }
493
494 void FlushAndCreateSegmentIfFull(idx_t required_data_bytes, idx_t required_meta_bytes) {
495 if (!CanStore(data_bytes: required_data_bytes, meta_bytes: required_meta_bytes)) {
496 auto row_start = current_segment->start + current_segment->count;
497 FlushSegment();
498 CreateEmptySegment(row_start);
499 }
500 }
501
502 void FlushSegment() {
503 auto &state = checkpointer.GetCheckpointState();
504 auto base_ptr = handle.Ptr();
505
506 // Compact the segment by moving the metadata next to the data.
507 idx_t metadata_offset = AlignValue(n: data_ptr - base_ptr);
508 idx_t metadata_size = base_ptr + Storage::BLOCK_SIZE - metadata_ptr;
509 idx_t total_segment_size = metadata_offset + metadata_size;
510
511 // Asserting things are still sane here
512 if (!CanStore(data_bytes: 0, meta_bytes: 0)) {
513 throw InternalException("Error in bitpacking size calculation");
514 }
515
516 memmove(dest: base_ptr + metadata_offset, src: metadata_ptr, n: metadata_size);
517
518 // Store the offset of the metadata of the first group (which is at the highest address).
519 Store<idx_t>(val: metadata_offset + metadata_size, ptr: base_ptr);
520 handle.Destroy();
521
522 state.FlushSegment(segment: std::move(current_segment), segment_size: total_segment_size);
523 }
524
525 void Finalize() {
526 state.template Flush<BitpackingCompressState<T, WRITE_STATISTICS, T_S>::BitpackingWriter>();
527 FlushSegment();
528 current_segment.reset();
529 }
530};
531
532template <class T, bool WRITE_STATISTICS>
533unique_ptr<CompressionState> BitpackingInitCompression(ColumnDataCheckpointer &checkpointer,
534 unique_ptr<AnalyzeState> state) {
535 return make_uniq<BitpackingCompressState<T, WRITE_STATISTICS>>(checkpointer);
536}
537
538template <class T, bool WRITE_STATISTICS>
539void BitpackingCompress(CompressionState &state_p, Vector &scan_vector, idx_t count) {
540 auto &state = (BitpackingCompressState<T, WRITE_STATISTICS> &)state_p;
541 UnifiedVectorFormat vdata;
542 scan_vector.ToUnifiedFormat(count, data&: vdata);
543 state.Append(vdata, count);
544}
545
546template <class T, bool WRITE_STATISTICS>
547void BitpackingFinalizeCompress(CompressionState &state_p) {
548 auto &state = (BitpackingCompressState<T, WRITE_STATISTICS> &)state_p;
549 state.Finalize();
550}
551
552//===--------------------------------------------------------------------===//
553// Scan
554//===--------------------------------------------------------------------===//
555template <class T>
556static void ApplyFrameOfReference(T *dst, T frame_of_reference, idx_t size) {
557 if (!frame_of_reference) {
558 return;
559 }
560 for (idx_t i = 0; i < size; i++) {
561 dst[i] += frame_of_reference;
562 }
563}
564
565// Based on https://github.com/lemire/FastPFor (Apache License 2.0)
566template <class T>
567static T DeltaDecode(T *data, T previous_value, const size_t size) {
568 D_ASSERT(size >= 1);
569
570 data[0] += previous_value;
571
572 const size_t UnrollQty = 4;
573 const size_t sz0 = (size / UnrollQty) * UnrollQty; // equal to 0, if size < UnrollQty
574 size_t i = 1;
575 if (sz0 >= UnrollQty) {
576 T a = data[0];
577 for (; i < sz0 - UnrollQty; i += UnrollQty) {
578 a = data[i] += a;
579 a = data[i + 1] += a;
580 a = data[i + 2] += a;
581 a = data[i + 3] += a;
582 }
583 }
584 for (; i != size; ++i) {
585 data[i] += data[i - 1];
586 }
587
588 return data[size - 1];
589}
590
591template <class T, class T_S = typename std::make_signed<T>::type>
592struct BitpackingScanState : public SegmentScanState {
593public:
594 explicit BitpackingScanState(ColumnSegment &segment) : current_segment(segment) {
595 auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db);
596 handle = buffer_manager.Pin(handle&: segment.block);
597 auto dataptr = handle.Ptr();
598
599 // load offset to bitpacking widths pointer
600 auto bitpacking_metadata_offset = Load<idx_t>(ptr: dataptr + segment.GetBlockOffset());
601 bitpacking_metadata_ptr =
602 dataptr + segment.GetBlockOffset() + bitpacking_metadata_offset - sizeof(bitpacking_metadata_encoded_t);
603
604 // load the first group
605 LoadNextGroup();
606 }
607
608 BufferHandle handle;
609 ColumnSegment &current_segment;
610
611 T decompression_buffer[BITPACKING_METADATA_GROUP_SIZE];
612
613 bitpacking_metadata_t current_group;
614
615 bitpacking_width_t current_width;
616 T current_frame_of_reference;
617 T current_constant;
618 T current_delta_offset;
619
620 idx_t current_group_offset = 0;
621 data_ptr_t current_group_ptr;
622 data_ptr_t bitpacking_metadata_ptr;
623
624public:
625 //! Loads the metadata for the current metadata group. This will set bitpacking_metadata_ptr to the next group.
626 //! this will also load any metadata that is at the start of a compressed buffer (e.g. the width, for, or constant
627 //! value) depending on the bitpacking mode for that group
628 void LoadNextGroup() {
629 D_ASSERT(bitpacking_metadata_ptr > handle.Ptr() &&
630 bitpacking_metadata_ptr < handle.Ptr() + Storage::BLOCK_SIZE);
631 current_group_offset = 0;
632 current_group = DecodeMeta(metadata_encoded: (bitpacking_metadata_encoded_t *)bitpacking_metadata_ptr);
633
634 bitpacking_metadata_ptr -= sizeof(bitpacking_metadata_encoded_t);
635 current_group_ptr = GetPtr(group: current_group);
636
637 // Read first value
638 switch (current_group.mode) {
639 case BitpackingMode::CONSTANT:
640 current_constant = *(T *)(current_group_ptr);
641 current_group_ptr += sizeof(T);
642 break;
643 case BitpackingMode::FOR:
644 case BitpackingMode::CONSTANT_DELTA:
645 case BitpackingMode::DELTA_FOR:
646 current_frame_of_reference = *(T *)(current_group_ptr);
647 current_group_ptr += sizeof(T);
648 break;
649 default:
650 throw InternalException("Invalid bitpacking mode");
651 }
652
653 // Read second value
654 switch (current_group.mode) {
655 case BitpackingMode::CONSTANT_DELTA:
656 current_constant = *(T *)(current_group_ptr);
657 current_group_ptr += sizeof(T);
658 break;
659 case BitpackingMode::FOR:
660 case BitpackingMode::DELTA_FOR:
661 current_width = (bitpacking_width_t) * (T *)(current_group_ptr);
662 current_group_ptr += MaxValue(a: sizeof(T), b: sizeof(bitpacking_width_t));
663 break;
664 case BitpackingMode::CONSTANT:
665 break;
666 default:
667 throw InternalException("Invalid bitpacking mode");
668 }
669
670 // Read third value
671 if (current_group.mode == BitpackingMode::DELTA_FOR) {
672 current_delta_offset = *(T *)(current_group_ptr);
673 current_group_ptr += sizeof(T);
674 }
675 }
676
677 void Skip(ColumnSegment &segment, idx_t skip_count) {
678 while (skip_count > 0) {
679 if (current_group_offset + skip_count < BITPACKING_METADATA_GROUP_SIZE) {
680 // Skipping Delta FOR requires a bit of decoding to figure out the new delta
681 if (current_group.mode == BitpackingMode::DELTA_FOR) {
682 // if current_group_offset points into the middle of a
683 // BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE, we need to scan a few
684 // values before current_group_offset to align with the algorithm groups
685 idx_t extra_count = current_group_offset % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE;
686
687 // Calculate total offset and count to bitunpack
688 idx_t base_decompress_count = BitpackingPrimitives::RoundUpToAlgorithmGroupSize(num_to_round: skip_count);
689 idx_t decompress_count = base_decompress_count + extra_count;
690 idx_t decompress_offset = current_group_offset - extra_count;
691 bool skip_sign_extension = true;
692
693 BitpackingPrimitives::UnPackBuffer<T>(data_ptr_cast(decompression_buffer),
694 current_group_ptr + decompress_offset, decompress_count,
695 current_width, skip_sign_extension);
696
697 ApplyFrameOfReference<T_S>((T_S *)&decompression_buffer[extra_count], current_frame_of_reference,
698 skip_count);
699 DeltaDecode<T_S>((T_S *)&decompression_buffer[extra_count], (T_S)current_delta_offset,
700 (idx_t)skip_count);
701 current_delta_offset = decompression_buffer[extra_count + skip_count - 1];
702
703 current_group_offset += skip_count;
704 } else {
705 current_group_offset += skip_count;
706 }
707 break;
708 } else {
709 auto left_in_this_group = BITPACKING_METADATA_GROUP_SIZE - current_group_offset;
710 auto number_of_groups_to_skip = (skip_count - left_in_this_group) / BITPACKING_METADATA_GROUP_SIZE;
711
712 current_group_offset = 0;
713 bitpacking_metadata_ptr -= number_of_groups_to_skip * sizeof(bitpacking_metadata_encoded_t);
714
715 LoadNextGroup();
716
717 skip_count -= left_in_this_group;
718 skip_count -= number_of_groups_to_skip * BITPACKING_METADATA_GROUP_SIZE;
719 }
720 }
721 }
722
723 data_ptr_t GetPtr(bitpacking_metadata_t group) {
724 return handle.Ptr() + current_segment.GetBlockOffset() + group.offset;
725 }
726};
727
728template <class T>
729unique_ptr<SegmentScanState> BitpackingInitScan(ColumnSegment &segment) {
730 auto result = make_uniq<BitpackingScanState<T>>(segment);
731 return std::move(result);
732}
733
734//===--------------------------------------------------------------------===//
735// Scan base data
736//===--------------------------------------------------------------------===//
737template <class T, class T_S = typename std::make_signed<T>::type>
738void BitpackingScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result,
739 idx_t result_offset) {
740 auto &scan_state = (BitpackingScanState<T> &)*state.scan_state;
741
742 T *result_data = FlatVector::GetData<T>(result);
743 result.SetVectorType(VectorType::FLAT_VECTOR);
744
745 //! Because FOR offsets all our values to be 0 or above, we can always skip sign extension here
746 bool skip_sign_extend = true;
747
748 idx_t scanned = 0;
749
750 while (scanned < scan_count) {
751 // Exhausted this metadata group, move pointers to next group and load metadata for next group.
752 if (scan_state.current_group_offset >= BITPACKING_METADATA_GROUP_SIZE) {
753 scan_state.LoadNextGroup();
754 }
755
756 idx_t offset_in_compression_group =
757 scan_state.current_group_offset % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE;
758
759 if (scan_state.current_group.mode == BitpackingMode::CONSTANT) {
760 idx_t remaining = scan_count - scanned;
761 idx_t to_scan = MinValue(remaining, BITPACKING_METADATA_GROUP_SIZE - scan_state.current_group_offset);
762 T *begin = result_data + result_offset + scanned;
763 T *end = begin + remaining;
764 std::fill(begin, end, scan_state.current_constant);
765 scanned += to_scan;
766 scan_state.current_group_offset += to_scan;
767 continue;
768 }
769 if (scan_state.current_group.mode == BitpackingMode::CONSTANT_DELTA) {
770 idx_t remaining = scan_count - scanned;
771 idx_t to_scan = MinValue(remaining, BITPACKING_METADATA_GROUP_SIZE - scan_state.current_group_offset);
772 T *target_ptr = result_data + result_offset + scanned;
773
774 for (idx_t i = 0; i < to_scan; i++) {
775 target_ptr[i] = ((scan_state.current_group_offset + i) * scan_state.current_constant) +
776 scan_state.current_frame_of_reference;
777 }
778
779 scanned += to_scan;
780 scan_state.current_group_offset += to_scan;
781 continue;
782 }
783 D_ASSERT(scan_state.current_group.mode == BitpackingMode::FOR ||
784 scan_state.current_group.mode == BitpackingMode::DELTA_FOR);
785
786 idx_t to_scan = MinValue<idx_t>(a: scan_count - scanned, b: BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE -
787 offset_in_compression_group);
788 // Calculate start of compression algorithm group
789 data_ptr_t current_position_ptr =
790 scan_state.current_group_ptr + scan_state.current_group_offset * scan_state.current_width / 8;
791 data_ptr_t decompression_group_start_pointer =
792 current_position_ptr - offset_in_compression_group * scan_state.current_width / 8;
793
794 T *current_result_ptr = result_data + result_offset + scanned;
795
796 if (to_scan == BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE && offset_in_compression_group == 0) {
797 // Decompress directly into result vector
798 BitpackingPrimitives::UnPackBlock<T>(data_ptr_cast(current_result_ptr), decompression_group_start_pointer,
799 scan_state.current_width, skip_sign_extend);
800 } else {
801 // Decompress compression algorithm to buffer
802 BitpackingPrimitives::UnPackBlock<T>(data_ptr_cast(scan_state.decompression_buffer),
803 decompression_group_start_pointer, scan_state.current_width,
804 skip_sign_extend);
805
806 memcpy(current_result_ptr, scan_state.decompression_buffer + offset_in_compression_group,
807 to_scan * sizeof(T));
808 }
809
810 if (scan_state.current_group.mode == BitpackingMode::DELTA_FOR) {
811 ApplyFrameOfReference<T_S>((T_S *)current_result_ptr, (T_S)scan_state.current_frame_of_reference, to_scan);
812 DeltaDecode<T_S>((T_S *)current_result_ptr, (T_S)scan_state.current_delta_offset, to_scan);
813 scan_state.current_delta_offset = ((T *)current_result_ptr)[to_scan - 1];
814 } else {
815 ApplyFrameOfReference<T>(current_result_ptr, scan_state.current_frame_of_reference, to_scan);
816 }
817
818 scanned += to_scan;
819 scan_state.current_group_offset += to_scan;
820 }
821}
822
823template <class T>
824void BitpackingScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) {
825 BitpackingScanPartial<T>(segment, state, scan_count, result, 0);
826}
827
828//===--------------------------------------------------------------------===//
829// Fetch
830//===--------------------------------------------------------------------===//
831template <class T>
832void BitpackingFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result,
833 idx_t result_idx) {
834 BitpackingScanState<T> scan_state(segment);
835 scan_state.Skip(segment, row_id);
836 auto result_data = FlatVector::GetData<T>(result);
837 T *current_result_ptr = result_data + result_idx;
838
839 idx_t offset_in_compression_group =
840 scan_state.current_group_offset % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE;
841
842 data_ptr_t decompression_group_start_pointer =
843 scan_state.current_group_ptr +
844 (scan_state.current_group_offset - offset_in_compression_group) * scan_state.current_width / 8;
845
846 //! Because FOR offsets all our values to be 0 or above, we can always skip sign extension here
847 bool skip_sign_extend = true;
848
849 if (scan_state.current_group.mode == BitpackingMode::CONSTANT) {
850 *current_result_ptr = scan_state.current_constant;
851 return;
852 }
853
854 if (scan_state.current_group.mode == BitpackingMode::CONSTANT_DELTA) {
855 *current_result_ptr =
856 ((scan_state.current_group_offset) * scan_state.current_constant) + scan_state.current_frame_of_reference;
857 return;
858 }
859
860 D_ASSERT(scan_state.current_group.mode == BitpackingMode::FOR ||
861 scan_state.current_group.mode == BitpackingMode::DELTA_FOR);
862
863 BitpackingPrimitives::UnPackBlock<T>(data_ptr_cast(scan_state.decompression_buffer),
864 decompression_group_start_pointer, scan_state.current_width, skip_sign_extend);
865
866 *current_result_ptr = *(T *)(scan_state.decompression_buffer + offset_in_compression_group);
867 *current_result_ptr += scan_state.current_frame_of_reference;
868
869 if (scan_state.current_group.mode == BitpackingMode::DELTA_FOR) {
870 *current_result_ptr += scan_state.current_delta_offset;
871 }
872}
873template <class T>
874void BitpackingSkip(ColumnSegment &segment, ColumnScanState &state, idx_t skip_count) {
875 auto &scan_state = (BitpackingScanState<T> &)*state.scan_state;
876 scan_state.Skip(segment, skip_count);
877}
878
879//===--------------------------------------------------------------------===//
880// Get Function
881//===--------------------------------------------------------------------===//
882template <class T, bool WRITE_STATISTICS = true>
883CompressionFunction GetBitpackingFunction(PhysicalType data_type) {
884 return CompressionFunction(CompressionType::COMPRESSION_BITPACKING, data_type, BitpackingInitAnalyze<T>,
885 BitpackingAnalyze<T>, BitpackingFinalAnalyze<T>,
886 BitpackingInitCompression<T, WRITE_STATISTICS>, BitpackingCompress<T, WRITE_STATISTICS>,
887 BitpackingFinalizeCompress<T, WRITE_STATISTICS>, BitpackingInitScan<T>,
888 BitpackingScan<T>, BitpackingScanPartial<T>, BitpackingFetchRow<T>, BitpackingSkip<T>);
889}
890
891CompressionFunction BitpackingFun::GetFunction(PhysicalType type) {
892 switch (type) {
893 case PhysicalType::BOOL:
894 case PhysicalType::INT8:
895 return GetBitpackingFunction<int8_t>(data_type: type);
896 case PhysicalType::INT16:
897 return GetBitpackingFunction<int16_t>(data_type: type);
898 case PhysicalType::INT32:
899 return GetBitpackingFunction<int32_t>(data_type: type);
900 case PhysicalType::INT64:
901 return GetBitpackingFunction<int64_t>(data_type: type);
902 case PhysicalType::UINT8:
903 return GetBitpackingFunction<uint8_t>(data_type: type);
904 case PhysicalType::UINT16:
905 return GetBitpackingFunction<uint16_t>(data_type: type);
906 case PhysicalType::UINT32:
907 return GetBitpackingFunction<uint32_t>(data_type: type);
908 case PhysicalType::UINT64:
909 return GetBitpackingFunction<uint64_t>(data_type: type);
910 case PhysicalType::LIST:
911 return GetBitpackingFunction<uint64_t, false>(data_type: type);
912 default:
913 throw InternalException("Unsupported type for Bitpacking");
914 }
915}
916
917bool BitpackingFun::TypeIsSupported(PhysicalType type) {
918 switch (type) {
919 case PhysicalType::BOOL:
920 case PhysicalType::INT8:
921 case PhysicalType::INT16:
922 case PhysicalType::INT32:
923 case PhysicalType::INT64:
924 case PhysicalType::UINT8:
925 case PhysicalType::UINT16:
926 case PhysicalType::UINT32:
927 case PhysicalType::UINT64:
928 case PhysicalType::LIST:
929 return true;
930 default:
931 return false;
932 }
933}
934
935} // namespace duckdb
936