1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef PARQUET_ENCODING_INTERNAL_H
19#define PARQUET_ENCODING_INTERNAL_H
20
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/util/bit-stream-utils.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/hashing.h"
#include "arrow/util/macros.h"
#include "arrow/util/rle-encoding.h"

#include "parquet/encoding.h"
#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/types.h"
#include "parquet/util/memory.h"
39
40namespace parquet {
41
// Shorthand for Arrow's bit utilities (BitReader, BitWriter, Log2, ...) used
// throughout this header. The leading "::" anchors the lookup at global scope,
// so it is unaffected by any `arrow` namespace nested inside `parquet`.
namespace BitUtil = ::arrow::BitUtil;

// Forward declaration; decoders/encoders below only hold ColumnDescriptor
// pointers.
class ColumnDescriptor;
45
46// ----------------------------------------------------------------------
47// Encoding::PLAIN decoder implementation
48
49template <typename DType>
50class PlainDecoder : public Decoder<DType> {
51 public:
52 typedef typename DType::c_type T;
53 using Decoder<DType>::num_values_;
54
55 explicit PlainDecoder(const ColumnDescriptor* descr)
56 : Decoder<DType>(descr, Encoding::PLAIN), data_(nullptr), len_(0) {
57 if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
58 type_length_ = descr_->type_length();
59 } else {
60 type_length_ = -1;
61 }
62 }
63
64 virtual void SetData(int num_values, const uint8_t* data, int len) {
65 num_values_ = num_values;
66 data_ = data;
67 len_ = len;
68 }
69
70 virtual int Decode(T* buffer, int max_values);
71
72 private:
73 using Decoder<DType>::descr_;
74 const uint8_t* data_;
75 int len_;
76 int type_length_;
77};
78
79// Decode routine templated on C++ type rather than type enum
80template <typename T>
81inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
82 int type_length, T* out) {
83 int bytes_to_decode = num_values * static_cast<int>(sizeof(T));
84 if (data_size < bytes_to_decode) {
85 ParquetException::EofException();
86 }
87 // If bytes_to_decode == 0, data could be null
88 if (bytes_to_decode > 0) {
89 memcpy(out, data, bytes_to_decode);
90 }
91 return bytes_to_decode;
92}
93
94// Template specialization for BYTE_ARRAY. The written values do not own their
95// own data.
96template <>
97inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values,
98 int type_length, ByteArray* out) {
99 int bytes_decoded = 0;
100 int increment;
101 for (int i = 0; i < num_values; ++i) {
102 uint32_t len = out[i].len = *reinterpret_cast<const uint32_t*>(data);
103 increment = static_cast<int>(sizeof(uint32_t) + len);
104 if (data_size < increment) ParquetException::EofException();
105 out[i].ptr = data + sizeof(uint32_t);
106 data += increment;
107 data_size -= increment;
108 bytes_decoded += increment;
109 }
110 return bytes_decoded;
111}
112
113// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not
114// own their own data.
115template <>
116inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size,
117 int num_values, int type_length,
118 FixedLenByteArray* out) {
119 int bytes_to_decode = type_length * num_values;
120 if (data_size < bytes_to_decode) {
121 ParquetException::EofException();
122 }
123 for (int i = 0; i < num_values; ++i) {
124 out[i].ptr = data;
125 data += type_length;
126 data_size -= type_length;
127 }
128 return bytes_to_decode;
129}
130
131template <typename DType>
132inline int PlainDecoder<DType>::Decode(T* buffer, int max_values) {
133 max_values = std::min(max_values, num_values_);
134 int bytes_consumed = DecodePlain<T>(data_, len_, max_values, type_length_, buffer);
135 data_ += bytes_consumed;
136 len_ -= bytes_consumed;
137 num_values_ -= max_values;
138 return max_values;
139}
140
141template <>
142class PlainDecoder<BooleanType> : public Decoder<BooleanType> {
143 public:
144 explicit PlainDecoder(const ColumnDescriptor* descr)
145 : Decoder<BooleanType>(descr, Encoding::PLAIN) {}
146
147 virtual void SetData(int num_values, const uint8_t* data, int len) {
148 num_values_ = num_values;
149 bit_reader_ = BitUtil::BitReader(data, len);
150 }
151
152 // Two flavors of bool decoding
153 int Decode(uint8_t* buffer, int max_values) {
154 max_values = std::min(max_values, num_values_);
155 bool val;
156 ::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values);
157 for (int i = 0; i < max_values; ++i) {
158 if (!bit_reader_.GetValue(1, &val)) {
159 ParquetException::EofException();
160 }
161 if (val) {
162 bit_writer.Set();
163 }
164 bit_writer.Next();
165 }
166 bit_writer.Finish();
167 num_values_ -= max_values;
168 return max_values;
169 }
170
171 virtual int Decode(bool* buffer, int max_values) {
172 max_values = std::min(max_values, num_values_);
173 if (bit_reader_.GetBatch(1, buffer, max_values) != max_values) {
174 ParquetException::EofException();
175 }
176 num_values_ -= max_values;
177 return max_values;
178 }
179
180 private:
181 BitUtil::BitReader bit_reader_;
182};
183
184// ----------------------------------------------------------------------
185// Encoding::PLAIN encoder implementation
186
187template <typename DType>
188class PlainEncoder : public Encoder<DType> {
189 public:
190 typedef typename DType::c_type T;
191
192 explicit PlainEncoder(const ColumnDescriptor* descr,
193 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
194 : Encoder<DType>(descr, Encoding::PLAIN, pool) {
195 values_sink_.reset(new InMemoryOutputStream(pool));
196 }
197
198 int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); }
199
200 std::shared_ptr<Buffer> FlushValues() override;
201 void Put(const T* src, int num_values) override;
202
203 protected:
204 std::unique_ptr<InMemoryOutputStream> values_sink_;
205};
206
207template <>
208class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
209 public:
210 explicit PlainEncoder(const ColumnDescriptor* descr,
211 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
212 : Encoder<BooleanType>(descr, Encoding::PLAIN, pool),
213 bits_available_(kInMemoryDefaultCapacity * 8),
214 bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
215 values_sink_(new InMemoryOutputStream(pool)) {
216 bit_writer_.reset(new BitUtil::BitWriter(bits_buffer_->mutable_data(),
217 static_cast<int>(bits_buffer_->size())));
218 }
219
220 int64_t EstimatedDataEncodedSize() override {
221 return values_sink_->Tell() + bit_writer_->bytes_written();
222 }
223
224 std::shared_ptr<Buffer> FlushValues() override {
225 if (bits_available_ > 0) {
226 bit_writer_->Flush();
227 values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written());
228 bit_writer_->Clear();
229 bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
230 }
231
232 std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
233 values_sink_.reset(new InMemoryOutputStream(this->pool_));
234 return buffer;
235 }
236
237#define PLAINDECODER_BOOLEAN_PUT(input_type, function_attributes) \
238 void Put(input_type src, int num_values) function_attributes { \
239 int bit_offset = 0; \
240 if (bits_available_ > 0) { \
241 int bits_to_write = std::min(bits_available_, num_values); \
242 for (int i = 0; i < bits_to_write; i++) { \
243 bit_writer_->PutValue(src[i], 1); \
244 } \
245 bits_available_ -= bits_to_write; \
246 bit_offset = bits_to_write; \
247 \
248 if (bits_available_ == 0) { \
249 bit_writer_->Flush(); \
250 values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \
251 bit_writer_->Clear(); \
252 } \
253 } \
254 \
255 int bits_remaining = num_values - bit_offset; \
256 while (bit_offset < num_values) { \
257 bits_available_ = static_cast<int>(bits_buffer_->size()) * 8; \
258 \
259 int bits_to_write = std::min(bits_available_, bits_remaining); \
260 for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { \
261 bit_writer_->PutValue(src[i], 1); \
262 } \
263 bit_offset += bits_to_write; \
264 bits_available_ -= bits_to_write; \
265 bits_remaining -= bits_to_write; \
266 \
267 if (bits_available_ == 0) { \
268 bit_writer_->Flush(); \
269 values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \
270 bit_writer_->Clear(); \
271 } \
272 } \
273 }
274
275 PLAINDECODER_BOOLEAN_PUT(const bool*, override)
276 PLAINDECODER_BOOLEAN_PUT(const std::vector<bool>&, )
277
278 protected:
279 int bits_available_;
280 std::unique_ptr<BitUtil::BitWriter> bit_writer_;
281 std::shared_ptr<ResizableBuffer> bits_buffer_;
282 std::unique_ptr<InMemoryOutputStream> values_sink_;
283};
284
285template <typename DType>
286inline std::shared_ptr<Buffer> PlainEncoder<DType>::FlushValues() {
287 std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
288 values_sink_.reset(new InMemoryOutputStream(this->pool_));
289 return buffer;
290}
291
292template <typename DType>
293inline void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
294 values_sink_->Write(reinterpret_cast<const uint8_t*>(buffer), num_values * sizeof(T));
295}
296
297template <>
298inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
299 for (int i = 0; i < num_values; ++i) {
300 // Write the result to the output stream
301 values_sink_->Write(reinterpret_cast<const uint8_t*>(&src[i].len), sizeof(uint32_t));
302 if (src[i].len > 0) {
303 DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL";
304 }
305 values_sink_->Write(reinterpret_cast<const uint8_t*>(src[i].ptr), src[i].len);
306 }
307}
308
309template <>
310inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) {
311 for (int i = 0; i < num_values; ++i) {
312 // Write the result to the output stream
313 if (descr_->type_length() > 0) {
314 DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL";
315 }
316 values_sink_->Write(reinterpret_cast<const uint8_t*>(src[i].ptr),
317 descr_->type_length());
318 }
319}
320
321// ----------------------------------------------------------------------
322// Dictionary encoding and decoding
323
324template <typename Type>
325class DictionaryDecoder : public Decoder<Type> {
326 public:
327 typedef typename Type::c_type T;
328
329 // Initializes the dictionary with values from 'dictionary'. The data in
330 // dictionary is not guaranteed to persist in memory after this call so the
331 // dictionary decoder needs to copy the data out if necessary.
332 explicit DictionaryDecoder(const ColumnDescriptor* descr,
333 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
334 : Decoder<Type>(descr, Encoding::RLE_DICTIONARY),
335 dictionary_(0, pool),
336 byte_array_data_(AllocateBuffer(pool, 0)) {}
337
338 // Perform type-specific initiatialization
339 void SetDict(Decoder<Type>* dictionary);
340
341 void SetData(int num_values, const uint8_t* data, int len) override {
342 num_values_ = num_values;
343 if (len == 0) return;
344 uint8_t bit_width = *data;
345 ++data;
346 --len;
347 idx_decoder_ = ::arrow::util::RleDecoder(data, len, bit_width);
348 }
349
350 int Decode(T* buffer, int max_values) override {
351 max_values = std::min(max_values, num_values_);
352 int decoded_values =
353 idx_decoder_.GetBatchWithDict(dictionary_.data(), buffer, max_values);
354 if (decoded_values != max_values) {
355 ParquetException::EofException();
356 }
357 num_values_ -= max_values;
358 return max_values;
359 }
360
361 int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
362 int64_t valid_bits_offset) override {
363 int decoded_values =
364 idx_decoder_.GetBatchWithDictSpaced(dictionary_.data(), buffer, num_values,
365 null_count, valid_bits, valid_bits_offset);
366 if (decoded_values != num_values) {
367 ParquetException::EofException();
368 }
369 return decoded_values;
370 }
371
372 private:
373 using Decoder<Type>::num_values_;
374
375 // Only one is set.
376 Vector<T> dictionary_;
377
378 // Data that contains the byte array data (byte_array_dictionary_ just has the
379 // pointers).
380 std::shared_ptr<ResizableBuffer> byte_array_data_;
381
382 ::arrow::util::RleDecoder idx_decoder_;
383};
384
385template <typename Type>
386inline void DictionaryDecoder<Type>::SetDict(Decoder<Type>* dictionary) {
387 int num_dictionary_values = dictionary->values_left();
388 dictionary_.Resize(num_dictionary_values);
389 dictionary->Decode(dictionary_.data(), num_dictionary_values);
390}
391
392template <>
393inline void DictionaryDecoder<BooleanType>::SetDict(Decoder<BooleanType>* dictionary) {
394 ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
395}
396
397template <>
398inline void DictionaryDecoder<ByteArrayType>::SetDict(
399 Decoder<ByteArrayType>* dictionary) {
400 int num_dictionary_values = dictionary->values_left();
401 dictionary_.Resize(num_dictionary_values);
402 dictionary->Decode(&dictionary_[0], num_dictionary_values);
403
404 int total_size = 0;
405 for (int i = 0; i < num_dictionary_values; ++i) {
406 total_size += dictionary_[i].len;
407 }
408 if (total_size > 0) {
409 PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false));
410 }
411
412 int offset = 0;
413 uint8_t* bytes_data = byte_array_data_->mutable_data();
414 for (int i = 0; i < num_dictionary_values; ++i) {
415 memcpy(bytes_data + offset, dictionary_[i].ptr, dictionary_[i].len);
416 dictionary_[i].ptr = bytes_data + offset;
417 offset += dictionary_[i].len;
418 }
419}
420
421template <>
422inline void DictionaryDecoder<FLBAType>::SetDict(Decoder<FLBAType>* dictionary) {
423 int num_dictionary_values = dictionary->values_left();
424 dictionary_.Resize(num_dictionary_values);
425 dictionary->Decode(&dictionary_[0], num_dictionary_values);
426
427 int fixed_len = descr_->type_length();
428 int total_size = num_dictionary_values * fixed_len;
429
430 PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false));
431 uint8_t* bytes_data = byte_array_data_->mutable_data();
432 for (int32_t i = 0, offset = 0; i < num_dictionary_values; ++i, offset += fixed_len) {
433 memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len);
434 dictionary_[i].ptr = bytes_data + offset;
435 }
436}
437
438// ----------------------------------------------------------------------
439// Dictionary encoder
440
441template <typename DType>
442struct DictEncoderTraits {
443 using c_type = typename DType::c_type;
444 using MemoTableType = ::arrow::internal::ScalarMemoTable<c_type>;
445};
446
447template <>
448struct DictEncoderTraits<ByteArrayType> {
449 using MemoTableType = ::arrow::internal::BinaryMemoTable;
450};
451
452template <>
453struct DictEncoderTraits<FLBAType> {
454 using MemoTableType = ::arrow::internal::BinaryMemoTable;
455};
456
// Initial capacity handed to DictEncoder's memo table: 1024 entries.
static constexpr int32_t INITIAL_HASH_TABLE_SIZE = 1024;
459
460/// See the dictionary encoding section of https://github.com/Parquet/parquet-format.
461/// The encoding supports streaming encoding. Values are encoded as they are added while
462/// the dictionary is being constructed. At any time, the buffered values can be
463/// written out with the current dictionary size. More values can then be added to
464/// the encoder, including new dictionary entries.
465template <typename DType>
466class DictEncoder : public Encoder<DType> {
467 using MemoTableType = typename DictEncoderTraits<DType>::MemoTableType;
468
469 public:
470 typedef typename DType::c_type T;
471
472 explicit DictEncoder(const ColumnDescriptor* desc,
473 ::arrow::MemoryPool* allocator = ::arrow::default_memory_pool())
474 : Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator),
475 allocator_(allocator),
476 dict_encoded_size_(0),
477 type_length_(desc->type_length()),
478 memo_table_(INITIAL_HASH_TABLE_SIZE) {}
479
480 ~DictEncoder() override { DCHECK(buffered_indices_.empty()); }
481
482 void set_type_length(int type_length) { type_length_ = type_length; }
483
484 /// Returns a conservative estimate of the number of bytes needed to encode the buffered
485 /// indices. Used to size the buffer passed to WriteIndices().
486 int64_t EstimatedDataEncodedSize() override {
487 // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
488 // reserve
489 // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
490 // but not reserving them would cause the encoder to fail.
491 return 1 +
492 ::arrow::util::RleEncoder::MaxBufferSize(
493 bit_width(), static_cast<int>(buffered_indices_.size())) +
494 ::arrow::util::RleEncoder::MinBufferSize(bit_width());
495 }
496
497 /// The minimum bit width required to encode the currently buffered indices.
498 int bit_width() const {
499 if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
500 if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
501 return BitUtil::Log2(num_entries());
502 }
503
504 /// Writes out any buffered indices to buffer preceded by the bit width of this data.
505 /// Returns the number of bytes written.
506 /// If the supplied buffer is not big enough, returns -1.
507 /// buffer must be preallocated with buffer_len bytes. Use EstimatedDataEncodedSize()
508 /// to size buffer.
509 int WriteIndices(uint8_t* buffer, int buffer_len);
510
511 int dict_encoded_size() { return dict_encoded_size_; }
512
513 /// Encode value. Note that this does not actually write any data, just
514 /// buffers the value's index to be written later.
515 inline void Put(const T& value);
516 void Put(const T* values, int num_values) override;
517
518 std::shared_ptr<Buffer> FlushValues() override {
519 std::shared_ptr<ResizableBuffer> buffer =
520 AllocateBuffer(this->allocator_, EstimatedDataEncodedSize());
521 int result_size = WriteIndices(buffer->mutable_data(),
522 static_cast<int>(EstimatedDataEncodedSize()));
523 PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
524 return std::move(buffer);
525 }
526
527 void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
528 int64_t valid_bits_offset) override {
529 ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
530 num_values);
531 for (int32_t i = 0; i < num_values; i++) {
532 if (valid_bits_reader.IsSet()) {
533 Put(src[i]);
534 }
535 valid_bits_reader.Next();
536 }
537 }
538
539 /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
540 /// dict_encoded_size() bytes.
541 void WriteDict(uint8_t* buffer);
542
543 /// The number of entries in the dictionary.
544 int num_entries() const { return memo_table_.size(); }
545
546 private:
547 /// Clears all the indices (but leaves the dictionary).
548 void ClearIndices() { buffered_indices_.clear(); }
549
550 ::arrow::MemoryPool* allocator_;
551
552 /// Indices that have not yet be written out by WriteIndices().
553 std::vector<int> buffered_indices_;
554
555 /// The number of bytes needed to encode the dictionary.
556 int dict_encoded_size_;
557
558 /// Size of each encoded dictionary value. -1 for variable-length types.
559 int type_length_;
560
561 MemoTableType memo_table_;
562};
563
564template <typename DType>
565void DictEncoder<DType>::Put(const T* src, int num_values) {
566 for (int32_t i = 0; i < num_values; i++) {
567 Put(src[i]);
568 }
569}
570
571template <typename DType>
572inline void DictEncoder<DType>::Put(const T& v) {
573 // Put() implementation for primitive types
574 auto on_found = [](int32_t memo_index) {};
575 auto on_not_found = [this](int32_t memo_index) {
576 dict_encoded_size_ += static_cast<int>(sizeof(T));
577 };
578
579 auto memo_index = memo_table_.GetOrInsert(v, on_found, on_not_found);
580 buffered_indices_.push_back(memo_index);
581}
582
583template <>
584inline void DictEncoder<ByteArrayType>::Put(const ByteArray& v) {
585 static const uint8_t empty[] = {0};
586
587 auto on_found = [](int32_t memo_index) {};
588 auto on_not_found = [&](int32_t memo_index) {
589 dict_encoded_size_ += static_cast<int>(v.len + sizeof(uint32_t));
590 };
591
592 DCHECK(v.ptr != nullptr || v.len == 0);
593 const void* ptr = (v.ptr != nullptr) ? v.ptr : empty;
594 auto memo_index =
595 memo_table_.GetOrInsert(ptr, static_cast<int32_t>(v.len), on_found, on_not_found);
596 buffered_indices_.push_back(memo_index);
597}
598
599template <>
600inline void DictEncoder<FLBAType>::Put(const FixedLenByteArray& v) {
601 static const uint8_t empty[] = {0};
602
603 auto on_found = [](int32_t memo_index) {};
604 auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; };
605
606 DCHECK(v.ptr != nullptr || type_length_ == 0);
607 const void* ptr = (v.ptr != nullptr) ? v.ptr : empty;
608 auto memo_index = memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found);
609 buffered_indices_.push_back(memo_index);
610}
611
612template <typename DType>
613inline void DictEncoder<DType>::WriteDict(uint8_t* buffer) {
614 // For primitive types, only a memcpy
615 DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
616 memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
617}
618
619// ByteArray and FLBA already have the dictionary encoded in their data heaps
620template <>
621inline void DictEncoder<ByteArrayType>::WriteDict(uint8_t* buffer) {
622 memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) {
623 uint32_t len = static_cast<uint32_t>(v.length());
624 memcpy(buffer, &len, sizeof(uint32_t));
625 buffer += sizeof(uint32_t);
626 memcpy(buffer, v.data(), v.length());
627 buffer += v.length();
628 });
629}
630
631template <>
632inline void DictEncoder<FLBAType>::WriteDict(uint8_t* buffer) {
633 memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) {
634 DCHECK_EQ(v.length(), static_cast<size_t>(type_length_));
635 memcpy(buffer, v.data(), type_length_);
636 buffer += type_length_;
637 });
638}
639
640template <typename DType>
641inline int DictEncoder<DType>::WriteIndices(uint8_t* buffer, int buffer_len) {
642 // Write bit width in first byte
643 *buffer = static_cast<uint8_t>(bit_width());
644 ++buffer;
645 --buffer_len;
646
647 ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
648 for (int index : buffered_indices_) {
649 if (!encoder.Put(index)) return -1;
650 }
651 encoder.Flush();
652
653 ClearIndices();
654 return 1 + encoder.len();
655}
656
657// ----------------------------------------------------------------------
658// DeltaBitPackDecoder
659
660template <typename DType>
661class DeltaBitPackDecoder : public Decoder<DType> {
662 public:
663 typedef typename DType::c_type T;
664
665 explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
666 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
667 : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) {
668 if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
669 throw ParquetException("Delta bit pack encoding should only be for integer data.");
670 }
671 }
672
673 virtual void SetData(int num_values, const uint8_t* data, int len) {
674 num_values_ = num_values;
675 decoder_ = BitUtil::BitReader(data, len);
676 values_current_block_ = 0;
677 values_current_mini_block_ = 0;
678 }
679
680 virtual int Decode(T* buffer, int max_values) {
681 return GetInternal(buffer, max_values);
682 }
683
684 private:
685 using Decoder<DType>::num_values_;
686
687 void InitBlock() {
688 int32_t block_size;
689 if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();
690 if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
691 if (!decoder_.GetVlqInt(&values_current_block_)) {
692 ParquetException::EofException();
693 }
694 if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
695
696 delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_);
697 uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
698
699 if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
700 for (int i = 0; i < num_mini_blocks_; ++i) {
701 if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) {
702 ParquetException::EofException();
703 }
704 }
705 values_per_mini_block_ = block_size / num_mini_blocks_;
706 mini_block_idx_ = 0;
707 delta_bit_width_ = bit_width_data[0];
708 values_current_mini_block_ = values_per_mini_block_;
709 }
710
711 template <typename T>
712 int GetInternal(T* buffer, int max_values) {
713 max_values = std::min(max_values, num_values_);
714 const uint8_t* bit_width_data = delta_bit_widths_->data();
715 for (int i = 0; i < max_values; ++i) {
716 if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
717 ++mini_block_idx_;
718 if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) {
719 delta_bit_width_ = bit_width_data[mini_block_idx_];
720 values_current_mini_block_ = values_per_mini_block_;
721 } else {
722 InitBlock();
723 buffer[i] = last_value_;
724 continue;
725 }
726 }
727
728 // TODO: the key to this algorithm is to decode the entire miniblock at once.
729 int64_t delta;
730 if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException();
731 delta += min_delta_;
732 last_value_ += static_cast<int32_t>(delta);
733 buffer[i] = last_value_;
734 --values_current_mini_block_;
735 }
736 num_values_ -= max_values;
737 return max_values;
738 }
739
740 ::arrow::MemoryPool* pool_;
741 BitUtil::BitReader decoder_;
742 int32_t values_current_block_;
743 int32_t num_mini_blocks_;
744 uint64_t values_per_mini_block_;
745 uint64_t values_current_mini_block_;
746
747 int32_t min_delta_;
748 size_t mini_block_idx_;
749 std::shared_ptr<ResizableBuffer> delta_bit_widths_;
750 int delta_bit_width_;
751
752 int32_t last_value_;
753};
754
755// ----------------------------------------------------------------------
756// DELTA_LENGTH_BYTE_ARRAY
757
758class DeltaLengthByteArrayDecoder : public Decoder<ByteArrayType> {
759 public:
760 explicit DeltaLengthByteArrayDecoder(
761 const ColumnDescriptor* descr,
762 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
763 : Decoder<ByteArrayType>(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
764 len_decoder_(nullptr, pool) {}
765
766 virtual void SetData(int num_values, const uint8_t* data, int len) {
767 num_values_ = num_values;
768 if (len == 0) return;
769 int total_lengths_len = *reinterpret_cast<const int*>(data);
770 data += 4;
771 len_decoder_.SetData(num_values, data, total_lengths_len);
772 data_ = data + total_lengths_len;
773 len_ = len - 4 - total_lengths_len;
774 }
775
776 virtual int Decode(ByteArray* buffer, int max_values) {
777 max_values = std::min(max_values, num_values_);
778 std::vector<int> lengths(max_values);
779 len_decoder_.Decode(lengths.data(), max_values);
780 for (int i = 0; i < max_values; ++i) {
781 buffer[i].len = lengths[i];
782 buffer[i].ptr = data_;
783 data_ += lengths[i];
784 len_ -= lengths[i];
785 }
786 num_values_ -= max_values;
787 return max_values;
788 }
789
790 private:
791 using Decoder<ByteArrayType>::num_values_;
792 DeltaBitPackDecoder<Int32Type> len_decoder_;
793 const uint8_t* data_;
794 int len_;
795};
796
797// ----------------------------------------------------------------------
798// DELTA_BYTE_ARRAY
799
800class DeltaByteArrayDecoder : public Decoder<ByteArrayType> {
801 public:
802 explicit DeltaByteArrayDecoder(
803 const ColumnDescriptor* descr,
804 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
805 : Decoder<ByteArrayType>(descr, Encoding::DELTA_BYTE_ARRAY),
806 prefix_len_decoder_(nullptr, pool),
807 suffix_decoder_(nullptr, pool),
808 last_value_(0, nullptr) {}
809
810 virtual void SetData(int num_values, const uint8_t* data, int len) {
811 num_values_ = num_values;
812 if (len == 0) return;
813 int prefix_len_length = *reinterpret_cast<const int*>(data);
814 data += 4;
815 len -= 4;
816 prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
817 data += prefix_len_length;
818 len -= prefix_len_length;
819 suffix_decoder_.SetData(num_values, data, len);
820 }
821
822 // TODO: this doesn't work and requires memory management. We need to allocate
823 // new strings to store the results.
824 virtual int Decode(ByteArray* buffer, int max_values) {
825 max_values = std::min(max_values, num_values_);
826 for (int i = 0; i < max_values; ++i) {
827 int prefix_len = 0;
828 prefix_len_decoder_.Decode(&prefix_len, 1);
829 ByteArray suffix = {0, nullptr};
830 suffix_decoder_.Decode(&suffix, 1);
831 buffer[i].len = prefix_len + suffix.len;
832
833 uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len));
834 memcpy(result, last_value_.ptr, prefix_len);
835 memcpy(result + prefix_len, suffix.ptr, suffix.len);
836
837 buffer[i].ptr = result;
838 last_value_ = buffer[i];
839 }
840 num_values_ -= max_values;
841 return max_values;
842 }
843
844 private:
845 using Decoder<ByteArrayType>::num_values_;
846
847 DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
848 DeltaLengthByteArrayDecoder suffix_decoder_;
849 ByteArray last_value_;
850};
851
852} // namespace parquet
853
854#endif // PARQUET_ENCODING_INTERNAL_H
855