1 | #include "duckdb/function/compression/compression.hpp" |
2 | |
3 | #include "duckdb/storage/table/column_segment.hpp" |
4 | #include "duckdb/function/compression_function.hpp" |
5 | #include "duckdb/main/config.hpp" |
6 | #include "duckdb/storage/table/column_data_checkpointer.hpp" |
7 | #include "duckdb/storage/buffer_manager.hpp" |
8 | #include "duckdb/common/types/null_value.hpp" |
9 | #include "duckdb/storage/table/scan_state.hpp" |
10 | #include <functional> |
11 | |
12 | namespace duckdb { |
13 | |
//! Integer type in which the length of a single run is stored.
//! RLEState::Update flushes a run as soon as its counter reaches NumericLimits<rle_count_t>::Maximum().
using rle_count_t = uint16_t;
15 | |
16 | //===--------------------------------------------------------------------===// |
17 | // Analyze |
18 | //===--------------------------------------------------------------------===// |
19 | struct EmptyRLEWriter { |
20 | template <class VALUE_TYPE> |
21 | static void Operation(VALUE_TYPE value, rle_count_t count, void *dataptr, bool is_null) { |
22 | } |
23 | }; |
24 | |
25 | template <class T> |
26 | struct RLEState { |
27 | RLEState() : seen_count(0), last_value(NullValue<T>()), last_seen_count(0), dataptr(nullptr) { |
28 | } |
29 | |
30 | idx_t seen_count; |
31 | T last_value; |
32 | rle_count_t last_seen_count; |
33 | void *dataptr; |
34 | bool all_null = true; |
35 | |
36 | public: |
37 | template <class OP> |
38 | void Flush() { |
39 | OP::template Operation<T>(last_value, last_seen_count, dataptr, all_null); |
40 | } |
41 | |
42 | template <class OP = EmptyRLEWriter> |
43 | void Update(const T *data, ValidityMask &validity, idx_t idx) { |
44 | if (validity.RowIsValid(row_idx: idx)) { |
45 | if (all_null) { |
46 | // no value seen yet |
47 | // assign the current value, and increment the seen_count |
48 | // note that we increment last_seen_count rather than setting it to 1 |
49 | // this is intentional: this is the first VALID value we see |
50 | // but it might not be the first value in case of nulls! |
51 | last_value = data[idx]; |
52 | seen_count++; |
53 | last_seen_count++; |
54 | all_null = false; |
55 | } else if (last_value == data[idx]) { |
56 | // the last value is identical to this value: increment the last_seen_count |
57 | last_seen_count++; |
58 | } else { |
59 | // the values are different |
60 | // issue the callback on the last value |
61 | Flush<OP>(); |
62 | |
63 | // increment the seen_count and put the new value into the RLE slot |
64 | last_value = data[idx]; |
65 | seen_count++; |
66 | last_seen_count = 1; |
67 | } |
68 | } else { |
69 | // NULL value: we merely increment the last_seen_count |
70 | last_seen_count++; |
71 | } |
72 | if (last_seen_count == NumericLimits<rle_count_t>::Maximum()) { |
73 | // we have seen the same value so many times in a row we are at the limit of what fits in our count |
74 | // write away the value and move to the next value |
75 | Flush<OP>(); |
76 | last_seen_count = 0; |
77 | seen_count++; |
78 | } |
79 | } |
80 | }; |
81 | |
82 | template <class T> |
83 | struct RLEAnalyzeState : public AnalyzeState { |
84 | RLEAnalyzeState() { |
85 | } |
86 | |
87 | RLEState<T> state; |
88 | }; |
89 | |
90 | template <class T> |
91 | unique_ptr<AnalyzeState> RLEInitAnalyze(ColumnData &col_data, PhysicalType type) { |
92 | return make_uniq<RLEAnalyzeState<T>>(); |
93 | } |
94 | |
95 | template <class T> |
96 | bool RLEAnalyze(AnalyzeState &state, Vector &input, idx_t count) { |
97 | auto &rle_state = state.template Cast<RLEAnalyzeState<T>>(); |
98 | UnifiedVectorFormat vdata; |
99 | input.ToUnifiedFormat(count, data&: vdata); |
100 | |
101 | auto data = UnifiedVectorFormat::GetData<T>(vdata); |
102 | for (idx_t i = 0; i < count; i++) { |
103 | auto idx = vdata.sel->get_index(idx: i); |
104 | rle_state.state.Update(data, vdata.validity, idx); |
105 | } |
106 | return true; |
107 | } |
108 | |
109 | template <class T> |
110 | idx_t RLEFinalAnalyze(AnalyzeState &state) { |
111 | auto &rle_state = state.template Cast<RLEAnalyzeState<T>>(); |
112 | return (sizeof(rle_count_t) + sizeof(T)) * rle_state.state.seen_count; |
113 | } |
114 | |
115 | //===--------------------------------------------------------------------===// |
116 | // Compress |
117 | //===--------------------------------------------------------------------===// |
118 | struct RLEConstants { |
119 | static constexpr const idx_t = sizeof(uint64_t); |
120 | }; |
121 | |
122 | template <class T, bool WRITE_STATISTICS> |
123 | struct RLECompressState : public CompressionState { |
124 | struct RLEWriter { |
125 | template <class VALUE_TYPE> |
126 | static void Operation(VALUE_TYPE value, rle_count_t count, void *dataptr, bool is_null) { |
127 | auto state = reinterpret_cast<RLECompressState<T, WRITE_STATISTICS> *>(dataptr); |
128 | state->WriteValue(value, count, is_null); |
129 | } |
130 | }; |
131 | |
132 | static idx_t MaxRLECount() { |
133 | auto entry_size = sizeof(T) + sizeof(rle_count_t); |
134 | auto entry_count = (Storage::BLOCK_SIZE - RLEConstants::RLE_HEADER_SIZE) / entry_size; |
135 | auto max_vector_count = entry_count / STANDARD_VECTOR_SIZE; |
136 | return max_vector_count * STANDARD_VECTOR_SIZE; |
137 | } |
138 | |
139 | explicit RLECompressState(ColumnDataCheckpointer &checkpointer_p) |
140 | : checkpointer(checkpointer_p), |
141 | function(checkpointer.GetCompressionFunction(type: CompressionType::COMPRESSION_RLE)) { |
142 | CreateEmptySegment(row_start: checkpointer.GetRowGroup().start); |
143 | |
144 | state.dataptr = (void *)this; |
145 | max_rle_count = MaxRLECount(); |
146 | } |
147 | |
148 | void CreateEmptySegment(idx_t row_start) { |
149 | auto &db = checkpointer.GetDatabase(); |
150 | auto &type = checkpointer.GetType(); |
151 | auto column_segment = ColumnSegment::CreateTransientSegment(db, type, start: row_start); |
152 | column_segment->function = function; |
153 | current_segment = std::move(column_segment); |
154 | auto &buffer_manager = BufferManager::GetBufferManager(db); |
155 | handle = buffer_manager.Pin(handle&: current_segment->block); |
156 | } |
157 | |
158 | void Append(UnifiedVectorFormat &vdata, idx_t count) { |
159 | auto data = UnifiedVectorFormat::GetData<T>(vdata); |
160 | for (idx_t i = 0; i < count; i++) { |
161 | auto idx = vdata.sel->get_index(idx: i); |
162 | state.template Update<RLECompressState<T, WRITE_STATISTICS>::RLEWriter>(data, vdata.validity, idx); |
163 | } |
164 | } |
165 | |
166 | void WriteValue(T value, rle_count_t count, bool is_null) { |
167 | // write the RLE entry |
168 | auto handle_ptr = handle.Ptr() + RLEConstants::RLE_HEADER_SIZE; |
169 | auto data_pointer = (T *)handle_ptr; |
170 | auto index_pointer = (rle_count_t *)(handle_ptr + max_rle_count * sizeof(T)); |
171 | data_pointer[entry_count] = value; |
172 | index_pointer[entry_count] = count; |
173 | entry_count++; |
174 | |
175 | // update meta data |
176 | if (WRITE_STATISTICS && !is_null) { |
177 | NumericStats::Update<T>(current_segment->stats.statistics, value); |
178 | } |
179 | current_segment->count += count; |
180 | |
181 | if (entry_count == max_rle_count) { |
182 | // we have finished writing this segment: flush it and create a new segment |
183 | auto row_start = current_segment->start + current_segment->count; |
184 | FlushSegment(); |
185 | CreateEmptySegment(row_start); |
186 | entry_count = 0; |
187 | } |
188 | } |
189 | |
190 | void FlushSegment() { |
191 | // flush the segment |
192 | // we compact the segment by moving the counts so they are directly next to the values |
193 | idx_t counts_size = sizeof(rle_count_t) * entry_count; |
194 | idx_t original_rle_offset = RLEConstants::RLE_HEADER_SIZE + max_rle_count * sizeof(T); |
195 | idx_t minimal_rle_offset = AlignValue(n: RLEConstants::RLE_HEADER_SIZE + sizeof(T) * entry_count); |
196 | idx_t total_segment_size = minimal_rle_offset + counts_size; |
197 | auto data_ptr = handle.Ptr(); |
198 | memmove(dest: data_ptr + minimal_rle_offset, src: data_ptr + original_rle_offset, n: counts_size); |
199 | // store the final RLE offset within the segment |
200 | Store<uint64_t>(val: minimal_rle_offset, ptr: data_ptr); |
201 | handle.Destroy(); |
202 | |
203 | auto &state = checkpointer.GetCheckpointState(); |
204 | state.FlushSegment(segment: std::move(current_segment), segment_size: total_segment_size); |
205 | } |
206 | |
207 | void Finalize() { |
208 | state.template Flush<RLECompressState<T, WRITE_STATISTICS>::RLEWriter>(); |
209 | |
210 | FlushSegment(); |
211 | current_segment.reset(); |
212 | } |
213 | |
214 | ColumnDataCheckpointer &checkpointer; |
215 | CompressionFunction &function; |
216 | unique_ptr<ColumnSegment> current_segment; |
217 | BufferHandle handle; |
218 | |
219 | RLEState<T> state; |
220 | idx_t entry_count = 0; |
221 | idx_t max_rle_count; |
222 | }; |
223 | |
224 | template <class T, bool WRITE_STATISTICS> |
225 | unique_ptr<CompressionState> RLEInitCompression(ColumnDataCheckpointer &checkpointer, unique_ptr<AnalyzeState> state) { |
226 | return make_uniq<RLECompressState<T, WRITE_STATISTICS>>(checkpointer); |
227 | } |
228 | |
229 | template <class T, bool WRITE_STATISTICS> |
230 | void RLECompress(CompressionState &state_p, Vector &scan_vector, idx_t count) { |
231 | auto &state = (RLECompressState<T, WRITE_STATISTICS> &)state_p; |
232 | UnifiedVectorFormat vdata; |
233 | scan_vector.ToUnifiedFormat(count, data&: vdata); |
234 | |
235 | state.Append(vdata, count); |
236 | } |
237 | |
238 | template <class T, bool WRITE_STATISTICS> |
239 | void RLEFinalizeCompress(CompressionState &state_p) { |
240 | auto &state = (RLECompressState<T, WRITE_STATISTICS> &)state_p; |
241 | state.Finalize(); |
242 | } |
243 | |
244 | //===--------------------------------------------------------------------===// |
245 | // Scan |
246 | //===--------------------------------------------------------------------===// |
247 | template <class T> |
248 | struct RLEScanState : public SegmentScanState { |
249 | explicit RLEScanState(ColumnSegment &segment) { |
250 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
251 | handle = buffer_manager.Pin(handle&: segment.block); |
252 | entry_pos = 0; |
253 | position_in_entry = 0; |
254 | rle_count_offset = Load<uint64_t>(ptr: handle.Ptr() + segment.GetBlockOffset()); |
255 | D_ASSERT(rle_count_offset <= Storage::BLOCK_SIZE); |
256 | } |
257 | |
258 | void Skip(ColumnSegment &segment, idx_t skip_count) { |
259 | auto data = handle.Ptr() + segment.GetBlockOffset(); |
260 | auto index_pointer = (rle_count_t *)(data + rle_count_offset); |
261 | |
262 | for (idx_t i = 0; i < skip_count; i++) { |
263 | // assign the current value |
264 | position_in_entry++; |
265 | if (position_in_entry >= index_pointer[entry_pos]) { |
266 | // handled all entries in this RLE value |
267 | // move to the next entry |
268 | entry_pos++; |
269 | position_in_entry = 0; |
270 | } |
271 | } |
272 | } |
273 | |
274 | BufferHandle handle; |
275 | uint32_t rle_offset; |
276 | idx_t entry_pos; |
277 | idx_t position_in_entry; |
278 | uint32_t rle_count_offset; |
279 | }; |
280 | |
281 | template <class T> |
282 | unique_ptr<SegmentScanState> RLEInitScan(ColumnSegment &segment) { |
283 | auto result = make_uniq<RLEScanState<T>>(segment); |
284 | return std::move(result); |
285 | } |
286 | |
287 | //===--------------------------------------------------------------------===// |
288 | // Scan base data |
289 | //===--------------------------------------------------------------------===// |
290 | template <class T> |
291 | void RLESkip(ColumnSegment &segment, ColumnScanState &state, idx_t skip_count) { |
292 | auto &scan_state = state.scan_state->Cast<RLEScanState<T>>(); |
293 | scan_state.Skip(segment, skip_count); |
294 | } |
295 | |
296 | template <class T> |
297 | void RLEScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result, |
298 | idx_t result_offset) { |
299 | auto &scan_state = state.scan_state->Cast<RLEScanState<T>>(); |
300 | |
301 | auto data = scan_state.handle.Ptr() + segment.GetBlockOffset(); |
302 | auto data_pointer = (T *)(data + RLEConstants::RLE_HEADER_SIZE); |
303 | auto index_pointer = (rle_count_t *)(data + scan_state.rle_count_offset); |
304 | |
305 | auto result_data = FlatVector::GetData<T>(result); |
306 | result.SetVectorType(VectorType::FLAT_VECTOR); |
307 | for (idx_t i = 0; i < scan_count; i++) { |
308 | // assign the current value |
309 | result_data[result_offset + i] = data_pointer[scan_state.entry_pos]; |
310 | scan_state.position_in_entry++; |
311 | if (scan_state.position_in_entry >= index_pointer[scan_state.entry_pos]) { |
312 | // handled all entries in this RLE value |
313 | // move to the next entry |
314 | scan_state.entry_pos++; |
315 | scan_state.position_in_entry = 0; |
316 | } |
317 | } |
318 | } |
319 | |
320 | template <class T> |
321 | void RLEScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) { |
322 | // FIXME: emit constant vector if repetition of single value is >= scan_count |
323 | RLEScanPartial<T>(segment, state, scan_count, result, 0); |
324 | } |
325 | |
326 | //===--------------------------------------------------------------------===// |
327 | // Fetch |
328 | //===--------------------------------------------------------------------===// |
329 | template <class T> |
330 | void RLEFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result, idx_t result_idx) { |
331 | RLEScanState<T> scan_state(segment); |
332 | scan_state.Skip(segment, row_id); |
333 | |
334 | auto data = scan_state.handle.Ptr() + segment.GetBlockOffset(); |
335 | auto data_pointer = (T *)(data + RLEConstants::RLE_HEADER_SIZE); |
336 | auto result_data = FlatVector::GetData<T>(result); |
337 | result_data[result_idx] = data_pointer[scan_state.entry_pos]; |
338 | } |
339 | |
340 | //===--------------------------------------------------------------------===// |
341 | // Get Function |
342 | //===--------------------------------------------------------------------===// |
343 | template <class T, bool WRITE_STATISTICS = true> |
344 | CompressionFunction GetRLEFunction(PhysicalType data_type) { |
345 | return CompressionFunction(CompressionType::COMPRESSION_RLE, data_type, RLEInitAnalyze<T>, RLEAnalyze<T>, |
346 | RLEFinalAnalyze<T>, RLEInitCompression<T, WRITE_STATISTICS>, |
347 | RLECompress<T, WRITE_STATISTICS>, RLEFinalizeCompress<T, WRITE_STATISTICS>, |
348 | RLEInitScan<T>, RLEScan<T>, RLEScanPartial<T>, RLEFetchRow<T>, RLESkip<T>); |
349 | } |
350 | |
351 | CompressionFunction RLEFun::GetFunction(PhysicalType type) { |
352 | switch (type) { |
353 | case PhysicalType::BOOL: |
354 | case PhysicalType::INT8: |
355 | return GetRLEFunction<int8_t>(data_type: type); |
356 | case PhysicalType::INT16: |
357 | return GetRLEFunction<int16_t>(data_type: type); |
358 | case PhysicalType::INT32: |
359 | return GetRLEFunction<int32_t>(data_type: type); |
360 | case PhysicalType::INT64: |
361 | return GetRLEFunction<int64_t>(data_type: type); |
362 | case PhysicalType::INT128: |
363 | return GetRLEFunction<hugeint_t>(data_type: type); |
364 | case PhysicalType::UINT8: |
365 | return GetRLEFunction<uint8_t>(data_type: type); |
366 | case PhysicalType::UINT16: |
367 | return GetRLEFunction<uint16_t>(data_type: type); |
368 | case PhysicalType::UINT32: |
369 | return GetRLEFunction<uint32_t>(data_type: type); |
370 | case PhysicalType::UINT64: |
371 | return GetRLEFunction<uint64_t>(data_type: type); |
372 | case PhysicalType::FLOAT: |
373 | return GetRLEFunction<float>(data_type: type); |
374 | case PhysicalType::DOUBLE: |
375 | return GetRLEFunction<double>(data_type: type); |
376 | case PhysicalType::LIST: |
377 | return GetRLEFunction<uint64_t, false>(data_type: type); |
378 | default: |
379 | throw InternalException("Unsupported type for RLE" ); |
380 | } |
381 | } |
382 | |
383 | bool RLEFun::TypeIsSupported(PhysicalType type) { |
384 | switch (type) { |
385 | case PhysicalType::BOOL: |
386 | case PhysicalType::INT8: |
387 | case PhysicalType::INT16: |
388 | case PhysicalType::INT32: |
389 | case PhysicalType::INT64: |
390 | case PhysicalType::INT128: |
391 | case PhysicalType::UINT8: |
392 | case PhysicalType::UINT16: |
393 | case PhysicalType::UINT32: |
394 | case PhysicalType::UINT64: |
395 | case PhysicalType::FLOAT: |
396 | case PhysicalType::DOUBLE: |
397 | case PhysicalType::LIST: |
398 | return true; |
399 | default: |
400 | return false; |
401 | } |
402 | } |
403 | |
404 | } // namespace duckdb |
405 | |