1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | // Imported from Apache Impala (incubating) on 2016-01-29 and modified for use |
19 | // in parquet-cpp, Arrow |
20 | |
21 | #ifndef ARROW_UTIL_RLE_ENCODING_H |
22 | #define ARROW_UTIL_RLE_ENCODING_H |
23 | |
24 | #include <math.h> |
25 | #include <algorithm> |
26 | |
27 | #include "arrow/util/bit-stream-utils.h" |
28 | #include "arrow/util/bit-util.h" |
29 | #include "arrow/util/macros.h" |
30 | |
31 | namespace arrow { |
32 | namespace util { |
33 | |
34 | /// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
35 | /// are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
36 | /// (literal encoding). |
37 | /// For both types of runs, there is a byte-aligned indicator which encodes the length |
38 | /// of the run and the type of the run. |
39 | /// This encoding has the benefit that when there aren't any long enough runs, values |
40 | /// are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
41 | /// the run length are byte aligned. This allows for very efficient decoding |
42 | /// implementations. |
43 | /// The encoding is: |
44 | /// encoded-block := run* |
45 | /// run := literal-run | repeated-run |
46 | /// literal-run := literal-indicator < literal bytes > |
/// repeated-run := repeated-indicator < repeated value, padded to byte boundary >
48 | /// literal-indicator := varint_encode( number_of_groups << 1 | 1) |
49 | /// repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
50 | // |
51 | /// Each run is preceded by a varint. The varint's least significant bit is |
52 | /// used to indicate whether the run is a literal run or a repeated run. The rest |
53 | /// of the varint is used to determine the length of the run (eg how many times the |
54 | /// value repeats). |
55 | // |
56 | /// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
57 | /// in groups of 8), so that no matter the bit-width of the value, the sequence will end |
58 | /// on a byte boundary without padding. |
59 | /// Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
60 | /// the actual number of encoded ints. (This means that the total number of encoded values |
61 | /// can not be determined from the encoded data, since the number of values in the last |
62 | /// group may not be a multiple of 8). For the last group of literal runs, we pad |
63 | /// the group to 8 with zeros. This allows for 8 at a time decoding on the read side |
64 | /// without the need for additional checks. |
65 | // |
/// There is a break-even point when it is more storage efficient to do run length
/// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes
/// for either the repeated encoding or the literal encoding. This value can always
/// be computed based on the bit-width.
70 | /// TODO: think about how to use this for strings. The bit packing isn't quite the same. |
71 | // |
72 | /// Examples with bit-width 1 (eg encoding booleans): |
73 | /// ---------------------------------------- |
74 | /// 100 1s followed by 100 0s: |
75 | /// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
76 | /// - (total 4 bytes) |
77 | // |
78 | /// alternating 1s and 0s (200 total): |
79 | /// 200 ints = 25 groups of 8 |
80 | /// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
81 | /// (total 26 bytes, 1 byte overhead) |
82 | // |
83 | |
/// Decoder class for RLE encoded data.
class RleDecoder {
 public:
  /// Create a decoder object. buffer/buffer_len is the decoded data.
  /// bit_width is the width of each value (before encoding).
  RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
      : bit_reader_(buffer, buffer_len),
        bit_width_(bit_width),
        current_value_(0),
        repeat_count_(0),
        literal_count_(0) {
    DCHECK_GE(bit_width_, 0);
    DCHECK_LE(bit_width_, 64);
  }

  /// Default-constructed decoder is not usable until Reset() is called;
  /// bit_width_ of -1 marks the uninitialized state.
  RleDecoder() : bit_width_(-1) {}

  /// Re-point the decoder at a new encoded buffer and clear all run state.
  void Reset(const uint8_t* buffer, int buffer_len, int bit_width) {
    DCHECK_GE(bit_width, 0);
    DCHECK_LE(bit_width, 64);
    bit_reader_.Reset(buffer, buffer_len);
    bit_width_ = bit_width;
    current_value_ = 0;
    repeat_count_ = 0;
    literal_count_ = 0;
  }

  /// Gets the next value. Returns false if there are no more.
  template <typename T>
  bool Get(T* val);

  /// Gets a batch of values. Returns the number of decoded elements.
  template <typename T>
  int GetBatch(T* values, int batch_size);

  /// Like GetBatch but the values are then decoded using the provided dictionary
  template <typename T>
  int GetBatchWithDict(const T* dictionary, T* values, int batch_size);

  /// Like GetBatchWithDict but add spacing for null entries
  template <typename T>
  int GetBatchWithDictSpaced(const T* dictionary, T* values, int batch_size,
                             int null_count, const uint8_t* valid_bits,
                             int64_t valid_bits_offset);

 protected:
  BitUtil::BitReader bit_reader_;
  /// Number of bits needed to encode the value. Must be between 0 and 64.
  int bit_width_;
  /// Value of the current repeated run (valid while repeat_count_ > 0).
  uint64_t current_value_;
  /// Remaining values in the current repeated run.
  uint32_t repeat_count_;
  /// Remaining values in the current literal (bit-packed) run.
  uint32_t literal_count_;

 private:
  /// Fills literal_count_ and repeat_count_ with next values. Returns false if there
  /// are no more.
  template <typename T>
  bool NextCounts();
};
143 | |
/// Class to incrementally build the rle data. This class does not allocate any memory.
/// The encoding has two modes: encoding repeated runs and literal runs.
/// If the run is sufficiently short, it is more efficient to encode as a literal run.
/// This class does so by buffering 8 values at a time. If they are not all the same
/// they are added to the literal run. If they are the same, they are added to the
/// repeated run. When we switch modes, the previous run is flushed out.
class RleEncoder {
 public:
  /// buffer/buffer_len: preallocated output buffer.
  /// bit_width: max number of bits for value.
  /// TODO: consider adding a min_repeated_run_length so the caller can control
  /// when values should be encoded as repeated runs. Currently this is derived
  /// based on the bit_width, which can determine a storage optimal choice.
  /// TODO: allow 0 bit_width (and have dict encoder use it)
  RleEncoder(uint8_t* buffer, int buffer_len, int bit_width)
      : bit_width_(bit_width), bit_writer_(buffer, buffer_len) {
    DCHECK_GE(bit_width_, 0);
    DCHECK_LE(bit_width_, 64);
    max_run_byte_size_ = MinBufferSize(bit_width);
    DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough.";
    Clear();
  }

  /// Returns the minimum buffer size needed to use the encoder for 'bit_width'
  /// This is the maximum length of a single run for 'bit_width'.
  /// It is not valid to pass a buffer less than this length.
  static int MinBufferSize(int bit_width) {
    /// 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values.
    int max_literal_run_size =
        1 +
        static_cast<int>(BitUtil::BytesForBits(MAX_VALUES_PER_LITERAL_RUN * bit_width));
    /// Up to MAX_VLQ_BYTE_LEN indicator and a single 'bit_width' value.
    int max_repeated_run_size = BitUtil::BitReader::MAX_VLQ_BYTE_LEN +
                                static_cast<int>(BitUtil::BytesForBits(bit_width));
    return std::max(max_literal_run_size, max_repeated_run_size);
  }

  /// Returns the maximum byte size it could take to encode 'num_values'.
  static int MaxBufferSize(int bit_width, int num_values) {
    // For a bit_width > 1, the worst case is the repetition of "literal run of length 8
    // and then a repeated run of length 8".
    // 8 values per smallest run, 8 bits per byte
    int bytes_per_run = bit_width;
    int num_runs = static_cast<int>(BitUtil::CeilDiv(num_values, 8));
    // Each literal run costs 1 indicator byte plus the bit-packed group.
    int literal_max_size = num_runs + num_runs * bytes_per_run;

    // In the very worst case scenario, the data is a concatenation of repeated
    // runs of 8 values. Repeated run has a 1 byte varint followed by the
    // bit-packed repeated value
    int min_repeated_run_size = 1 + static_cast<int>(BitUtil::BytesForBits(bit_width));
    int repeated_max_size =
        static_cast<int>(BitUtil::CeilDiv(num_values, 8)) * min_repeated_run_size;

    return std::max(literal_max_size, repeated_max_size);
  }

  /// Encode value. Returns true if the value fits in buffer, false otherwise.
  /// This value must be representable with bit_width_ bits.
  bool Put(uint64_t value);

  /// Flushes any pending values to the underlying buffer.
  /// Returns the total number of bytes written
  int Flush();

  /// Resets all the state in the encoder.
  void Clear();

  /// Returns pointer to underlying buffer
  uint8_t* buffer() { return bit_writer_.buffer(); }
  /// Returns the number of bytes written so far.
  int32_t len() { return bit_writer_.bytes_written(); }

 private:
  /// Flushes any buffered values. If this is part of a repeated run, this is largely
  /// a no-op.
  /// If it is part of a literal run, this will call FlushLiteralRun, which writes
  /// out the buffered literal values.
  /// If 'done' is true, the current run would be written even if it would normally
  /// have been buffered more. This should only be called at the end, when the
  /// encoder has received all values even if it would normally continue to be
  /// buffered.
  void FlushBufferedValues(bool done);

  /// Flushes literal values to the underlying buffer. If update_indicator_byte,
  /// then the current literal run is complete and the indicator byte is updated.
  void FlushLiteralRun(bool update_indicator_byte);

  /// Flushes a repeated run to the underlying buffer.
  void FlushRepeatedRun();

  /// Checks and sets buffer_full_. This must be called after flushing a run to
  /// make sure there are enough bytes remaining to encode the next run.
  void CheckBufferFull();

  /// The maximum number of values in a single literal run
  /// (number of groups encodable by a 1-byte indicator * 8)
  static const int MAX_VALUES_PER_LITERAL_RUN = (1 << 6) * 8;

  /// Number of bits needed to encode the value. Must be between 0 and 64.
  const int bit_width_;

  /// Underlying buffer.
  BitUtil::BitWriter bit_writer_;

  /// If true, the buffer is full and subsequent Put()'s will fail.
  bool buffer_full_;

  /// The maximum byte size a single run can take.
  int max_run_byte_size_;

  /// We need to buffer at most 8 values for literals. This happens when the
  /// bit_width is 1 (so 8 values fit in one byte).
  /// TODO: generalize this to other bit widths
  int64_t buffered_values_[8];

  /// Number of values in buffered_values_
  int num_buffered_values_;

  /// The current (also last) value that was written and the count of how
  /// many times in a row that value has been seen. This is maintained even
  /// if we are in a literal run. If the repeat_count_ get high enough, we switch
  /// to encoding repeated runs.
  uint64_t current_value_;
  int repeat_count_;

  /// Number of literals in the current run. This does not include the literals
  /// that might be in buffered_values_. Only after we've got a group big enough
  /// can we decide if they should part of the literal_count_ or repeat_count_
  int literal_count_;

  /// Pointer to a byte in the underlying buffer that stores the indicator byte.
  /// This is reserved as soon as we need a literal run but the value is written
  /// when the literal run is complete.
  uint8_t* literal_indicator_byte_;
};
278 | |
279 | template <typename T> |
280 | inline bool RleDecoder::Get(T* val) { |
281 | return GetBatch(val, 1) == 1; |
282 | } |
283 | |
284 | template <typename T> |
285 | inline int RleDecoder::GetBatch(T* values, int batch_size) { |
286 | DCHECK_GE(bit_width_, 0); |
287 | int values_read = 0; |
288 | |
289 | while (values_read < batch_size) { |
290 | if (repeat_count_ > 0) { |
291 | int repeat_batch = |
292 | std::min(batch_size - values_read, static_cast<int>(repeat_count_)); |
293 | std::fill(values + values_read, values + values_read + repeat_batch, |
294 | static_cast<T>(current_value_)); |
295 | repeat_count_ -= repeat_batch; |
296 | values_read += repeat_batch; |
297 | } else if (literal_count_ > 0) { |
298 | int literal_batch = |
299 | std::min(batch_size - values_read, static_cast<int>(literal_count_)); |
300 | int actual_read = |
301 | bit_reader_.GetBatch(bit_width_, values + values_read, literal_batch); |
302 | DCHECK_EQ(actual_read, literal_batch); |
303 | literal_count_ -= literal_batch; |
304 | values_read += literal_batch; |
305 | } else { |
306 | if (!NextCounts<T>()) return values_read; |
307 | } |
308 | } |
309 | |
310 | return values_read; |
311 | } |
312 | |
313 | template <typename T> |
314 | inline int RleDecoder::GetBatchWithDict(const T* dictionary, T* values, int batch_size) { |
315 | DCHECK_GE(bit_width_, 0); |
316 | int values_read = 0; |
317 | |
318 | while (values_read < batch_size) { |
319 | if (repeat_count_ > 0) { |
320 | int repeat_batch = |
321 | std::min(batch_size - values_read, static_cast<int>(repeat_count_)); |
322 | std::fill(values + values_read, values + values_read + repeat_batch, |
323 | dictionary[current_value_]); |
324 | repeat_count_ -= repeat_batch; |
325 | values_read += repeat_batch; |
326 | } else if (literal_count_ > 0) { |
327 | int literal_batch = |
328 | std::min(batch_size - values_read, static_cast<int>(literal_count_)); |
329 | |
330 | const int buffer_size = 1024; |
331 | int indices[buffer_size]; |
332 | literal_batch = std::min(literal_batch, buffer_size); |
333 | int actual_read = bit_reader_.GetBatch(bit_width_, &indices[0], literal_batch); |
334 | DCHECK_EQ(actual_read, literal_batch); |
335 | for (int i = 0; i < literal_batch; ++i) { |
336 | values[values_read + i] = dictionary[indices[i]]; |
337 | } |
338 | literal_count_ -= literal_batch; |
339 | values_read += literal_batch; |
340 | } else { |
341 | if (!NextCounts<T>()) return values_read; |
342 | } |
343 | } |
344 | |
345 | return values_read; |
346 | } |
347 | |
/// Decode dictionary-encoded values while inserting spacing for null slots.
/// 'valid_bits' (starting at 'valid_bits_offset') marks which of the
/// 'batch_size' output positions are valid; 'null_count' of them are null.
/// Returns the number of output positions consumed (valid values plus nulls).
template <typename T>
inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* values,
                                              int batch_size, int null_count,
                                              const uint8_t* valid_bits,
                                              int64_t valid_bits_offset) {
  DCHECK_GE(bit_width_, 0);
  int values_read = 0;
  int remaining_nulls = null_count;

  internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, batch_size);

  while (values_read < batch_size) {
    bool is_valid = bit_reader.IsSet();
    bit_reader.Next();

    if (is_valid) {
      // Refill run state only when both run counters are exhausted.
      if ((repeat_count_ == 0) && (literal_count_ == 0)) {
        if (!NextCounts<T>()) return values_read;
      }
      if (repeat_count_ > 0) {
        T value = dictionary[current_value_];
        // The current index is already valid, we don't need to check that again
        int repeat_batch = 1;
        repeat_count_--;

        // Extend the batch over subsequent slots: a valid slot consumes one
        // repetition, a null slot only consumes output space.
        while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) {
          if (bit_reader.IsSet()) {
            repeat_count_--;
          } else {
            remaining_nulls--;
          }
          repeat_batch++;

          bit_reader.Next();
        }
        // Null slots inside the batch get 'value' too; callers are expected to
        // mask them out via the validity bitmap.
        std::fill(values + values_read, values + values_read + repeat_batch, value);
        values_read += repeat_batch;
      } else if (literal_count_ > 0) {
        // Cap the batch so the remaining nulls still fit in the output.
        int literal_batch = std::min(batch_size - values_read - remaining_nulls,
                                     static_cast<int>(literal_count_));

        // Decode the literals
        constexpr int kBufferSize = 1024;
        int indices[kBufferSize];
        literal_batch = std::min(literal_batch, kBufferSize);
        int actual_read = bit_reader_.GetBatch(bit_width_, &indices[0], literal_batch);
        DCHECK_EQ(actual_read, literal_batch);

        int skipped = 0;
        int literals_read = 1;
        // The slot that triggered this branch is known-valid; fill it directly.
        values[values_read] = dictionary[indices[0]];

        // Read the first bitset to the end
        while (literals_read < literal_batch) {
          if (bit_reader.IsSet()) {
            values[values_read + literals_read + skipped] =
                dictionary[indices[literals_read]];
            literals_read++;
          } else {
            skipped++;
          }

          bit_reader.Next();
        }
        literal_count_ -= literal_batch;
        values_read += literal_batch + skipped;
        remaining_nulls -= skipped;
      }
    } else {
      // Null slot outside any run: just advance the output position.
      values_read++;
      remaining_nulls--;
    }
  }

  return values_read;
}
424 | |
425 | template <typename T> |
426 | bool RleDecoder::NextCounts() { |
427 | // Read the next run's indicator int, it could be a literal or repeated run. |
428 | // The int is encoded as a vlq-encoded value. |
429 | int32_t indicator_value = 0; |
430 | bool result = bit_reader_.GetVlqInt(&indicator_value); |
431 | if (!result) return false; |
432 | |
433 | // lsb indicates if it is a literal run or repeated run |
434 | bool is_literal = indicator_value & 1; |
435 | if (is_literal) { |
436 | literal_count_ = (indicator_value >> 1) * 8; |
437 | } else { |
438 | repeat_count_ = indicator_value >> 1; |
439 | // XXX (ARROW-4018) this is not big-endian compatible |
440 | bool result = |
441 | bit_reader_.GetAligned<T>(static_cast<int>(BitUtil::CeilDiv(bit_width_, 8)), |
442 | reinterpret_cast<T*>(¤t_value_)); |
443 | DCHECK(result); |
444 | } |
445 | return true; |
446 | } |
447 | |
/// This function buffers input values 8 at a time. After seeing all 8 values,
/// it decides whether they should be encoded as a literal or repeated run.
/// Returns false (and encodes nothing) once the output buffer is full.
inline bool RleEncoder::Put(uint64_t value) {
  DCHECK(bit_width_ == 64 || value < (1ULL << bit_width_));
  // buffer_full_ is set by CheckBufferFull() when another worst-case run
  // might not fit; reject all further values at that point.
  if (ARROW_PREDICT_FALSE(buffer_full_)) return false;

  if (ARROW_PREDICT_TRUE(current_value_ == value)) {
    ++repeat_count_;
    if (repeat_count_ > 8) {
      // This is just a continuation of the current run, no need to buffer the
      // values.
      // Note that this is the fast path for long repeated runs.
      return true;
    }
    // repeat_count_ <= 8: keep buffering below so the first 8 repeats can
    // still become part of a literal run if the run ends early.
  } else {
    if (repeat_count_ >= 8) {
      // We had a run that was long enough but it has ended. Flush the
      // current repeated run.
      DCHECK_EQ(literal_count_, 0);
      FlushRepeatedRun();
    }
    repeat_count_ = 1;
    current_value_ = value;
  }

  buffered_values_[num_buffered_values_] = value;
  if (++num_buffered_values_ == 8) {
    DCHECK_EQ(literal_count_ % 8, 0);
    // A full group of 8 lets FlushBufferedValues decide literal vs repeated.
    FlushBufferedValues(false);
  }
  return true;
}
480 | |
481 | inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) { |
482 | if (literal_indicator_byte_ == NULL) { |
483 | // The literal indicator byte has not been reserved yet, get one now. |
484 | literal_indicator_byte_ = bit_writer_.GetNextBytePtr(); |
485 | DCHECK(literal_indicator_byte_ != NULL); |
486 | } |
487 | |
488 | // Write all the buffered values as bit packed literals |
489 | for (int i = 0; i < num_buffered_values_; ++i) { |
490 | bool success = bit_writer_.PutValue(buffered_values_[i], bit_width_); |
491 | DCHECK(success) << "There is a bug in using CheckBufferFull()" ; |
492 | } |
493 | num_buffered_values_ = 0; |
494 | |
495 | if (update_indicator_byte) { |
496 | // At this point we need to write the indicator byte for the literal run. |
497 | // We only reserve one byte, to allow for streaming writes of literal values. |
498 | // The logic makes sure we flush literal runs often enough to not overrun |
499 | // the 1 byte. |
500 | DCHECK_EQ(literal_count_ % 8, 0); |
501 | int num_groups = literal_count_ / 8; |
502 | int32_t indicator_value = (num_groups << 1) | 1; |
503 | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); |
504 | *literal_indicator_byte_ = static_cast<uint8_t>(indicator_value); |
505 | literal_indicator_byte_ = NULL; |
506 | literal_count_ = 0; |
507 | CheckBufferFull(); |
508 | } |
509 | } |
510 | |
511 | inline void RleEncoder::FlushRepeatedRun() { |
512 | DCHECK_GT(repeat_count_, 0); |
513 | bool result = true; |
514 | // The lsb of 0 indicates this is a repeated run |
515 | int32_t indicator_value = repeat_count_ << 1 | 0; |
516 | result &= bit_writer_.PutVlqInt(indicator_value); |
517 | result &= bit_writer_.PutAligned(current_value_, |
518 | static_cast<int>(BitUtil::CeilDiv(bit_width_, 8))); |
519 | DCHECK(result); |
520 | num_buffered_values_ = 0; |
521 | repeat_count_ = 0; |
522 | CheckBufferFull(); |
523 | } |
524 | |
525 | /// Flush the values that have been buffered. At this point we decide whether |
526 | /// we need to switch between the run types or continue the current one. |
527 | inline void RleEncoder::FlushBufferedValues(bool done) { |
528 | if (repeat_count_ >= 8) { |
529 | // Clear the buffered values. They are part of the repeated run now and we |
530 | // don't want to flush them out as literals. |
531 | num_buffered_values_ = 0; |
532 | if (literal_count_ != 0) { |
533 | // There was a current literal run. All the values in it have been flushed |
534 | // but we still need to update the indicator byte. |
535 | DCHECK_EQ(literal_count_ % 8, 0); |
536 | DCHECK_EQ(repeat_count_, 8); |
537 | FlushLiteralRun(true); |
538 | } |
539 | DCHECK_EQ(literal_count_, 0); |
540 | return; |
541 | } |
542 | |
543 | literal_count_ += num_buffered_values_; |
544 | DCHECK_EQ(literal_count_ % 8, 0); |
545 | int num_groups = literal_count_ / 8; |
546 | if (num_groups + 1 >= (1 << 6)) { |
547 | // We need to start a new literal run because the indicator byte we've reserved |
548 | // cannot store more values. |
549 | DCHECK(literal_indicator_byte_ != NULL); |
550 | FlushLiteralRun(true); |
551 | } else { |
552 | FlushLiteralRun(done); |
553 | } |
554 | repeat_count_ = 0; |
555 | } |
556 | |
557 | inline int RleEncoder::Flush() { |
558 | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { |
559 | bool all_repeat = literal_count_ == 0 && (repeat_count_ == num_buffered_values_ || |
560 | num_buffered_values_ == 0); |
561 | // There is something pending, figure out if it's a repeated or literal run |
562 | if (repeat_count_ > 0 && all_repeat) { |
563 | FlushRepeatedRun(); |
564 | } else { |
565 | DCHECK_EQ(literal_count_ % 8, 0); |
566 | // Buffer the last group of literals to 8 by padding with 0s. |
567 | for (; num_buffered_values_ != 0 && num_buffered_values_ < 8; |
568 | ++num_buffered_values_) { |
569 | buffered_values_[num_buffered_values_] = 0; |
570 | } |
571 | literal_count_ += num_buffered_values_; |
572 | FlushLiteralRun(true); |
573 | repeat_count_ = 0; |
574 | } |
575 | } |
576 | bit_writer_.Flush(); |
577 | DCHECK_EQ(num_buffered_values_, 0); |
578 | DCHECK_EQ(literal_count_, 0); |
579 | DCHECK_EQ(repeat_count_, 0); |
580 | |
581 | return bit_writer_.bytes_written(); |
582 | } |
583 | |
584 | inline void RleEncoder::CheckBufferFull() { |
585 | int bytes_written = bit_writer_.bytes_written(); |
586 | if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) { |
587 | buffer_full_ = true; |
588 | } |
589 | } |
590 | |
591 | inline void RleEncoder::Clear() { |
592 | buffer_full_ = false; |
593 | current_value_ = 0; |
594 | repeat_count_ = 0; |
595 | num_buffered_values_ = 0; |
596 | literal_count_ = 0; |
597 | literal_indicator_byte_ = NULL; |
598 | bit_writer_.Clear(); |
599 | } |
600 | |
601 | } // namespace util |
602 | } // namespace arrow |
603 | |
604 | #endif // ARROW_UTIL_RLE_ENCODING_H |
605 | |