| 1 | #include "duckdb/function/compression/compression.hpp" |
| 2 | |
| 3 | #include "duckdb/storage/table/column_segment.hpp" |
| 4 | #include "duckdb/function/compression_function.hpp" |
| 5 | #include "duckdb/main/config.hpp" |
| 6 | #include "duckdb/storage/table/column_data_checkpointer.hpp" |
| 7 | #include "duckdb/storage/buffer_manager.hpp" |
| 8 | #include "duckdb/common/types/null_value.hpp" |
| 9 | #include "duckdb/storage/table/scan_state.hpp" |
| 10 | #include <functional> |
| 11 | |
| 12 | namespace duckdb { |
| 13 | |
| 14 | using rle_count_t = uint16_t; |
| 15 | |
| 16 | //===--------------------------------------------------------------------===// |
| 17 | // Analyze |
| 18 | //===--------------------------------------------------------------------===// |
| 19 | struct EmptyRLEWriter { |
| 20 | template <class VALUE_TYPE> |
| 21 | static void Operation(VALUE_TYPE value, rle_count_t count, void *dataptr, bool is_null) { |
| 22 | } |
| 23 | }; |
| 24 | |
| 25 | template <class T> |
| 26 | struct RLEState { |
| 27 | RLEState() : seen_count(0), last_value(NullValue<T>()), last_seen_count(0), dataptr(nullptr) { |
| 28 | } |
| 29 | |
| 30 | idx_t seen_count; |
| 31 | T last_value; |
| 32 | rle_count_t last_seen_count; |
| 33 | void *dataptr; |
| 34 | bool all_null = true; |
| 35 | |
| 36 | public: |
| 37 | template <class OP> |
| 38 | void Flush() { |
| 39 | OP::template Operation<T>(last_value, last_seen_count, dataptr, all_null); |
| 40 | } |
| 41 | |
| 42 | template <class OP = EmptyRLEWriter> |
| 43 | void Update(const T *data, ValidityMask &validity, idx_t idx) { |
| 44 | if (validity.RowIsValid(row_idx: idx)) { |
| 45 | if (all_null) { |
| 46 | // no value seen yet |
| 47 | // assign the current value, and increment the seen_count |
| 48 | // note that we increment last_seen_count rather than setting it to 1 |
| 49 | // this is intentional: this is the first VALID value we see |
| 50 | // but it might not be the first value in case of nulls! |
| 51 | last_value = data[idx]; |
| 52 | seen_count++; |
| 53 | last_seen_count++; |
| 54 | all_null = false; |
| 55 | } else if (last_value == data[idx]) { |
| 56 | // the last value is identical to this value: increment the last_seen_count |
| 57 | last_seen_count++; |
| 58 | } else { |
| 59 | // the values are different |
| 60 | // issue the callback on the last value |
| 61 | Flush<OP>(); |
| 62 | |
| 63 | // increment the seen_count and put the new value into the RLE slot |
| 64 | last_value = data[idx]; |
| 65 | seen_count++; |
| 66 | last_seen_count = 1; |
| 67 | } |
| 68 | } else { |
| 69 | // NULL value: we merely increment the last_seen_count |
| 70 | last_seen_count++; |
| 71 | } |
| 72 | if (last_seen_count == NumericLimits<rle_count_t>::Maximum()) { |
| 73 | // we have seen the same value so many times in a row we are at the limit of what fits in our count |
| 74 | // write away the value and move to the next value |
| 75 | Flush<OP>(); |
| 76 | last_seen_count = 0; |
| 77 | seen_count++; |
| 78 | } |
| 79 | } |
| 80 | }; |
| 81 | |
| 82 | template <class T> |
| 83 | struct RLEAnalyzeState : public AnalyzeState { |
| 84 | RLEAnalyzeState() { |
| 85 | } |
| 86 | |
| 87 | RLEState<T> state; |
| 88 | }; |
| 89 | |
| 90 | template <class T> |
| 91 | unique_ptr<AnalyzeState> RLEInitAnalyze(ColumnData &col_data, PhysicalType type) { |
| 92 | return make_uniq<RLEAnalyzeState<T>>(); |
| 93 | } |
| 94 | |
| 95 | template <class T> |
| 96 | bool RLEAnalyze(AnalyzeState &state, Vector &input, idx_t count) { |
| 97 | auto &rle_state = state.template Cast<RLEAnalyzeState<T>>(); |
| 98 | UnifiedVectorFormat vdata; |
| 99 | input.ToUnifiedFormat(count, data&: vdata); |
| 100 | |
| 101 | auto data = UnifiedVectorFormat::GetData<T>(vdata); |
| 102 | for (idx_t i = 0; i < count; i++) { |
| 103 | auto idx = vdata.sel->get_index(idx: i); |
| 104 | rle_state.state.Update(data, vdata.validity, idx); |
| 105 | } |
| 106 | return true; |
| 107 | } |
| 108 | |
| 109 | template <class T> |
| 110 | idx_t RLEFinalAnalyze(AnalyzeState &state) { |
| 111 | auto &rle_state = state.template Cast<RLEAnalyzeState<T>>(); |
| 112 | return (sizeof(rle_count_t) + sizeof(T)) * rle_state.state.seen_count; |
| 113 | } |
| 114 | |
| 115 | //===--------------------------------------------------------------------===// |
| 116 | // Compress |
| 117 | //===--------------------------------------------------------------------===// |
| 118 | struct RLEConstants { |
| 119 | static constexpr const idx_t = sizeof(uint64_t); |
| 120 | }; |
| 121 | |
| 122 | template <class T, bool WRITE_STATISTICS> |
| 123 | struct RLECompressState : public CompressionState { |
| 124 | struct RLEWriter { |
| 125 | template <class VALUE_TYPE> |
| 126 | static void Operation(VALUE_TYPE value, rle_count_t count, void *dataptr, bool is_null) { |
| 127 | auto state = reinterpret_cast<RLECompressState<T, WRITE_STATISTICS> *>(dataptr); |
| 128 | state->WriteValue(value, count, is_null); |
| 129 | } |
| 130 | }; |
| 131 | |
| 132 | static idx_t MaxRLECount() { |
| 133 | auto entry_size = sizeof(T) + sizeof(rle_count_t); |
| 134 | auto entry_count = (Storage::BLOCK_SIZE - RLEConstants::RLE_HEADER_SIZE) / entry_size; |
| 135 | auto max_vector_count = entry_count / STANDARD_VECTOR_SIZE; |
| 136 | return max_vector_count * STANDARD_VECTOR_SIZE; |
| 137 | } |
| 138 | |
| 139 | explicit RLECompressState(ColumnDataCheckpointer &checkpointer_p) |
| 140 | : checkpointer(checkpointer_p), |
| 141 | function(checkpointer.GetCompressionFunction(type: CompressionType::COMPRESSION_RLE)) { |
| 142 | CreateEmptySegment(row_start: checkpointer.GetRowGroup().start); |
| 143 | |
| 144 | state.dataptr = (void *)this; |
| 145 | max_rle_count = MaxRLECount(); |
| 146 | } |
| 147 | |
| 148 | void CreateEmptySegment(idx_t row_start) { |
| 149 | auto &db = checkpointer.GetDatabase(); |
| 150 | auto &type = checkpointer.GetType(); |
| 151 | auto column_segment = ColumnSegment::CreateTransientSegment(db, type, start: row_start); |
| 152 | column_segment->function = function; |
| 153 | current_segment = std::move(column_segment); |
| 154 | auto &buffer_manager = BufferManager::GetBufferManager(db); |
| 155 | handle = buffer_manager.Pin(handle&: current_segment->block); |
| 156 | } |
| 157 | |
| 158 | void Append(UnifiedVectorFormat &vdata, idx_t count) { |
| 159 | auto data = UnifiedVectorFormat::GetData<T>(vdata); |
| 160 | for (idx_t i = 0; i < count; i++) { |
| 161 | auto idx = vdata.sel->get_index(idx: i); |
| 162 | state.template Update<RLECompressState<T, WRITE_STATISTICS>::RLEWriter>(data, vdata.validity, idx); |
| 163 | } |
| 164 | } |
| 165 | |
| 166 | void WriteValue(T value, rle_count_t count, bool is_null) { |
| 167 | // write the RLE entry |
| 168 | auto handle_ptr = handle.Ptr() + RLEConstants::RLE_HEADER_SIZE; |
| 169 | auto data_pointer = (T *)handle_ptr; |
| 170 | auto index_pointer = (rle_count_t *)(handle_ptr + max_rle_count * sizeof(T)); |
| 171 | data_pointer[entry_count] = value; |
| 172 | index_pointer[entry_count] = count; |
| 173 | entry_count++; |
| 174 | |
| 175 | // update meta data |
| 176 | if (WRITE_STATISTICS && !is_null) { |
| 177 | NumericStats::Update<T>(current_segment->stats.statistics, value); |
| 178 | } |
| 179 | current_segment->count += count; |
| 180 | |
| 181 | if (entry_count == max_rle_count) { |
| 182 | // we have finished writing this segment: flush it and create a new segment |
| 183 | auto row_start = current_segment->start + current_segment->count; |
| 184 | FlushSegment(); |
| 185 | CreateEmptySegment(row_start); |
| 186 | entry_count = 0; |
| 187 | } |
| 188 | } |
| 189 | |
| 190 | void FlushSegment() { |
| 191 | // flush the segment |
| 192 | // we compact the segment by moving the counts so they are directly next to the values |
| 193 | idx_t counts_size = sizeof(rle_count_t) * entry_count; |
| 194 | idx_t original_rle_offset = RLEConstants::RLE_HEADER_SIZE + max_rle_count * sizeof(T); |
| 195 | idx_t minimal_rle_offset = AlignValue(n: RLEConstants::RLE_HEADER_SIZE + sizeof(T) * entry_count); |
| 196 | idx_t total_segment_size = minimal_rle_offset + counts_size; |
| 197 | auto data_ptr = handle.Ptr(); |
| 198 | memmove(dest: data_ptr + minimal_rle_offset, src: data_ptr + original_rle_offset, n: counts_size); |
| 199 | // store the final RLE offset within the segment |
| 200 | Store<uint64_t>(val: minimal_rle_offset, ptr: data_ptr); |
| 201 | handle.Destroy(); |
| 202 | |
| 203 | auto &state = checkpointer.GetCheckpointState(); |
| 204 | state.FlushSegment(segment: std::move(current_segment), segment_size: total_segment_size); |
| 205 | } |
| 206 | |
| 207 | void Finalize() { |
| 208 | state.template Flush<RLECompressState<T, WRITE_STATISTICS>::RLEWriter>(); |
| 209 | |
| 210 | FlushSegment(); |
| 211 | current_segment.reset(); |
| 212 | } |
| 213 | |
| 214 | ColumnDataCheckpointer &checkpointer; |
| 215 | CompressionFunction &function; |
| 216 | unique_ptr<ColumnSegment> current_segment; |
| 217 | BufferHandle handle; |
| 218 | |
| 219 | RLEState<T> state; |
| 220 | idx_t entry_count = 0; |
| 221 | idx_t max_rle_count; |
| 222 | }; |
| 223 | |
| 224 | template <class T, bool WRITE_STATISTICS> |
| 225 | unique_ptr<CompressionState> RLEInitCompression(ColumnDataCheckpointer &checkpointer, unique_ptr<AnalyzeState> state) { |
| 226 | return make_uniq<RLECompressState<T, WRITE_STATISTICS>>(checkpointer); |
| 227 | } |
| 228 | |
| 229 | template <class T, bool WRITE_STATISTICS> |
| 230 | void RLECompress(CompressionState &state_p, Vector &scan_vector, idx_t count) { |
| 231 | auto &state = (RLECompressState<T, WRITE_STATISTICS> &)state_p; |
| 232 | UnifiedVectorFormat vdata; |
| 233 | scan_vector.ToUnifiedFormat(count, data&: vdata); |
| 234 | |
| 235 | state.Append(vdata, count); |
| 236 | } |
| 237 | |
| 238 | template <class T, bool WRITE_STATISTICS> |
| 239 | void RLEFinalizeCompress(CompressionState &state_p) { |
| 240 | auto &state = (RLECompressState<T, WRITE_STATISTICS> &)state_p; |
| 241 | state.Finalize(); |
| 242 | } |
| 243 | |
| 244 | //===--------------------------------------------------------------------===// |
| 245 | // Scan |
| 246 | //===--------------------------------------------------------------------===// |
| 247 | template <class T> |
| 248 | struct RLEScanState : public SegmentScanState { |
| 249 | explicit RLEScanState(ColumnSegment &segment) { |
| 250 | auto &buffer_manager = BufferManager::GetBufferManager(db&: segment.db); |
| 251 | handle = buffer_manager.Pin(handle&: segment.block); |
| 252 | entry_pos = 0; |
| 253 | position_in_entry = 0; |
| 254 | rle_count_offset = Load<uint64_t>(ptr: handle.Ptr() + segment.GetBlockOffset()); |
| 255 | D_ASSERT(rle_count_offset <= Storage::BLOCK_SIZE); |
| 256 | } |
| 257 | |
| 258 | void Skip(ColumnSegment &segment, idx_t skip_count) { |
| 259 | auto data = handle.Ptr() + segment.GetBlockOffset(); |
| 260 | auto index_pointer = (rle_count_t *)(data + rle_count_offset); |
| 261 | |
| 262 | for (idx_t i = 0; i < skip_count; i++) { |
| 263 | // assign the current value |
| 264 | position_in_entry++; |
| 265 | if (position_in_entry >= index_pointer[entry_pos]) { |
| 266 | // handled all entries in this RLE value |
| 267 | // move to the next entry |
| 268 | entry_pos++; |
| 269 | position_in_entry = 0; |
| 270 | } |
| 271 | } |
| 272 | } |
| 273 | |
| 274 | BufferHandle handle; |
| 275 | uint32_t rle_offset; |
| 276 | idx_t entry_pos; |
| 277 | idx_t position_in_entry; |
| 278 | uint32_t rle_count_offset; |
| 279 | }; |
| 280 | |
| 281 | template <class T> |
| 282 | unique_ptr<SegmentScanState> RLEInitScan(ColumnSegment &segment) { |
| 283 | auto result = make_uniq<RLEScanState<T>>(segment); |
| 284 | return std::move(result); |
| 285 | } |
| 286 | |
| 287 | //===--------------------------------------------------------------------===// |
| 288 | // Scan base data |
| 289 | //===--------------------------------------------------------------------===// |
| 290 | template <class T> |
| 291 | void RLESkip(ColumnSegment &segment, ColumnScanState &state, idx_t skip_count) { |
| 292 | auto &scan_state = state.scan_state->Cast<RLEScanState<T>>(); |
| 293 | scan_state.Skip(segment, skip_count); |
| 294 | } |
| 295 | |
| 296 | template <class T> |
| 297 | void RLEScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result, |
| 298 | idx_t result_offset) { |
| 299 | auto &scan_state = state.scan_state->Cast<RLEScanState<T>>(); |
| 300 | |
| 301 | auto data = scan_state.handle.Ptr() + segment.GetBlockOffset(); |
| 302 | auto data_pointer = (T *)(data + RLEConstants::RLE_HEADER_SIZE); |
| 303 | auto index_pointer = (rle_count_t *)(data + scan_state.rle_count_offset); |
| 304 | |
| 305 | auto result_data = FlatVector::GetData<T>(result); |
| 306 | result.SetVectorType(VectorType::FLAT_VECTOR); |
| 307 | for (idx_t i = 0; i < scan_count; i++) { |
| 308 | // assign the current value |
| 309 | result_data[result_offset + i] = data_pointer[scan_state.entry_pos]; |
| 310 | scan_state.position_in_entry++; |
| 311 | if (scan_state.position_in_entry >= index_pointer[scan_state.entry_pos]) { |
| 312 | // handled all entries in this RLE value |
| 313 | // move to the next entry |
| 314 | scan_state.entry_pos++; |
| 315 | scan_state.position_in_entry = 0; |
| 316 | } |
| 317 | } |
| 318 | } |
| 319 | |
| 320 | template <class T> |
| 321 | void RLEScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) { |
| 322 | // FIXME: emit constant vector if repetition of single value is >= scan_count |
| 323 | RLEScanPartial<T>(segment, state, scan_count, result, 0); |
| 324 | } |
| 325 | |
| 326 | //===--------------------------------------------------------------------===// |
| 327 | // Fetch |
| 328 | //===--------------------------------------------------------------------===// |
| 329 | template <class T> |
| 330 | void RLEFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result, idx_t result_idx) { |
| 331 | RLEScanState<T> scan_state(segment); |
| 332 | scan_state.Skip(segment, row_id); |
| 333 | |
| 334 | auto data = scan_state.handle.Ptr() + segment.GetBlockOffset(); |
| 335 | auto data_pointer = (T *)(data + RLEConstants::RLE_HEADER_SIZE); |
| 336 | auto result_data = FlatVector::GetData<T>(result); |
| 337 | result_data[result_idx] = data_pointer[scan_state.entry_pos]; |
| 338 | } |
| 339 | |
| 340 | //===--------------------------------------------------------------------===// |
| 341 | // Get Function |
| 342 | //===--------------------------------------------------------------------===// |
| 343 | template <class T, bool WRITE_STATISTICS = true> |
| 344 | CompressionFunction GetRLEFunction(PhysicalType data_type) { |
| 345 | return CompressionFunction(CompressionType::COMPRESSION_RLE, data_type, RLEInitAnalyze<T>, RLEAnalyze<T>, |
| 346 | RLEFinalAnalyze<T>, RLEInitCompression<T, WRITE_STATISTICS>, |
| 347 | RLECompress<T, WRITE_STATISTICS>, RLEFinalizeCompress<T, WRITE_STATISTICS>, |
| 348 | RLEInitScan<T>, RLEScan<T>, RLEScanPartial<T>, RLEFetchRow<T>, RLESkip<T>); |
| 349 | } |
| 350 | |
| 351 | CompressionFunction RLEFun::GetFunction(PhysicalType type) { |
| 352 | switch (type) { |
| 353 | case PhysicalType::BOOL: |
| 354 | case PhysicalType::INT8: |
| 355 | return GetRLEFunction<int8_t>(data_type: type); |
| 356 | case PhysicalType::INT16: |
| 357 | return GetRLEFunction<int16_t>(data_type: type); |
| 358 | case PhysicalType::INT32: |
| 359 | return GetRLEFunction<int32_t>(data_type: type); |
| 360 | case PhysicalType::INT64: |
| 361 | return GetRLEFunction<int64_t>(data_type: type); |
| 362 | case PhysicalType::INT128: |
| 363 | return GetRLEFunction<hugeint_t>(data_type: type); |
| 364 | case PhysicalType::UINT8: |
| 365 | return GetRLEFunction<uint8_t>(data_type: type); |
| 366 | case PhysicalType::UINT16: |
| 367 | return GetRLEFunction<uint16_t>(data_type: type); |
| 368 | case PhysicalType::UINT32: |
| 369 | return GetRLEFunction<uint32_t>(data_type: type); |
| 370 | case PhysicalType::UINT64: |
| 371 | return GetRLEFunction<uint64_t>(data_type: type); |
| 372 | case PhysicalType::FLOAT: |
| 373 | return GetRLEFunction<float>(data_type: type); |
| 374 | case PhysicalType::DOUBLE: |
| 375 | return GetRLEFunction<double>(data_type: type); |
| 376 | case PhysicalType::LIST: |
| 377 | return GetRLEFunction<uint64_t, false>(data_type: type); |
| 378 | default: |
| 379 | throw InternalException("Unsupported type for RLE" ); |
| 380 | } |
| 381 | } |
| 382 | |
| 383 | bool RLEFun::TypeIsSupported(PhysicalType type) { |
| 384 | switch (type) { |
| 385 | case PhysicalType::BOOL: |
| 386 | case PhysicalType::INT8: |
| 387 | case PhysicalType::INT16: |
| 388 | case PhysicalType::INT32: |
| 389 | case PhysicalType::INT64: |
| 390 | case PhysicalType::INT128: |
| 391 | case PhysicalType::UINT8: |
| 392 | case PhysicalType::UINT16: |
| 393 | case PhysicalType::UINT32: |
| 394 | case PhysicalType::UINT64: |
| 395 | case PhysicalType::FLOAT: |
| 396 | case PhysicalType::DOUBLE: |
| 397 | case PhysicalType::LIST: |
| 398 | return true; |
| 399 | default: |
| 400 | return false; |
| 401 | } |
| 402 | } |
| 403 | |
| 404 | } // namespace duckdb |
| 405 | |