1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// Imported from Apache Impala (incubating) on 2016-01-29 and modified for use
19// in parquet-cpp, Arrow
20
21#ifndef ARROW_UTIL_RLE_ENCODING_H
22#define ARROW_UTIL_RLE_ENCODING_H
23
24#include <math.h>
25#include <algorithm>
26
27#include "arrow/util/bit-stream-utils.h"
28#include "arrow/util/bit-util.h"
29#include "arrow/util/macros.h"
30
31namespace arrow {
32namespace util {
33
34/// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs
35/// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
36/// (literal encoding).
37/// For both types of runs, there is a byte-aligned indicator which encodes the length
38/// of the run and the type of the run.
39/// This encoding has the benefit that when there aren't any long enough runs, values
40/// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
41/// the run length are byte aligned. This allows for very efficient decoding
42/// implementations.
43/// The encoding is:
44/// encoded-block := run*
45/// run := literal-run | repeated-run
46/// literal-run := literal-indicator < literal bytes >
47/// repeated-run := repeated-indicator < repeated value. padded to byte boundary >
48/// literal-indicator := varint_encode( number_of_groups << 1 | 1)
49/// repeated-indicator := varint_encode( number_of_repetitions << 1 )
50//
51/// Each run is preceded by a varint. The varint's least significant bit is
52/// used to indicate whether the run is a literal run or a repeated run. The rest
53/// of the varint is used to determine the length of the run (eg how many times the
54/// value repeats).
55//
56/// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
57/// in groups of 8), so that no matter the bit-width of the value, the sequence will end
58/// on a byte boundary without padding.
59/// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
60/// the actual number of encoded ints. (This means that the total number of encoded values
61/// can not be determined from the encoded data, since the number of values in the last
62/// group may not be a multiple of 8). For the last group of literal runs, we pad
63/// the group to 8 with zeros. This allows for 8 at a time decoding on the read side
64/// without the need for additional checks.
65//
66/// There is a break-even point when it is more storage efficient to do run length
67/// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes
68/// for both the repeated encoding or the literal encoding. This value can always
69/// be computed based on the bit-width.
70/// TODO: think about how to use this for strings. The bit packing isn't quite the same.
71//
72/// Examples with bit-width 1 (eg encoding booleans):
73/// ----------------------------------------
74/// 100 1s followed by 100 0s:
75/// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
76/// - (total 4 bytes)
77//
78/// alternating 1s and 0s (200 total):
79/// 200 ints = 25 groups of 8
80/// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
81/// (total 26 bytes, 1 byte overhead)
82//
83
84/// Decoder class for RLE encoded data.
85class RleDecoder {
86 public:
87 /// Create a decoder object. buffer/buffer_len is the decoded data.
88 /// bit_width is the width of each value (before encoding).
89 RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
90 : bit_reader_(buffer, buffer_len),
91 bit_width_(bit_width),
92 current_value_(0),
93 repeat_count_(0),
94 literal_count_(0) {
95 DCHECK_GE(bit_width_, 0);
96 DCHECK_LE(bit_width_, 64);
97 }
98
99 RleDecoder() : bit_width_(-1) {}
100
101 void Reset(const uint8_t* buffer, int buffer_len, int bit_width) {
102 DCHECK_GE(bit_width, 0);
103 DCHECK_LE(bit_width, 64);
104 bit_reader_.Reset(buffer, buffer_len);
105 bit_width_ = bit_width;
106 current_value_ = 0;
107 repeat_count_ = 0;
108 literal_count_ = 0;
109 }
110
111 /// Gets the next value. Returns false if there are no more.
112 template <typename T>
113 bool Get(T* val);
114
115 /// Gets a batch of values. Returns the number of decoded elements.
116 template <typename T>
117 int GetBatch(T* values, int batch_size);
118
119 /// Like GetBatch but the values are then decoded using the provided dictionary
120 template <typename T>
121 int GetBatchWithDict(const T* dictionary, T* values, int batch_size);
122
123 /// Like GetBatchWithDict but add spacing for null entries
124 template <typename T>
125 int GetBatchWithDictSpaced(const T* dictionary, T* values, int batch_size,
126 int null_count, const uint8_t* valid_bits,
127 int64_t valid_bits_offset);
128
129 protected:
130 BitUtil::BitReader bit_reader_;
131 /// Number of bits needed to encode the value. Must be between 0 and 64.
132 int bit_width_;
133 uint64_t current_value_;
134 uint32_t repeat_count_;
135 uint32_t literal_count_;
136
137 private:
138 /// Fills literal_count_ and repeat_count_ with next values. Returns false if there
139 /// are no more.
140 template <typename T>
141 bool NextCounts();
142};
143
144/// Class to incrementally build the rle data. This class does not allocate any memory.
145/// The encoding has two modes: encoding repeated runs and literal runs.
146/// If the run is sufficiently short, it is more efficient to encode as a literal run.
147/// This class does so by buffering 8 values at a time. If they are not all the same
148/// they are added to the literal run. If they are the same, they are added to the
149/// repeated run. When we switch modes, the previous run is flushed out.
150class RleEncoder {
151 public:
152 /// buffer/buffer_len: preallocated output buffer.
153 /// bit_width: max number of bits for value.
154 /// TODO: consider adding a min_repeated_run_length so the caller can control
155 /// when values should be encoded as repeated runs. Currently this is derived
156 /// based on the bit_width, which can determine a storage optimal choice.
157 /// TODO: allow 0 bit_width (and have dict encoder use it)
158 RleEncoder(uint8_t* buffer, int buffer_len, int bit_width)
159 : bit_width_(bit_width), bit_writer_(buffer, buffer_len) {
160 DCHECK_GE(bit_width_, 0);
161 DCHECK_LE(bit_width_, 64);
162 max_run_byte_size_ = MinBufferSize(bit_width);
163 DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough.";
164 Clear();
165 }
166
167 /// Returns the minimum buffer size needed to use the encoder for 'bit_width'
168 /// This is the maximum length of a single run for 'bit_width'.
169 /// It is not valid to pass a buffer less than this length.
170 static int MinBufferSize(int bit_width) {
171 /// 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values.
172 int max_literal_run_size =
173 1 +
174 static_cast<int>(BitUtil::BytesForBits(MAX_VALUES_PER_LITERAL_RUN * bit_width));
175 /// Up to MAX_VLQ_BYTE_LEN indicator and a single 'bit_width' value.
176 int max_repeated_run_size = BitUtil::BitReader::MAX_VLQ_BYTE_LEN +
177 static_cast<int>(BitUtil::BytesForBits(bit_width));
178 return std::max(max_literal_run_size, max_repeated_run_size);
179 }
180
181 /// Returns the maximum byte size it could take to encode 'num_values'.
182 static int MaxBufferSize(int bit_width, int num_values) {
183 // For a bit_width > 1, the worst case is the repetition of "literal run of length 8
184 // and then a repeated run of length 8".
185 // 8 values per smallest run, 8 bits per byte
186 int bytes_per_run = bit_width;
187 int num_runs = static_cast<int>(BitUtil::CeilDiv(num_values, 8));
188 int literal_max_size = num_runs + num_runs * bytes_per_run;
189
190 // In the very worst case scenario, the data is a concatenation of repeated
191 // runs of 8 values. Repeated run has a 1 byte varint followed by the
192 // bit-packed repeated value
193 int min_repeated_run_size = 1 + static_cast<int>(BitUtil::BytesForBits(bit_width));
194 int repeated_max_size =
195 static_cast<int>(BitUtil::CeilDiv(num_values, 8)) * min_repeated_run_size;
196
197 return std::max(literal_max_size, repeated_max_size);
198 }
199
200 /// Encode value. Returns true if the value fits in buffer, false otherwise.
201 /// This value must be representable with bit_width_ bits.
202 bool Put(uint64_t value);
203
204 /// Flushes any pending values to the underlying buffer.
205 /// Returns the total number of bytes written
206 int Flush();
207
208 /// Resets all the state in the encoder.
209 void Clear();
210
211 /// Returns pointer to underlying buffer
212 uint8_t* buffer() { return bit_writer_.buffer(); }
213 int32_t len() { return bit_writer_.bytes_written(); }
214
215 private:
216 /// Flushes any buffered values. If this is part of a repeated run, this is largely
217 /// a no-op.
218 /// If it is part of a literal run, this will call FlushLiteralRun, which writes
219 /// out the buffered literal values.
220 /// If 'done' is true, the current run would be written even if it would normally
221 /// have been buffered more. This should only be called at the end, when the
222 /// encoder has received all values even if it would normally continue to be
223 /// buffered.
224 void FlushBufferedValues(bool done);
225
226 /// Flushes literal values to the underlying buffer. If update_indicator_byte,
227 /// then the current literal run is complete and the indicator byte is updated.
228 void FlushLiteralRun(bool update_indicator_byte);
229
230 /// Flushes a repeated run to the underlying buffer.
231 void FlushRepeatedRun();
232
233 /// Checks and sets buffer_full_. This must be called after flushing a run to
234 /// make sure there are enough bytes remaining to encode the next run.
235 void CheckBufferFull();
236
237 /// The maximum number of values in a single literal run
238 /// (number of groups encodable by a 1-byte indicator * 8)
239 static const int MAX_VALUES_PER_LITERAL_RUN = (1 << 6) * 8;
240
241 /// Number of bits needed to encode the value. Must be between 0 and 64.
242 const int bit_width_;
243
244 /// Underlying buffer.
245 BitUtil::BitWriter bit_writer_;
246
247 /// If true, the buffer is full and subsequent Put()'s will fail.
248 bool buffer_full_;
249
250 /// The maximum byte size a single run can take.
251 int max_run_byte_size_;
252
253 /// We need to buffer at most 8 values for literals. This happens when the
254 /// bit_width is 1 (so 8 values fit in one byte).
255 /// TODO: generalize this to other bit widths
256 int64_t buffered_values_[8];
257
258 /// Number of values in buffered_values_
259 int num_buffered_values_;
260
261 /// The current (also last) value that was written and the count of how
262 /// many times in a row that value has been seen. This is maintained even
263 /// if we are in a literal run. If the repeat_count_ get high enough, we switch
264 /// to encoding repeated runs.
265 uint64_t current_value_;
266 int repeat_count_;
267
268 /// Number of literals in the current run. This does not include the literals
269 /// that might be in buffered_values_. Only after we've got a group big enough
270 /// can we decide if they should part of the literal_count_ or repeat_count_
271 int literal_count_;
272
273 /// Pointer to a byte in the underlying buffer that stores the indicator byte.
274 /// This is reserved as soon as we need a literal run but the value is written
275 /// when the literal run is complete.
276 uint8_t* literal_indicator_byte_;
277};
278
279template <typename T>
280inline bool RleDecoder::Get(T* val) {
281 return GetBatch(val, 1) == 1;
282}
283
284template <typename T>
285inline int RleDecoder::GetBatch(T* values, int batch_size) {
286 DCHECK_GE(bit_width_, 0);
287 int values_read = 0;
288
289 while (values_read < batch_size) {
290 if (repeat_count_ > 0) {
291 int repeat_batch =
292 std::min(batch_size - values_read, static_cast<int>(repeat_count_));
293 std::fill(values + values_read, values + values_read + repeat_batch,
294 static_cast<T>(current_value_));
295 repeat_count_ -= repeat_batch;
296 values_read += repeat_batch;
297 } else if (literal_count_ > 0) {
298 int literal_batch =
299 std::min(batch_size - values_read, static_cast<int>(literal_count_));
300 int actual_read =
301 bit_reader_.GetBatch(bit_width_, values + values_read, literal_batch);
302 DCHECK_EQ(actual_read, literal_batch);
303 literal_count_ -= literal_batch;
304 values_read += literal_batch;
305 } else {
306 if (!NextCounts<T>()) return values_read;
307 }
308 }
309
310 return values_read;
311}
312
313template <typename T>
314inline int RleDecoder::GetBatchWithDict(const T* dictionary, T* values, int batch_size) {
315 DCHECK_GE(bit_width_, 0);
316 int values_read = 0;
317
318 while (values_read < batch_size) {
319 if (repeat_count_ > 0) {
320 int repeat_batch =
321 std::min(batch_size - values_read, static_cast<int>(repeat_count_));
322 std::fill(values + values_read, values + values_read + repeat_batch,
323 dictionary[current_value_]);
324 repeat_count_ -= repeat_batch;
325 values_read += repeat_batch;
326 } else if (literal_count_ > 0) {
327 int literal_batch =
328 std::min(batch_size - values_read, static_cast<int>(literal_count_));
329
330 const int buffer_size = 1024;
331 int indices[buffer_size];
332 literal_batch = std::min(literal_batch, buffer_size);
333 int actual_read = bit_reader_.GetBatch(bit_width_, &indices[0], literal_batch);
334 DCHECK_EQ(actual_read, literal_batch);
335 for (int i = 0; i < literal_batch; ++i) {
336 values[values_read + i] = dictionary[indices[i]];
337 }
338 literal_count_ -= literal_batch;
339 values_read += literal_batch;
340 } else {
341 if (!NextCounts<T>()) return values_read;
342 }
343 }
344
345 return values_read;
346}
347
348template <typename T>
349inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* values,
350 int batch_size, int null_count,
351 const uint8_t* valid_bits,
352 int64_t valid_bits_offset) {
353 DCHECK_GE(bit_width_, 0);
354 int values_read = 0;
355 int remaining_nulls = null_count;
356
357 internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, batch_size);
358
359 while (values_read < batch_size) {
360 bool is_valid = bit_reader.IsSet();
361 bit_reader.Next();
362
363 if (is_valid) {
364 if ((repeat_count_ == 0) && (literal_count_ == 0)) {
365 if (!NextCounts<T>()) return values_read;
366 }
367 if (repeat_count_ > 0) {
368 T value = dictionary[current_value_];
369 // The current index is already valid, we don't need to check that again
370 int repeat_batch = 1;
371 repeat_count_--;
372
373 while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) {
374 if (bit_reader.IsSet()) {
375 repeat_count_--;
376 } else {
377 remaining_nulls--;
378 }
379 repeat_batch++;
380
381 bit_reader.Next();
382 }
383 std::fill(values + values_read, values + values_read + repeat_batch, value);
384 values_read += repeat_batch;
385 } else if (literal_count_ > 0) {
386 int literal_batch = std::min(batch_size - values_read - remaining_nulls,
387 static_cast<int>(literal_count_));
388
389 // Decode the literals
390 constexpr int kBufferSize = 1024;
391 int indices[kBufferSize];
392 literal_batch = std::min(literal_batch, kBufferSize);
393 int actual_read = bit_reader_.GetBatch(bit_width_, &indices[0], literal_batch);
394 DCHECK_EQ(actual_read, literal_batch);
395
396 int skipped = 0;
397 int literals_read = 1;
398 values[values_read] = dictionary[indices[0]];
399
400 // Read the first bitset to the end
401 while (literals_read < literal_batch) {
402 if (bit_reader.IsSet()) {
403 values[values_read + literals_read + skipped] =
404 dictionary[indices[literals_read]];
405 literals_read++;
406 } else {
407 skipped++;
408 }
409
410 bit_reader.Next();
411 }
412 literal_count_ -= literal_batch;
413 values_read += literal_batch + skipped;
414 remaining_nulls -= skipped;
415 }
416 } else {
417 values_read++;
418 remaining_nulls--;
419 }
420 }
421
422 return values_read;
423}
424
425template <typename T>
426bool RleDecoder::NextCounts() {
427 // Read the next run's indicator int, it could be a literal or repeated run.
428 // The int is encoded as a vlq-encoded value.
429 int32_t indicator_value = 0;
430 bool result = bit_reader_.GetVlqInt(&indicator_value);
431 if (!result) return false;
432
433 // lsb indicates if it is a literal run or repeated run
434 bool is_literal = indicator_value & 1;
435 if (is_literal) {
436 literal_count_ = (indicator_value >> 1) * 8;
437 } else {
438 repeat_count_ = indicator_value >> 1;
439 // XXX (ARROW-4018) this is not big-endian compatible
440 bool result =
441 bit_reader_.GetAligned<T>(static_cast<int>(BitUtil::CeilDiv(bit_width_, 8)),
442 reinterpret_cast<T*>(&current_value_));
443 DCHECK(result);
444 }
445 return true;
446}
447
448/// This function buffers input values 8 at a time. After seeing all 8 values,
449/// it decides whether they should be encoded as a literal or repeated run.
450inline bool RleEncoder::Put(uint64_t value) {
451 DCHECK(bit_width_ == 64 || value < (1ULL << bit_width_));
452 if (ARROW_PREDICT_FALSE(buffer_full_)) return false;
453
454 if (ARROW_PREDICT_TRUE(current_value_ == value)) {
455 ++repeat_count_;
456 if (repeat_count_ > 8) {
457 // This is just a continuation of the current run, no need to buffer the
458 // values.
459 // Note that this is the fast path for long repeated runs.
460 return true;
461 }
462 } else {
463 if (repeat_count_ >= 8) {
464 // We had a run that was long enough but it has ended. Flush the
465 // current repeated run.
466 DCHECK_EQ(literal_count_, 0);
467 FlushRepeatedRun();
468 }
469 repeat_count_ = 1;
470 current_value_ = value;
471 }
472
473 buffered_values_[num_buffered_values_] = value;
474 if (++num_buffered_values_ == 8) {
475 DCHECK_EQ(literal_count_ % 8, 0);
476 FlushBufferedValues(false);
477 }
478 return true;
479}
480
481inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) {
482 if (literal_indicator_byte_ == NULL) {
483 // The literal indicator byte has not been reserved yet, get one now.
484 literal_indicator_byte_ = bit_writer_.GetNextBytePtr();
485 DCHECK(literal_indicator_byte_ != NULL);
486 }
487
488 // Write all the buffered values as bit packed literals
489 for (int i = 0; i < num_buffered_values_; ++i) {
490 bool success = bit_writer_.PutValue(buffered_values_[i], bit_width_);
491 DCHECK(success) << "There is a bug in using CheckBufferFull()";
492 }
493 num_buffered_values_ = 0;
494
495 if (update_indicator_byte) {
496 // At this point we need to write the indicator byte for the literal run.
497 // We only reserve one byte, to allow for streaming writes of literal values.
498 // The logic makes sure we flush literal runs often enough to not overrun
499 // the 1 byte.
500 DCHECK_EQ(literal_count_ % 8, 0);
501 int num_groups = literal_count_ / 8;
502 int32_t indicator_value = (num_groups << 1) | 1;
503 DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
504 *literal_indicator_byte_ = static_cast<uint8_t>(indicator_value);
505 literal_indicator_byte_ = NULL;
506 literal_count_ = 0;
507 CheckBufferFull();
508 }
509}
510
511inline void RleEncoder::FlushRepeatedRun() {
512 DCHECK_GT(repeat_count_, 0);
513 bool result = true;
514 // The lsb of 0 indicates this is a repeated run
515 int32_t indicator_value = repeat_count_ << 1 | 0;
516 result &= bit_writer_.PutVlqInt(indicator_value);
517 result &= bit_writer_.PutAligned(current_value_,
518 static_cast<int>(BitUtil::CeilDiv(bit_width_, 8)));
519 DCHECK(result);
520 num_buffered_values_ = 0;
521 repeat_count_ = 0;
522 CheckBufferFull();
523}
524
525/// Flush the values that have been buffered. At this point we decide whether
526/// we need to switch between the run types or continue the current one.
527inline void RleEncoder::FlushBufferedValues(bool done) {
528 if (repeat_count_ >= 8) {
529 // Clear the buffered values. They are part of the repeated run now and we
530 // don't want to flush them out as literals.
531 num_buffered_values_ = 0;
532 if (literal_count_ != 0) {
533 // There was a current literal run. All the values in it have been flushed
534 // but we still need to update the indicator byte.
535 DCHECK_EQ(literal_count_ % 8, 0);
536 DCHECK_EQ(repeat_count_, 8);
537 FlushLiteralRun(true);
538 }
539 DCHECK_EQ(literal_count_, 0);
540 return;
541 }
542
543 literal_count_ += num_buffered_values_;
544 DCHECK_EQ(literal_count_ % 8, 0);
545 int num_groups = literal_count_ / 8;
546 if (num_groups + 1 >= (1 << 6)) {
547 // We need to start a new literal run because the indicator byte we've reserved
548 // cannot store more values.
549 DCHECK(literal_indicator_byte_ != NULL);
550 FlushLiteralRun(true);
551 } else {
552 FlushLiteralRun(done);
553 }
554 repeat_count_ = 0;
555}
556
557inline int RleEncoder::Flush() {
558 if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
559 bool all_repeat = literal_count_ == 0 && (repeat_count_ == num_buffered_values_ ||
560 num_buffered_values_ == 0);
561 // There is something pending, figure out if it's a repeated or literal run
562 if (repeat_count_ > 0 && all_repeat) {
563 FlushRepeatedRun();
564 } else {
565 DCHECK_EQ(literal_count_ % 8, 0);
566 // Buffer the last group of literals to 8 by padding with 0s.
567 for (; num_buffered_values_ != 0 && num_buffered_values_ < 8;
568 ++num_buffered_values_) {
569 buffered_values_[num_buffered_values_] = 0;
570 }
571 literal_count_ += num_buffered_values_;
572 FlushLiteralRun(true);
573 repeat_count_ = 0;
574 }
575 }
576 bit_writer_.Flush();
577 DCHECK_EQ(num_buffered_values_, 0);
578 DCHECK_EQ(literal_count_, 0);
579 DCHECK_EQ(repeat_count_, 0);
580
581 return bit_writer_.bytes_written();
582}
583
584inline void RleEncoder::CheckBufferFull() {
585 int bytes_written = bit_writer_.bytes_written();
586 if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) {
587 buffer_full_ = true;
588 }
589}
590
591inline void RleEncoder::Clear() {
592 buffer_full_ = false;
593 current_value_ = 0;
594 repeat_count_ = 0;
595 num_buffered_values_ = 0;
596 literal_count_ = 0;
597 literal_indicator_byte_ = NULL;
598 bit_writer_.Clear();
599}
600
601} // namespace util
602} // namespace arrow
603
604#endif // ARROW_UTIL_RLE_ENCODING_H
605