1#include "duckdb/function/compression/compression.hpp"
2
3#include "duckdb/storage/table/column_segment.hpp"
4#include "duckdb/function/compression_function.hpp"
5#include "duckdb/main/config.hpp"
6#include "duckdb/storage/table/column_data_checkpointer.hpp"
7#include "duckdb/storage/buffer_manager.hpp"
8#include "duckdb/common/types/null_value.hpp"
9#include "duckdb/storage/table/scan_state.hpp"
10#include <functional>
11
12namespace duckdb {
13
// Integer type used to store the length of a single RLE run.
// A run is flushed once its length reaches NumericLimits<rle_count_t>::Maximum().
using rle_count_t = uint16_t;
15
16//===--------------------------------------------------------------------===//
17// Analyze
18//===--------------------------------------------------------------------===//
19struct EmptyRLEWriter {
20 template <class VALUE_TYPE>
21 static void Operation(VALUE_TYPE value, rle_count_t count, void *dataptr, bool is_null) {
22 }
23};
24
25template <class T>
26struct RLEState {
27 RLEState() : seen_count(0), last_value(NullValue<T>()), last_seen_count(0), dataptr(nullptr) {
28 }
29
30 idx_t seen_count;
31 T last_value;
32 rle_count_t last_seen_count;
33 void *dataptr;
34 bool all_null = true;
35
36public:
37 template <class OP>
38 void Flush() {
39 OP::template Operation<T>(last_value, last_seen_count, dataptr, all_null);
40 }
41
42 template <class OP = EmptyRLEWriter>
43 void Update(const T *data, ValidityMask &validity, idx_t idx) {
44 if (validity.RowIsValid(row_idx: idx)) {
45 if (all_null) {
46 // no value seen yet
47 // assign the current value, and increment the seen_count
48 // note that we increment last_seen_count rather than setting it to 1
49 // this is intentional: this is the first VALID value we see
50 // but it might not be the first value in case of nulls!
51 last_value = data[idx];
52 seen_count++;
53 last_seen_count++;
54 all_null = false;
55 } else if (last_value == data[idx]) {
56 // the last value is identical to this value: increment the last_seen_count
57 last_seen_count++;
58 } else {
59 // the values are different
60 // issue the callback on the last value
61 Flush<OP>();
62
63 // increment the seen_count and put the new value into the RLE slot
64 last_value = data[idx];
65 seen_count++;
66 last_seen_count = 1;
67 }
68 } else {
69 // NULL value: we merely increment the last_seen_count
70 last_seen_count++;
71 }
72 if (last_seen_count == NumericLimits<rle_count_t>::Maximum()) {
73 // we have seen the same value so many times in a row we are at the limit of what fits in our count
74 // write away the value and move to the next value
75 Flush<OP>();
76 last_seen_count = 0;
77 seen_count++;
78 }
79 }
80};
81
82template <class T>
83struct RLEAnalyzeState : public AnalyzeState {
84 RLEAnalyzeState() {
85 }
86
87 RLEState<T> state;
88};
89
90template <class T>
91unique_ptr<AnalyzeState> RLEInitAnalyze(ColumnData &col_data, PhysicalType type) {
92 return make_uniq<RLEAnalyzeState<T>>();
93}
94
95template <class T>
96bool RLEAnalyze(AnalyzeState &state, Vector &input, idx_t count) {
97 auto &rle_state = state.template Cast<RLEAnalyzeState<T>>();
98 UnifiedVectorFormat vdata;
99 input.ToUnifiedFormat(count, data&: vdata);
100
101 auto data = UnifiedVectorFormat::GetData<T>(vdata);
102 for (idx_t i = 0; i < count; i++) {
103 auto idx = vdata.sel->get_index(idx: i);
104 rle_state.state.Update(data, vdata.validity, idx);
105 }
106 return true;
107}
108
109template <class T>
110idx_t RLEFinalAnalyze(AnalyzeState &state) {
111 auto &rle_state = state.template Cast<RLEAnalyzeState<T>>();
112 return (sizeof(rle_count_t) + sizeof(T)) * rle_state.state.seen_count;
113}
114
115//===--------------------------------------------------------------------===//
116// Compress
117//===--------------------------------------------------------------------===//
118struct RLEConstants {
119 static constexpr const idx_t RLE_HEADER_SIZE = sizeof(uint64_t);
120};
121
122template <class T, bool WRITE_STATISTICS>
123struct RLECompressState : public CompressionState {
124 struct RLEWriter {
125 template <class VALUE_TYPE>
126 static void Operation(VALUE_TYPE value, rle_count_t count, void *dataptr, bool is_null) {
127 auto state = reinterpret_cast<RLECompressState<T, WRITE_STATISTICS> *>(dataptr);
128 state->WriteValue(value, count, is_null);
129 }
130 };
131
132 static idx_t MaxRLECount() {
133 auto entry_size = sizeof(T) + sizeof(rle_count_t);
134 auto entry_count = (Storage::BLOCK_SIZE - RLEConstants::RLE_HEADER_SIZE) / entry_size;
135 auto max_vector_count = entry_count / STANDARD_VECTOR_SIZE;
136 return max_vector_count * STANDARD_VECTOR_SIZE;
137 }
138
139 explicit RLECompressState(ColumnDataCheckpointer &checkpointer_p)
140 : checkpointer(checkpointer_p),
141 function(checkpointer.GetCompressionFunction(type: CompressionType::COMPRESSION_RLE)) {
142 CreateEmptySegment(row_start: checkpointer.GetRowGroup().start);
143
144 state.dataptr = (void *)this;
145 max_rle_count = MaxRLECount();
146 }
147
148 void CreateEmptySegment(idx_t row_start) {
149 auto &db = checkpointer.GetDatabase();
150 auto &type = checkpointer.GetType();
151 auto column_segment = ColumnSegment::CreateTransientSegment(db, type, start: row_start);
152 column_segment->function = function;
153 current_segment = std::move(column_segment);
154 auto &buffer_manager = BufferManager::GetBufferManager(db);
155 handle = buffer_manager.Pin(handle&: current_segment->block);
156 }
157
158 void Append(UnifiedVectorFormat &vdata, idx_t count) {
159 auto data = UnifiedVectorFormat::GetData<T>(vdata);
160 for (idx_t i = 0; i < count; i++) {
161 auto idx = vdata.sel->get_index(idx: i);
162 state.template Update<RLECompressState<T, WRITE_STATISTICS>::RLEWriter>(data, vdata.validity, idx);
163 }
164 }
165
166 void WriteValue(T value, rle_count_t count, bool is_null) {
167 // write the RLE entry
168 auto handle_ptr = handle.Ptr() + RLEConstants::RLE_HEADER_SIZE;
169 auto data_pointer = (T *)handle_ptr;
170 auto index_pointer = (rle_count_t *)(handle_ptr + max_rle_count * sizeof(T));
171 data_pointer[entry_count] = value;
172 index_pointer[entry_count] = count;
173 entry_count++;
174
175 // update meta data
176 if (WRITE_STATISTICS && !is_null) {
177 NumericStats::Update<T>(current_segment->stats.statistics, value);
178 }
179 current_segment->count += count;
180
181 if (entry_count == max_rle_count) {
182 // we have finished writing this segment: flush it and create a new segment
183 auto row_start = current_segment->start + current_segment->count;
184 FlushSegment();
185 CreateEmptySegment(row_start);
186 entry_count = 0;
187 }
188 }
189
190 void FlushSegment() {
191 // flush the segment
192 // we compact the segment by moving the counts so they are directly next to the values
193 idx_t counts_size = sizeof(rle_count_t) * entry_count;
194 idx_t original_rle_offset = RLEConstants::RLE_HEADER_SIZE + max_rle_count * sizeof(T);
195 idx_t minimal_rle_offset = AlignValue(n: RLEConstants::RLE_HEADER_SIZE + sizeof(T) * entry_count);
196 idx_t total_segment_size = minimal_rle_offset + counts_size;
197 auto data_ptr = handle.Ptr();
198 memmove(dest: data_ptr + minimal_rle_offset, src: data_ptr + original_rle_offset, n: counts_size);
199 // store the final RLE offset within the segment
200 Store<uint64_t>(val: minimal_rle_offset, ptr: data_ptr);
201 handle.Destroy();
202
203 auto &state = checkpointer.GetCheckpointState();
204 state.FlushSegment(segment: std::move(current_segment), segment_size: total_segment_size);
205 }
206
207 void Finalize() {
208 state.template Flush<RLECompressState<T, WRITE_STATISTICS>::RLEWriter>();
209
210 FlushSegment();
211 current_segment.reset();
212 }
213
214 ColumnDataCheckpointer &checkpointer;
215 CompressionFunction &function;
216 unique_ptr<ColumnSegment> current_segment;
217 BufferHandle handle;
218
219 RLEState<T> state;
220 idx_t entry_count = 0;
221 idx_t max_rle_count;
222};
223
224template <class T, bool WRITE_STATISTICS>
225unique_ptr<CompressionState> RLEInitCompression(ColumnDataCheckpointer &checkpointer, unique_ptr<AnalyzeState> state) {
226 return make_uniq<RLECompressState<T, WRITE_STATISTICS>>(checkpointer);
227}
228
229template <class T, bool WRITE_STATISTICS>
230void RLECompress(CompressionState &state_p, Vector &scan_vector, idx_t count) {
231 auto &state = (RLECompressState<T, WRITE_STATISTICS> &)state_p;
232 UnifiedVectorFormat vdata;
233 scan_vector.ToUnifiedFormat(count, data&: vdata);
234
235 state.Append(vdata, count);
236}
237
238template <class T, bool WRITE_STATISTICS>
239void RLEFinalizeCompress(CompressionState &state_p) {
240 auto &state = (RLECompressState<T, WRITE_STATISTICS> &)state_p;
241 state.Finalize();
242}
243
244//===--------------------------------------------------------------------===//
245// Scan
246//===--------------------------------------------------------------------===//
247template <class T>
248struct RLEScanState : public SegmentScanState {
249 explicit RLEScanState(ColumnSegment &segment) {
250 auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db);
251 handle = buffer_manager.Pin(handle&: segment.block);
252 entry_pos = 0;
253 position_in_entry = 0;
254 rle_count_offset = Load<uint64_t>(ptr: handle.Ptr() + segment.GetBlockOffset());
255 D_ASSERT(rle_count_offset <= Storage::BLOCK_SIZE);
256 }
257
258 void Skip(ColumnSegment &segment, idx_t skip_count) {
259 auto data = handle.Ptr() + segment.GetBlockOffset();
260 auto index_pointer = (rle_count_t *)(data + rle_count_offset);
261
262 for (idx_t i = 0; i < skip_count; i++) {
263 // assign the current value
264 position_in_entry++;
265 if (position_in_entry >= index_pointer[entry_pos]) {
266 // handled all entries in this RLE value
267 // move to the next entry
268 entry_pos++;
269 position_in_entry = 0;
270 }
271 }
272 }
273
274 BufferHandle handle;
275 uint32_t rle_offset;
276 idx_t entry_pos;
277 idx_t position_in_entry;
278 uint32_t rle_count_offset;
279};
280
281template <class T>
282unique_ptr<SegmentScanState> RLEInitScan(ColumnSegment &segment) {
283 auto result = make_uniq<RLEScanState<T>>(segment);
284 return std::move(result);
285}
286
287//===--------------------------------------------------------------------===//
288// Scan base data
289//===--------------------------------------------------------------------===//
290template <class T>
291void RLESkip(ColumnSegment &segment, ColumnScanState &state, idx_t skip_count) {
292 auto &scan_state = state.scan_state->Cast<RLEScanState<T>>();
293 scan_state.Skip(segment, skip_count);
294}
295
296template <class T>
297void RLEScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result,
298 idx_t result_offset) {
299 auto &scan_state = state.scan_state->Cast<RLEScanState<T>>();
300
301 auto data = scan_state.handle.Ptr() + segment.GetBlockOffset();
302 auto data_pointer = (T *)(data + RLEConstants::RLE_HEADER_SIZE);
303 auto index_pointer = (rle_count_t *)(data + scan_state.rle_count_offset);
304
305 auto result_data = FlatVector::GetData<T>(result);
306 result.SetVectorType(VectorType::FLAT_VECTOR);
307 for (idx_t i = 0; i < scan_count; i++) {
308 // assign the current value
309 result_data[result_offset + i] = data_pointer[scan_state.entry_pos];
310 scan_state.position_in_entry++;
311 if (scan_state.position_in_entry >= index_pointer[scan_state.entry_pos]) {
312 // handled all entries in this RLE value
313 // move to the next entry
314 scan_state.entry_pos++;
315 scan_state.position_in_entry = 0;
316 }
317 }
318}
319
320template <class T>
321void RLEScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) {
322 // FIXME: emit constant vector if repetition of single value is >= scan_count
323 RLEScanPartial<T>(segment, state, scan_count, result, 0);
324}
325
326//===--------------------------------------------------------------------===//
327// Fetch
328//===--------------------------------------------------------------------===//
329template <class T>
330void RLEFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result, idx_t result_idx) {
331 RLEScanState<T> scan_state(segment);
332 scan_state.Skip(segment, row_id);
333
334 auto data = scan_state.handle.Ptr() + segment.GetBlockOffset();
335 auto data_pointer = (T *)(data + RLEConstants::RLE_HEADER_SIZE);
336 auto result_data = FlatVector::GetData<T>(result);
337 result_data[result_idx] = data_pointer[scan_state.entry_pos];
338}
339
340//===--------------------------------------------------------------------===//
341// Get Function
342//===--------------------------------------------------------------------===//
343template <class T, bool WRITE_STATISTICS = true>
344CompressionFunction GetRLEFunction(PhysicalType data_type) {
345 return CompressionFunction(CompressionType::COMPRESSION_RLE, data_type, RLEInitAnalyze<T>, RLEAnalyze<T>,
346 RLEFinalAnalyze<T>, RLEInitCompression<T, WRITE_STATISTICS>,
347 RLECompress<T, WRITE_STATISTICS>, RLEFinalizeCompress<T, WRITE_STATISTICS>,
348 RLEInitScan<T>, RLEScan<T>, RLEScanPartial<T>, RLEFetchRow<T>, RLESkip<T>);
349}
350
351CompressionFunction RLEFun::GetFunction(PhysicalType type) {
352 switch (type) {
353 case PhysicalType::BOOL:
354 case PhysicalType::INT8:
355 return GetRLEFunction<int8_t>(data_type: type);
356 case PhysicalType::INT16:
357 return GetRLEFunction<int16_t>(data_type: type);
358 case PhysicalType::INT32:
359 return GetRLEFunction<int32_t>(data_type: type);
360 case PhysicalType::INT64:
361 return GetRLEFunction<int64_t>(data_type: type);
362 case PhysicalType::INT128:
363 return GetRLEFunction<hugeint_t>(data_type: type);
364 case PhysicalType::UINT8:
365 return GetRLEFunction<uint8_t>(data_type: type);
366 case PhysicalType::UINT16:
367 return GetRLEFunction<uint16_t>(data_type: type);
368 case PhysicalType::UINT32:
369 return GetRLEFunction<uint32_t>(data_type: type);
370 case PhysicalType::UINT64:
371 return GetRLEFunction<uint64_t>(data_type: type);
372 case PhysicalType::FLOAT:
373 return GetRLEFunction<float>(data_type: type);
374 case PhysicalType::DOUBLE:
375 return GetRLEFunction<double>(data_type: type);
376 case PhysicalType::LIST:
377 return GetRLEFunction<uint64_t, false>(data_type: type);
378 default:
379 throw InternalException("Unsupported type for RLE");
380 }
381}
382
383bool RLEFun::TypeIsSupported(PhysicalType type) {
384 switch (type) {
385 case PhysicalType::BOOL:
386 case PhysicalType::INT8:
387 case PhysicalType::INT16:
388 case PhysicalType::INT32:
389 case PhysicalType::INT64:
390 case PhysicalType::INT128:
391 case PhysicalType::UINT8:
392 case PhysicalType::UINT16:
393 case PhysicalType::UINT32:
394 case PhysicalType::UINT64:
395 case PhysicalType::FLOAT:
396 case PhysicalType::DOUBLE:
397 case PhysicalType::LIST:
398 return true;
399 default:
400 return false;
401 }
402}
403
404} // namespace duckdb
405