1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// From Apache Impala (incubating) as of 2016-01-29
19
20#ifndef ARROW_UTIL_BIT_STREAM_UTILS_H
21#define ARROW_UTIL_BIT_STREAM_UTILS_H
22
23#include <string.h>
24#include <algorithm>
25#include <cstdint>
26
27#include "arrow/util/bit_util.h"
28#include "arrow/util/bpacking.h"
29#include "arrow/util/logging.h"
30#include "arrow/util/macros.h"
31
32namespace arrow {
33namespace BitUtil {
34
35/// Utility class to write bit/byte streams. This class can write data to either be
36/// bit packed or byte aligned (and a single stream that has a mix of both).
37/// This class does not allocate memory.
38class BitWriter {
39 public:
40 /// buffer: buffer to write bits to. Buffer should be preallocated with
41 /// 'buffer_len' bytes.
42 BitWriter(uint8_t* buffer, int buffer_len) : buffer_(buffer), max_bytes_(buffer_len) {
43 Clear();
44 }
45
46 void Clear() {
47 buffered_values_ = 0;
48 byte_offset_ = 0;
49 bit_offset_ = 0;
50 }
51
52 /// The number of current bytes written, including the current byte (i.e. may include a
53 /// fraction of a byte). Includes buffered values.
54 int bytes_written() const {
55 return byte_offset_ + static_cast<int>(BitUtil::BytesForBits(bit_offset_));
56 }
57 uint8_t* buffer() const { return buffer_; }
58 int buffer_len() const { return max_bytes_; }
59
60 /// Writes a value to buffered_values_, flushing to buffer_ if necessary. This is bit
61 /// packed. Returns false if there was not enough space. num_bits must be <= 32.
62 bool PutValue(uint64_t v, int num_bits);
63
64 /// Writes v to the next aligned byte using num_bytes. If T is larger than
65 /// num_bytes, the extra high-order bytes will be ignored. Returns false if
66 /// there was not enough space.
67 template <typename T>
68 bool PutAligned(T v, int num_bytes);
69
70 /// Write a Vlq encoded int to the buffer. Returns false if there was not enough
71 /// room. The value is written byte aligned.
72 /// For more details on vlq:
73 /// en.wikipedia.org/wiki/Variable-length_quantity
74 bool PutVlqInt(uint32_t v);
75
76 // Writes an int zigzag encoded.
77 bool PutZigZagVlqInt(int32_t v);
78
79 /// Get a pointer to the next aligned byte and advance the underlying buffer
80 /// by num_bytes.
81 /// Returns NULL if there was not enough space.
82 uint8_t* GetNextBytePtr(int num_bytes = 1);
83
84 /// Flushes all buffered values to the buffer. Call this when done writing to
85 /// the buffer. If 'align' is true, buffered_values_ is reset and any future
86 /// writes will be written to the next byte boundary.
87 void Flush(bool align = false);
88
89 private:
90 uint8_t* buffer_;
91 int max_bytes_;
92
93 /// Bit-packed values are initially written to this variable before being memcpy'd to
94 /// buffer_. This is faster than writing values byte by byte directly to buffer_.
95 uint64_t buffered_values_;
96
97 int byte_offset_; // Offset in buffer_
98 int bit_offset_; // Offset in buffered_values_
99};
100
101/// Utility class to read bit/byte stream. This class can read bits or bytes
102/// that are either byte aligned or not. It also has utilities to read multiple
103/// bytes in one read (e.g. encoded int).
104class BitReader {
105 public:
106 /// 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'.
107 BitReader(const uint8_t* buffer, int buffer_len)
108 : buffer_(buffer), max_bytes_(buffer_len), byte_offset_(0), bit_offset_(0) {
109 int num_bytes = std::min(8, max_bytes_ - byte_offset_);
110 memcpy(&buffered_values_, buffer_ + byte_offset_, num_bytes);
111 }
112
113 BitReader()
114 : buffer_(NULL),
115 max_bytes_(0),
116 buffered_values_(0),
117 byte_offset_(0),
118 bit_offset_(0) {}
119
120 void Reset(const uint8_t* buffer, int buffer_len) {
121 buffer_ = buffer;
122 max_bytes_ = buffer_len;
123 byte_offset_ = 0;
124 bit_offset_ = 0;
125 int num_bytes = std::min(8, max_bytes_ - byte_offset_);
126 memcpy(&buffered_values_, buffer_ + byte_offset_, num_bytes);
127 }
128
129 /// Gets the next value from the buffer. Returns true if 'v' could be read or false if
130 /// there are not enough bytes left. num_bits must be <= 32.
131 template <typename T>
132 bool GetValue(int num_bits, T* v);
133
134 /// Get a number of values from the buffer. Return the number of values actually read.
135 template <typename T>
136 int GetBatch(int num_bits, T* v, int batch_size);
137
138 /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T
139 /// needs to be a little-endian native type and big enough to store
140 /// 'num_bytes'. The value is assumed to be byte-aligned so the stream will
141 /// be advanced to the start of the next byte before 'v' is read. Returns
142 /// false if there are not enough bytes left.
143 template <typename T>
144 bool GetAligned(int num_bytes, T* v);
145
146 /// Reads a vlq encoded int from the stream. The encoded int must start at
147 /// the beginning of a byte. Return false if there were not enough bytes in
148 /// the buffer.
149 bool GetVlqInt(int32_t* v);
150
151 // Reads a zigzag encoded int `into` v.
152 bool GetZigZagVlqInt(int32_t* v);
153
154 /// Returns the number of bytes left in the stream, not including the current
155 /// byte (i.e., there may be an additional fraction of a byte).
156 int bytes_left() {
157 return max_bytes_ -
158 (byte_offset_ + static_cast<int>(BitUtil::BytesForBits(bit_offset_)));
159 }
160
161 /// Maximum byte length of a vlq encoded int
162 static const int MAX_VLQ_BYTE_LEN = 5;
163
164 private:
165 const uint8_t* buffer_;
166 int max_bytes_;
167
168 /// Bytes are memcpy'd from buffer_ and values are read from this variable. This is
169 /// faster than reading values byte by byte directly from buffer_.
170 uint64_t buffered_values_;
171
172 int byte_offset_; // Offset in buffer_
173 int bit_offset_; // Offset in buffered_values_
174};
175
176inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
177 // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases)
178 DCHECK_LE(num_bits, 32);
179 DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
180
181 if (ARROW_PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8))
182 return false;
183
184 buffered_values_ |= v << bit_offset_;
185 bit_offset_ += num_bits;
186
187 if (ARROW_PREDICT_FALSE(bit_offset_ >= 64)) {
188 // Flush buffered_values_ and write out bits of v that did not fit
189 memcpy(buffer_ + byte_offset_, &buffered_values_, 8);
190 buffered_values_ = 0;
191 byte_offset_ += 8;
192 bit_offset_ -= 64;
193 buffered_values_ = v >> (num_bits - bit_offset_);
194 }
195 DCHECK_LT(bit_offset_, 64);
196 return true;
197}
198
199inline void BitWriter::Flush(bool align) {
200 int num_bytes = static_cast<int>(BitUtil::BytesForBits(bit_offset_));
201 DCHECK_LE(byte_offset_ + num_bytes, max_bytes_);
202 memcpy(buffer_ + byte_offset_, &buffered_values_, num_bytes);
203
204 if (align) {
205 buffered_values_ = 0;
206 byte_offset_ += num_bytes;
207 bit_offset_ = 0;
208 }
209}
210
211inline uint8_t* BitWriter::GetNextBytePtr(int num_bytes) {
212 Flush(/* align */ true);
213 DCHECK_LE(byte_offset_, max_bytes_);
214 if (byte_offset_ + num_bytes > max_bytes_) return NULL;
215 uint8_t* ptr = buffer_ + byte_offset_;
216 byte_offset_ += num_bytes;
217 return ptr;
218}
219
220template <typename T>
221inline bool BitWriter::PutAligned(T val, int num_bytes) {
222 uint8_t* ptr = GetNextBytePtr(num_bytes);
223 if (ptr == NULL) return false;
224 memcpy(ptr, &val, num_bytes);
225 return true;
226}
227
228inline bool BitWriter::PutVlqInt(uint32_t v) {
229 bool result = true;
230 while ((v & 0xFFFFFF80) != 0L) {
231 result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1);
232 v >>= 7;
233 }
234 result &= PutAligned<uint8_t>(static_cast<uint8_t>(v & 0x7F), 1);
235 return result;
236}
237
238namespace detail {
239
240template <typename T>
241inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
242 int* bit_offset, int* byte_offset, uint64_t* buffered_values) {
243#ifdef _MSC_VER
244#pragma warning(push)
245#pragma warning(disable : 4800)
246#endif
247 *v = static_cast<T>(BitUtil::TrailingBits(*buffered_values, *bit_offset + num_bits) >>
248 *bit_offset);
249#ifdef _MSC_VER
250#pragma warning(pop)
251#endif
252 *bit_offset += num_bits;
253 if (*bit_offset >= 64) {
254 *byte_offset += 8;
255 *bit_offset -= 64;
256
257 int bytes_remaining = max_bytes - *byte_offset;
258 if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
259 memcpy(buffered_values, buffer + *byte_offset, 8);
260 } else {
261 memcpy(buffered_values, buffer + *byte_offset, bytes_remaining);
262 }
263#ifdef _MSC_VER
264#pragma warning(push)
265#pragma warning(disable : 4800 4805)
266#endif
267 // Read bits of v that crossed into new buffered_values_
268 *v = *v | static_cast<T>(BitUtil::TrailingBits(*buffered_values, *bit_offset)
269 << (num_bits - *bit_offset));
270#ifdef _MSC_VER
271#pragma warning(pop)
272#endif
273 DCHECK_LE(*bit_offset, 64);
274 }
275}
276
277} // namespace detail
278
279template <typename T>
280inline bool BitReader::GetValue(int num_bits, T* v) {
281 return GetBatch(num_bits, v, 1) == 1;
282}
283
284template <typename T>
285inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) {
286 DCHECK(buffer_ != NULL);
287 // TODO: revisit this limit if necessary
288 DCHECK_LE(num_bits, 32);
289 DCHECK_LE(num_bits, static_cast<int>(sizeof(T) * 8));
290
291 int bit_offset = bit_offset_;
292 int byte_offset = byte_offset_;
293 uint64_t buffered_values = buffered_values_;
294 int max_bytes = max_bytes_;
295 const uint8_t* buffer = buffer_;
296
297 uint64_t needed_bits = num_bits * batch_size;
298 uint64_t remaining_bits = (max_bytes - byte_offset) * 8 - bit_offset;
299 if (remaining_bits < needed_bits) {
300 batch_size = static_cast<int>(remaining_bits) / num_bits;
301 }
302
303 int i = 0;
304 if (ARROW_PREDICT_FALSE(bit_offset != 0)) {
305 for (; i < batch_size && bit_offset != 0; ++i) {
306 detail::GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset,
307 &buffered_values);
308 }
309 }
310
311 if (sizeof(T) == 4) {
312 int num_unpacked =
313 internal::unpack32(reinterpret_cast<const uint32_t*>(buffer + byte_offset),
314 reinterpret_cast<uint32_t*>(v + i), batch_size - i, num_bits);
315 i += num_unpacked;
316 byte_offset += num_unpacked * num_bits / 8;
317 } else {
318 const int buffer_size = 1024;
319 uint32_t unpack_buffer[buffer_size];
320 while (i < batch_size) {
321 int unpack_size = std::min(buffer_size, batch_size - i);
322 int num_unpacked =
323 internal::unpack32(reinterpret_cast<const uint32_t*>(buffer + byte_offset),
324 unpack_buffer, unpack_size, num_bits);
325 if (num_unpacked == 0) {
326 break;
327 }
328 for (int k = 0; k < num_unpacked; ++k) {
329#ifdef _MSC_VER
330#pragma warning(push)
331#pragma warning(disable : 4800)
332#endif
333 v[i + k] = static_cast<T>(unpack_buffer[k]);
334#ifdef _MSC_VER
335#pragma warning(pop)
336#endif
337 }
338 i += num_unpacked;
339 byte_offset += num_unpacked * num_bits / 8;
340 }
341 }
342
343 int bytes_remaining = max_bytes - byte_offset;
344 if (bytes_remaining >= 8) {
345 memcpy(&buffered_values, buffer + byte_offset, 8);
346 } else {
347 memcpy(&buffered_values, buffer + byte_offset, bytes_remaining);
348 }
349
350 for (; i < batch_size; ++i) {
351 detail::GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset,
352 &buffered_values);
353 }
354
355 bit_offset_ = bit_offset;
356 byte_offset_ = byte_offset;
357 buffered_values_ = buffered_values;
358
359 return batch_size;
360}
361
362template <typename T>
363inline bool BitReader::GetAligned(int num_bytes, T* v) {
364 DCHECK_LE(num_bytes, static_cast<int>(sizeof(T)));
365 int bytes_read = static_cast<int>(BitUtil::BytesForBits(bit_offset_));
366 if (ARROW_PREDICT_FALSE(byte_offset_ + bytes_read + num_bytes > max_bytes_))
367 return false;
368
369 // Advance byte_offset to next unread byte and read num_bytes
370 byte_offset_ += bytes_read;
371 memcpy(v, buffer_ + byte_offset_, num_bytes);
372 byte_offset_ += num_bytes;
373
374 // Reset buffered_values_
375 bit_offset_ = 0;
376 int bytes_remaining = max_bytes_ - byte_offset_;
377 if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
378 memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
379 } else {
380 memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
381 }
382 return true;
383}
384
385inline bool BitReader::GetVlqInt(int32_t* v) {
386 *v = 0;
387 int shift = 0;
388 int num_bytes = 0;
389 uint8_t byte = 0;
390 do {
391 if (!GetAligned<uint8_t>(1, &byte)) return false;
392 *v |= (byte & 0x7F) << shift;
393 shift += 7;
394 DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN);
395 } while ((byte & 0x80) != 0);
396 return true;
397}
398
399inline bool BitWriter::PutZigZagVlqInt(int32_t v) {
400 // Note negative left shift is undefined
401 uint32_t u = (static_cast<uint32_t>(v) << 1) ^ (v >> 31);
402 return PutVlqInt(u);
403}
404
405inline bool BitReader::GetZigZagVlqInt(int32_t* v) {
406 int32_t u_signed;
407 if (!GetVlqInt(&u_signed)) return false;
408 uint32_t u = static_cast<uint32_t>(u_signed);
409 *reinterpret_cast<uint32_t*>(v) = (u >> 1) ^ -(static_cast<int32_t>(u & 1));
410 return true;
411}
412
413} // namespace BitUtil
414} // namespace arrow
415
416#endif // ARROW_UTIL_BIT_STREAM_UTILS_H
417